from __future__ import annotations import os, sqlite3, csv, io, json, datetime as dt from typing import Optional, Dict, Any from flask import Flask, g, request, redirect, url_for, render_template, send_file, jsonify app = Flask(__name__) # ----- Config ----- DB_PATH = os.environ.get("SPLITBUDDY_DB", "splitbuddy.db") CURRENCY = os.environ.get("SPLITBUDDY_CURRENCY", "₪") PERSON_A = os.environ.get("SPLITBUDDY_ME", "Me") # you PERSON_B = os.environ.get("SPLITBUDDY_ROOMIE", "Idan") # roommate DEFAULT_ACTOR = os.environ.get("SPLITBUDDY_ACTOR", "anon") # who is using this device (optional) DEFAULT_A_SHARE_PCT = os.environ.get("SPLITBUDDY_DEFAULT_A_SHARE_PCT", 66.6667) # <-- you pay 2/3 by default WEBAPP_PORT = os.environ.get("SPLITBUDDY_WEBAPP_PORT", 42069) # ----- Template filters ----- @app.template_filter('human_time') def human_time(value: Any) -> str: """Format ISO-ish timestamps into a concise human-readable string. Accepts values like '2025-09-14T18:37' or '2025-09-14T18:37:00Z'. """ if not value: return "" try: if isinstance(value, dt.datetime): d = value else: s = str(value).strip() # fromisoformat doesn't accept trailing 'Z' (UTC); strip it if present if s.endswith('Z'): s = s[:-1] d = dt.datetime.fromisoformat(s) return d.strftime("%b %d, %Y %H:%M") except Exception: return str(value) # ----- DB helpers ----- def get_db() -> sqlite3.Connection: if "db" not in g: g.db = sqlite3.connect(DB_PATH) g.db.row_factory = sqlite3.Row return g.db @app.teardown_appcontext def close_db(_=None): db = g.pop("db", None) if db is not None: db.close() def init_db(): db = get_db() # entries db.execute(""" CREATE TABLE IF NOT EXISTS entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, kind TEXT NOT NULL DEFAULT 'bill', -- 'bill' | 'transfer' total REAL NOT NULL DEFAULT 0, payer TEXT NOT NULL DEFAULT 'A', -- 'A' or 'B' a_share REAL NOT NULL DEFAULT 0.666667, -- your share (0..1) for bills method TEXT NOT NULL DEFAULT 'cash', note TEXT ) """) # audits db.execute(""" CREATE TABLE IF NOT EXISTS audits ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT NOT NULL, -- UTC ISO timestamp action TEXT NOT NULL, -- 'add' | 'edit' | 'delete' entry_id INTEGER, -- affected entry id (may be null for failures) actor TEXT, -- who (env/config or future auth) ip TEXT, user_agent TEXT, device TEXT, -- parsed (macOS, Windows, iPhone, Android, etc.) old_row TEXT, -- JSON of previous row (if any) new_row TEXT -- JSON of new row (if any) ) """) # (best-effort) migrations, ignore if exists for ddl in [ "ALTER TABLE entries ADD COLUMN kind TEXT NOT NULL DEFAULT 'bill'", "ALTER TABLE entries ADD COLUMN total REAL", "ALTER TABLE entries ADD COLUMN payer TEXT", "ALTER TABLE entries ADD COLUMN a_share REAL", ]: try: db.execute(ddl) except sqlite3.OperationalError: pass db.execute(""" UPDATE entries SET kind = COALESCE(kind, 'bill'), total = COALESCE(total, 0), payer = COALESCE(payer, 'A'), a_share= COALESCE(a_share, 0.666667) """) db.commit() # ----- Utility & models ----- def _now_local_iso_min() -> str: return dt.datetime.now().replace(second=0, microsecond=0).isoformat(timespec="minutes") def _now_utc_iso() -> str: return dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z" def _client_ip() -> str: # works behind simple proxy too xff = request.headers.get("X-Forwarded-For") return xff.split(",")[0].strip() if xff else (request.remote_addr or "") def _guess_device(ua: str) -> str: u = (ua or "").lower() if "iphone" in u: return "iPhone" if "ipad" in u: return "iPad" if "android" in u: return "Android" if "mac os x" in u or "macintosh" in u: return "macOS" if "windows" in u: return "Windows" if "linux" in u and "android" not in u: return "Linux" return "Unknown" class ByMethod: def __init__(self, cash=0.0, transfer=0.0, other=0.0): self.cash = cash; self.transfer = transfer; self.other = other class Summary: def __init__(self, total: float, count: int, latest: Optional[str], by_method: ByMethod): self.total = total; self.count = count; self.latest = latest; self.by_method = by_method def _delta_for_entry(kind: str, total: float, payer: str, a_share: float) -> float: """ Positive => YOU owe Idan. Negative => Idan owes YOU. - bill: delta = (your_share * total) - (total if YOU paid else 0) - transfer: delta = -total if YOU sent; +total if Idan sent """ if kind == "transfer": return -total if payer == "A" else total paid_by_a = total if payer == "A" else 0.0 return a_share * total - paid_by_a def _row_to_dict(row: sqlite3.Row | Dict[str, Any] | None) -> Dict[str, Any] | None: if row is None: return None return dict(row) def _audit(action: str, entry_id: Optional[int], old_row: Optional[dict], new_row: Optional[dict]): db = get_db() ua = request.headers.get("User-Agent", "") db.execute(""" INSERT INTO audits (ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( _now_utc_iso(), action, entry_id, DEFAULT_ACTOR, _client_ip(), ua, _guess_device(ua), json.dumps(old_row, ensure_ascii=False) if old_row is not None else None, json.dumps(new_row, ensure_ascii=False) if new_row is not None else None, )) db.commit() # ----- Routes ----- @app.before_request def _ensure_db(): init_db() @app.get("/") def index(): db = get_db() rows = db.execute(""" SELECT id, created_at, kind, total, payer, a_share, method, note FROM entries ORDER BY datetime(created_at) DESC, id DESC """).fetchall() entries = [] for r in rows: e = dict(r) e["delta"] = _delta_for_entry(e["kind"], e["total"], e["payer"], e["a_share"]) entries.append(e) total_balance = sum(e["delta"] for e in entries) if entries else 0.0 latest = entries[0]["created_at"] if entries else None bm = ByMethod( cash=sum(e["delta"] for e in entries if e["method"] == "cash"), transfer=sum(e["delta"] for e in entries if e["method"] == "transfer"), other=sum(e["delta"] for e in entries if e["method"] == "other"), ) summary = Summary(total=total_balance, count=len(entries), latest=latest, by_method=bm) return render_template( "index.html", entries=entries, summary=summary, A=PERSON_A, B=PERSON_B, currency=CURRENCY, now_local=_now_local_iso_min(), default_a_share_pct=DEFAULT_A_SHARE_PCT, ) @app.post("/add") def add(): kind = (request.form.get("kind") or "bill").strip().lower() payer = (request.form.get("payer") or "A").strip().upper() total = request.form.get("total", type=float) a_share_pct = request.form.get("a_share_pct", type=float) method = (request.form.get("method") or "cash").strip().lower() note = (request.form.get("note") or "").strip() created_at = request.form.get("created_at") or _now_local_iso_min() if total is None or total < 0: return redirect(url_for("index")) if payer not in ("A","B"): payer = "A" if kind not in ("bill","transfer"): kind = "bill" if kind == "bill": if a_share_pct is None: a_share_pct = DEFAULT_A_SHARE_PCT a_share = max(0.0, min(100.0, a_share_pct)) / 100.0 else: a_share = DEFAULT_A_SHARE_PCT / 100.0 # stored but ignored for transfers db = get_db() cur = db.execute( "INSERT INTO entries (created_at, kind, total, payer, a_share, method, note) VALUES (?, ?, ?, ?, ?, ?, ?)", (created_at, kind, total, payer, a_share, method, note), ) db.commit() new_id = cur.lastrowid new_row = _row_to_dict(db.execute("SELECT * FROM entries WHERE id = ?", (new_id,)).fetchone()) _audit("add", new_id, old_row=None, new_row=new_row) return redirect(url_for("index")) @app.post("/delete/") def delete(entry_id: int): db = get_db() old = db.execute("SELECT * FROM entries WHERE id = ?", (entry_id,)).fetchone() db.execute("DELETE FROM entries WHERE id = ?", (entry_id,)) db.commit() _audit("delete", entry_id, old_row=_row_to_dict(old), new_row=None) return redirect(url_for("index")) # Minimal edit endpoint (POST-only). Accepts same fields as /add. @app.post("/edit/") def edit(entry_id: int): db = get_db() old = db.execute("SELECT * FROM entries WHERE id = ?", (entry_id,)).fetchone() if not old: _audit("edit", entry_id, old_row=None, new_row=None) return redirect(url_for("index")) kind = (request.form.get("kind") or old["kind"]).strip().lower() payer = (request.form.get("payer") or old["payer"]).strip().upper() total = request.form.get("total", type=float) a_share_pct = request.form.get("a_share_pct", type=float) method = (request.form.get("method") or old["method"]).strip().lower() note = (request.form.get("note") or old["note"] or "").strip() created_at = request.form.get("created_at") or old["created_at"] if total is None: total = float(old["total"]) if payer not in ("A","B"): payer = old["payer"] if kind not in ("bill","transfer"): kind = old["kind"] if kind == "bill": if a_share_pct is None: a_share = float(old["a_share"]) else: a_share = max(0.0, min(100.0, a_share_pct)) / 100.0 else: a_share = float(old["a_share"]) # ignored but stored db.execute(""" UPDATE entries SET created_at = ?, kind = ?, total = ?, payer = ?, a_share = ?, method = ?, note = ? WHERE id = ? """, (created_at, kind, total, payer, a_share, method, note, entry_id)) db.commit() new = db.execute("SELECT * FROM entries WHERE id = ?", (entry_id,)).fetchone() _audit("edit", entry_id, old_row=_row_to_dict(old), new_row=_row_to_dict(new)) return redirect(url_for("index")) @app.get("/export.csv") def export_csv(): db = get_db() rows = db.execute(""" SELECT id, created_at, kind, total, payer, a_share, method, note FROM entries ORDER BY datetime(created_at) DESC, id DESC """).fetchall() buff = io.StringIO() w = csv.writer(buff) w.writerow(["id","created_at","kind","payer","a_share_pct","total","method","note","delta"]) for r in rows: a_share_pct = float(r["a_share"]) * 100.0 delta = _delta_for_entry(r["kind"], r["total"], r["payer"], r["a_share"]) w.writerow([r["id"], r["created_at"], r["kind"], r["payer"], f"{a_share_pct:.2f}", f"{r['total']:.2f}", r["method"], r["note"] or "", f"{delta:.2f}"]) buff.seek(0) return send_file(io.BytesIO(buff.read().encode("utf-8")), mimetype="text/csv", as_attachment=True, download_name="splitbuddy_export.csv") # ----- Audit viewers ----- @app.get("/audit") def audit_html(): db = get_db() # --- filters from query params --- entry_id = request.args.get("entry_id", type=int) action = request.args.get("action", type=str) # add|edit|delete actor = request.args.get("actor", type=str) device = request.args.get("device", type=str) q = request.args.get("q", type=str) # substring search in old/new JSON page = max(1, request.args.get("page", default=1, type=int)) per_page = min(200, request.args.get("per_page", default=50, type=int)) offset = (page - 1) * per_page # --- build WHERE clause dynamically --- wh, params = [], [] if entry_id is not None: wh.append("entry_id = ?"); params.append(entry_id) if action: wh.append("action = ?"); params.append(action) if actor: wh.append("actor LIKE ?"); params.append(f"%{actor}%") if device: wh.append("device = ?"); params.append(device) if q: wh.append("(COALESCE(old_row,'') LIKE ? OR COALESCE(new_row,'') LIKE ? OR COALESCE(user_agent,'') LIKE ?)") params.extend([f"%{q}%", f"%{q}%", f"%{q}%"]) where_sql = ("WHERE " + " AND ".join(wh)) if wh else "" # total count for pagination total = db.execute(f"SELECT COUNT(*) FROM audits {where_sql}", params).fetchone()[0] rows = db.execute(f""" SELECT id, ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row FROM audits {where_sql} ORDER BY datetime(ts) DESC, id DESC LIMIT ? OFFSET ? """, (*params, per_page, offset)).fetchall() # pretty JSON (server-side) to avoid giant lines def pretty(s): if not s: return "" try: obj = json.loads(s) return json.dumps(obj, ensure_ascii=False, indent=2) except Exception: return s # enrich rows for template data = [] for r in rows: d = dict(r) d["old_pretty"] = pretty(d.get("old_row")) d["new_pretty"] = pretty(d.get("new_row")) data.append(d) # distinct devices/actions for filter dropdowns devices = [r[0] for r in db.execute("SELECT DISTINCT device FROM audits WHERE device IS NOT NULL ORDER BY device").fetchall()] actions = ["add", "edit", "delete"] return render_template( "audit.html", rows=data, total=total, page=page, per_page=per_page, pages=max(1, (total + per_page - 1) // per_page), # current filters f_entry_id=entry_id, f_action=action, f_actor=actor, f_device=device, f_q=q, devices=devices, actions=actions ) @app.get("/audit.csv") def audit_csv(): db = get_db() rows = db.execute(""" SELECT id, ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row FROM audits ORDER BY datetime(ts) DESC, id DESC """).fetchall() buff = io.StringIO() w = csv.writer(buff) w.writerow(["id","ts","action","entry_id","actor","ip","device","user_agent","old_row","new_row"]) for r in rows: w.writerow([r["id"], r["ts"], r["action"], r["entry_id"], r["actor"], r["ip"], r["device"], r["user_agent"], r["old_row"] or "", r["new_row"] or ""]) buff.seek(0) return send_file(io.BytesIO(buff.read().encode("utf-8")), mimetype="text/csv", as_attachment=True, download_name="splitbuddy_audit.csv") @app.get("/api/audit") def audit_api(): db = get_db() rows = db.execute(""" SELECT id, ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row FROM audits ORDER BY datetime(ts) DESC, id DESC LIMIT 200 """).fetchall() data = [dict(r) for r in rows] return jsonify(data) # ----- Main ----- if __name__ == "__main__": os.makedirs(os.path.dirname(DB_PATH) or ".", exist_ok=True) with app.app_context(): init_db() app.run(debug=True, port=WEBAPP_PORT)