#!/usr/bin/env python3
"""Crypto payment reconciliation worker for OTB Billing.

Scans crypto payments that are still ``pending`` or ``failed``, looks the
recorded transaction hash up on the configured JSON-RPC endpoints and then:

* confirms the payment (and recalculates the invoice) when the expected
  transfer is found and mined,
* flags the payment when the hash instead matches a transfer to the same
  wallet on a different configured asset/network,
* records overpayment credits and notifies staff / the client,
* sends 24h / 48h "still waiting" e-mails to the client.

Run with ``--daily`` to record the run as a daily run; any other invocation
is recorded as an interval run.
"""
import json
import os
import smtplib
import sys
import traceback
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from decimal import Decimal, InvalidOperation
from email.message import EmailMessage
from pathlib import Path

import mysql.connector
from dotenv import load_dotenv

BASE_DIR = Path("/home/def/otb_billing")
load_dotenv(BASE_DIR / ".env")

LOG_PATH = BASE_DIR / "logs" / "crypto_reconciliation_worker.log"
TZ_UTC = timezone.utc

# Staff recipients for urgent alerts (also BCC'd on overpayment e-mails).
URGENT_TO = [
    "natural_gas_fitter@yahoo.ca",
    "support@outsidethebox.top",
]


def log(msg: str) -> None:
    """Append *msg* with a UTC timestamp to the worker log and echo it."""
    stamp = datetime.now(TZ_UTC).isoformat()
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(LOG_PATH, "a", encoding="utf-8") as fh:
        fh.write(f"[{stamp}] {msg}\n")
    print(msg)


def env(name: str, default: str = "") -> str:
    """Return environment variable *name* (or *default*), stripped."""
    return os.getenv(name, default).strip()


DB_CONFIG = {
    "host": env("OTB_BILLING_DB_HOST", "127.0.0.1"),
    # ``or`` guards against the variable being set but empty, which would
    # otherwise make int("") raise at import time (same as the SMTP port).
    "port": int(env("OTB_BILLING_DB_PORT", "3306") or "3306"),
    "user": env("OTB_BILLING_DB_USER", "otb_billing"),
    "password": env("OTB_BILLING_DB_PASSWORD", ""),
    "database": env("OTB_BILLING_DB_NAME", "otb_billing"),
}

SMTP_CONFIG = {
    "host": env("SMTP_HOST"),
    "port": int(env("SMTP_PORT", "587") or "587"),
    "user": env("SMTP_USER"),
    "password": env("SMTP_PASS"),
    "from_email": env("SMTP_FROM_EMAIL", env("BUSINESS_EMAIL")),
    "from_name": env("SMTP_FROM_NAME", env("BUSINESS_NAME", "OTB Billing")),
    "use_tls": env("SMTP_USE_TLS", "1") == "1",
    "use_ssl": env("SMTP_USE_SSL", "0") == "1",
}

# Asset symbol -> ordered list of JSON-RPC endpoints to try (empty ones dropped).
RPC_MAP = {
    "ETH": [u for u in [env("RPC_ETH_URL"), env("RPC_ETH_URL_2")] if u],
    "ETHO": [
        u
        for u in [
            env("RPC_ETHO_URL", "http://62.72.177.111:7545"),
            env("RPC_ETHO_URL_2", "http://192.168.0.177:6645"),
        ]
        if u
    ],
    "ETI": [u for u in [env("RPC_ETICA_URL"), env("RPC_ETICA_URL_2")] if u],
    "USDC": [u for u in [env("RPC_ARB_URL"), env("RPC_ARB_URL_2")] if u],
}

# Static per-asset chain metadata used to interpret transactions.
_CHAIN_META = {
    "ETH": {
        "asset": "ETH",
        "network": "ethereum",
        "asset_type": "native",
        "decimals": 18,
        "token_contract": None,
    },
    "ETHO": {
        "asset": "ETHO",
        "network": "etho",
        "asset_type": "native",
        "decimals": 18,
        "token_contract": None,
    },
    "ETI": {
        "asset": "ETI",
        "network": "etica",
        "asset_type": "token",
        "decimals": 18,
        "token_contract": "0x34c61EA91bAcdA647269d4e310A86b875c09946f",
    },
    "USDC": {
        "asset": "USDC",
        "network": "arbitrum",
        "asset_type": "token",
        "decimals": 6,
        "token_contract": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
    },
}

# Candidate selection shared by both run modes.
_CANDIDATE_SQL = """
    SELECT
        p.id AS payment_id,
        p.invoice_id,
        p.client_id,
        p.payment_method,
        p.payment_currency,
        p.payment_network,
        p.payment_asset,
        p.payment_amount,
        p.cad_value_at_payment,
        p.expected_amount_cad,
        p.received_amount_cad,
        p.reference,
        p.wallet_address,
        p.payment_status,
        p.review_status,
        p.created_at,
        p.updated_at,
        p.first_seen_at,
        p.last_checked_at,
        p.alert_24_sent_at,
        p.alert_48_sent_at,
        p.success_after_alert_sent_at,
        p.urgent_alert_sent_at,
        p.overpayment_email_sent_at,
        p.txid,
        p.notes,
        i.invoice_number,
        i.status AS invoice_status,
        i.total_amount AS invoice_total,
        c.company_name,
        c.contact_name,
        c.email AS client_email
    FROM payments p
    JOIN invoices i ON i.id = p.invoice_id
    LEFT JOIN clients c ON c.id = p.client_id
    WHERE (
        p.payment_method = 'crypto'
        OR UPPER(p.payment_currency) IN ('ETH','ETHO','ETI','USDC')
    )
      AND p.payment_status IN ('pending', 'failed')
    ORDER BY p.created_at ASC
"""

_RECHECK_NOTE = (
    "\n[rpc recheck] tx present but not confirmed on configured RPC pool; "
    "preserving as pending for retry"
)


def now_utc():
    """Return the current UTC time as a naive ``datetime`` (tzinfo removed)."""
    return datetime.now(TZ_UTC).replace(tzinfo=None)


def d(val) -> Decimal:
    """Coerce *val* to ``Decimal``; falsy or unparseable values become 0."""
    try:
        return Decimal(str(val or "0"))
    except (InvalidOperation, ValueError):
        return Decimal("0")


def db():
    """Open a new MySQL connection using ``DB_CONFIG``."""
    return mysql.connector.connect(**DB_CONFIG)


def send_email(to_list, subject, body, bcc_list=None):
    """Send a plain-text e-mail through the configured SMTP server.

    Returns ``False`` (after logging) when SMTP host, sender or recipients
    are missing, ``True`` once the message has been handed to the server.
    SMTP errors propagate to the caller.
    """
    if not SMTP_CONFIG["host"] or not SMTP_CONFIG["from_email"] or not to_list:
        log(f"email skipped: subject={subject!r} to={to_list!r}")
        return False

    msg = EmailMessage()
    msg["Subject"] = subject
    if SMTP_CONFIG["from_name"]:
        msg["From"] = f'{SMTP_CONFIG["from_name"]} <{SMTP_CONFIG["from_email"]}>'
    else:
        msg["From"] = SMTP_CONFIG["from_email"]
    msg["To"] = ", ".join(to_list)
    if bcc_list:
        msg["Bcc"] = ", ".join(bcc_list)
    msg.set_content(body)

    host, port = SMTP_CONFIG["host"], SMTP_CONFIG["port"]
    if SMTP_CONFIG["use_ssl"]:
        with smtplib.SMTP_SSL(host, port, timeout=30) as s:
            if SMTP_CONFIG["user"]:
                s.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"])
            s.send_message(msg)
    else:
        with smtplib.SMTP(host, port, timeout=30) as s:
            if SMTP_CONFIG["use_tls"]:
                s.starttls()
            if SMTP_CONFIG["user"]:
                s.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"])
            s.send_message(msg)
    return True


def append_note(existing, line):
    """Return *existing* notes with *line* appended on a new line."""
    existing = (existing or "").strip()
    if existing:
        return existing + "\n" + line
    return line


def _rpc_label(rpc_url):
    """Return only the host of *rpc_url* for logging.

    RPC URLs may embed API keys in the path or query string, so the full URL
    is deliberately kept out of the log file.
    """
    try:
        return urllib.parse.urlsplit(str(rpc_url)).hostname or "unknown-rpc"
    except ValueError:
        return "unknown-rpc"


def _rpc_post(rpc_url, method, params):
    """POST one JSON-RPC 2.0 request and return its ``result`` field.

    Raises ``RuntimeError`` when the response carries an ``error`` member;
    network and decoding errors propagate unchanged.
    """
    payload = json.dumps({
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params,
    }).encode("utf-8")
    req = urllib.request.Request(
        rpc_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=20) as resp:
        data = json.loads(resp.read().decode("utf-8"))
    if isinstance(data, dict) and data.get("error"):
        raise RuntimeError(str(data["error"]))
    return (data or {}).get("result")


def _chain_meta(symbol):
    """Return the chain metadata dict for *symbol*, or ``None`` if unknown."""
    return _CHAIN_META.get(str(symbol or "").upper())


def _to_base_units(amount_text, decimals):
    """Convert a decimal amount to integer base units (``amount * 10**decimals``).

    Raises ``decimal.InvalidOperation`` when *amount_text* is not a number.
    """
    amount_dec = Decimal(str(amount_text or "0"))
    scale = Decimal(10) ** int(decimals)
    return int((amount_dec * scale).quantize(Decimal("1")))


def _strip_0x(value):
    """Lower-case *value* and drop a leading ``0x`` prefix, if present."""
    text = str(value or "").lower()
    # Only the prefix is removed; stripping every "0x" would silently shift
    # the payload of malformed input.
    return text[2:] if text.startswith("0x") else text


def parse_erc20_transfer_input(input_data):
    """Decode ERC-20 ``transfer(address,uint256)`` call data.

    Returns ``{"to": "0x…", "amount": int}`` or ``None`` when the data does
    not start with selector ``a9059cbb``, is too short, or the amount word is
    not valid hex.
    """
    data = _strip_0x(input_data)
    if not data.startswith("a9059cbb"):
        return None
    if len(data) < 8 + 64 + 64:
        return None

    to_chunk = data[8:72]
    amount_chunk = data[72:136]
    try:
        amount_int = int(amount_chunk, 16)
    except ValueError:
        return None
    return {
        # The address is the low 20 bytes of the 32-byte word.
        "to": "0x" + to_chunk[-40:],
        "amount": amount_int,
    }


def fetch_tx_by_hash(rpc_url, txid):
    """Fetch a transaction and its receipt from *rpc_url*.

    Returns ``None`` when the node does not know the hash, otherwise a dict
    with ``tx``, ``receipt`` and ``confirmed`` (receipt has a block number).
    """
    tx = _rpc_post(rpc_url, "eth_getTransactionByHash", [txid])
    if not tx:
        return None
    receipt = _rpc_post(rpc_url, "eth_getTransactionReceipt", [txid])
    confirmed = bool(receipt and receipt.get("blockNumber"))
    return {
        "tx": tx,
        "receipt": receipt,
        "confirmed": confirmed,
    }


def _tx_pays_wallet(tx, meta, wallet_to, expected_units=None):
    """Return True when *tx* transfers the asset in *meta* to *wallet_to*.

    For native assets the transaction's ``to`` must be the wallet; for tokens
    ``to`` must be the token contract and the decoded ``transfer`` recipient
    must be the wallet. When *expected_units* is given, the transferred amount
    (in base units) must match exactly as well.
    """
    if not wallet_to:
        # An empty wallet would otherwise "match" a tx whose ``to`` is null.
        return False

    tx_to = str(tx.get("to") or "").lower()
    if meta["asset_type"] == "native":
        if tx_to != wallet_to:
            return False
        if expected_units is None:
            return True
        return int(tx.get("value") or "0x0", 16) == expected_units

    contract = str(meta["token_contract"] or "").lower()
    if tx_to != contract:
        return False
    parsed = parse_erc20_transfer_input(tx.get("input") or "")
    if not parsed:
        return False
    if str(parsed["to"]).lower() != wallet_to:
        return False
    if expected_units is None:
        return True
    return int(parsed["amount"]) == expected_units


def verify_expected_transaction(row, txid):
    """Look for *txid* on the RPC endpoints of the payment's own asset.

    Returns a dict describing the match (``confirmed``, ``rpc_url``,
    ``asset``, ``network``, ``received_amount_cad``, ``tx``, ``receipt``)
    when a transaction to the expected wallet with the exact expected amount
    is found, otherwise ``None``. RPC failures are logged and the next
    endpoint is tried.
    """
    symbol = str(row.get("payment_currency") or "").upper()
    meta = _chain_meta(symbol)
    if not meta:
        return None

    rpc_urls = RPC_MAP.get(symbol, [])
    if not rpc_urls:
        return None

    wallet_to = str(row.get("wallet_address") or "").lower()
    try:
        expected_units = _to_base_units(row.get("payment_amount") or "0", meta["decimals"])
    except (InvalidOperation, ValueError) as exc:
        # One bad row must not abort the whole run; treat it as unverifiable.
        log(
            f"verify skipped payment_id={row.get('payment_id')}: "
            f"bad payment_amount={row.get('payment_amount')!r} ({exc})"
        )
        return None

    for rpc_url in rpc_urls:
        try:
            found = fetch_tx_by_hash(rpc_url, txid)
            if not found:
                continue
            tx = found["tx"]
            if not _tx_pays_wallet(tx, meta, wallet_to, expected_units):
                continue
            return {
                "confirmed": found["confirmed"],
                "rpc_url": rpc_url,
                "asset": meta["asset"],
                "network": meta["network"],
                "received_amount_cad": row.get("expected_amount_cad") or row.get("cad_value_at_payment"),
                "tx": tx,
                "receipt": found["receipt"],
            }
        except Exception as exc:
            # Best effort: a broken endpoint must not stop the remaining ones.
            log(f"rpc verify failed symbol={symbol} rpc={_rpc_label(rpc_url)} txid={txid}: {exc}")
            continue

    return None


def find_cross_asset_match(row, txid):
    """Look for *txid* on the endpoints of every asset except the expected one.

    Returns a dict (``confirmed``, ``txid``, ``asset``, ``network``,
    ``rpc_url``) for the first transaction that pays the payment's wallet on
    another configured asset, otherwise ``None``. The amount is not checked.
    """
    expected_symbol = str(row.get("payment_currency") or "").upper()
    wallet_to = str(row.get("wallet_address") or "").lower()

    for symbol, rpc_urls in RPC_MAP.items():
        if symbol == expected_symbol:
            continue
        meta = _chain_meta(symbol)
        if not meta:
            continue
        for rpc_url in rpc_urls:
            try:
                found = fetch_tx_by_hash(rpc_url, txid)
                if not found:
                    continue
                if not _tx_pays_wallet(found["tx"], meta, wallet_to):
                    continue
                return {
                    "confirmed": found["confirmed"],
                    "txid": txid,
                    "asset": meta["asset"],
                    "network": meta["network"],
                    "rpc_url": rpc_url,
                }
            except Exception as exc:
                log(f"rpc cross-check failed symbol={symbol} rpc={_rpc_label(rpc_url)} txid={txid}: {exc}")
                continue
    return None


def _note_has_marker(notes, marker):
    """Return True when *marker* occurs in *notes* not followed by a digit.

    The digit check keeps ``payment_id=12`` from matching ``payment_id=123``.
    """
    text = str(notes or "")
    start = text.find(marker)
    while start != -1:
        end = start + len(marker)
        if not text[end:end + 1].isdigit():
            return True
        start = text.find(marker, end)
    return False


def ensure_ledger_credit(conn, invoice_id, client_id, overpayment_cad, payment_id):
    """Insert an overpayment credit for the payment unless one already exists.

    Idempotency is keyed on the note text this function writes
    (``auto overpayment credit payment_id=<id>``). Returns the id of the
    existing or newly inserted ``credit_ledger`` row.
    """
    note = f"auto overpayment credit payment_id={payment_id}"

    cursor = conn.cursor(dictionary=True)
    cursor.execute("""
        SELECT id, notes
        FROM credit_ledger
        WHERE client_id = %s
          AND invoice_id = %s
          AND notes LIKE %s
        ORDER BY id DESC
    """, (client_id, invoice_id, f"%{note}%"))
    # LIKE is only a coarse filter; confirm the exact marker in Python.
    for existing in cursor.fetchall():
        if _note_has_marker(existing.get("notes"), note):
            return existing["id"]

    ins = conn.cursor()
    ins.execute("""
        INSERT INTO credit_ledger
            (client_id, invoice_id, credit_amount, credit_type, notes, created_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        client_id,
        invoice_id,
        str(overpayment_cad),
        "manual_credit",
        note,
        now_utc(),
    ))
    conn.commit()
    return ins.lastrowid


def create_event(conn, payment_id, invoice_id, event_type, details):
    """Insert an ``open`` reconciliation event with *details* stored as JSON."""
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO payment_reconciliation_events
            (payment_id, invoice_id, event_type, event_status, details)
        VALUES (%s, %s, %s, 'open', %s)
    """, (payment_id, invoice_id, event_type, json.dumps(details, default=str)))
    conn.commit()


def send_client_delay_email(row, milestone):
    """E-mail the client that the payment is still pending at *milestone*.

    Does nothing when the row has no client e-mail address.
    """
    email = (row.get("client_email") or "").strip()
    if not email:
        return

    invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
    txid = row.get("txid") or "(not yet captured)"
    asset = row.get("payment_currency") or row.get("payment_asset") or "crypto"

    body = (
        f"Hello,\n\n"
        f"This is a friendly update regarding {invoice_number}.\n\n"
        f"Your payment has not fully completed in our system yet. "
        f"We are continuing to monitor it and will keep checking for up to 48 hours or until it completes.\n\n"
        f"Current checkpoint: {milestone}\n"
        f"Recorded transaction ID: {txid}\n"
        f"Recorded crypto: {asset}\n\n"
        f"If you have it handy, please reply with:\n"
        f"- the transaction ID\n"
        f"- the sending wallet address (if public)\n"
        f"- the crypto/network used\n\n"
        f"No action is required right now unless you would like us to verify those details sooner.\n\n"
        f"Thank you,\n"
        f"OTB Billing"
    )
    send_email([email], "Payment update for your invoice", body)


def send_client_success_after_alert(row):
    """E-mail the client that a previously delayed payment is now confirmed.

    Does nothing when the row has no client e-mail address.
    """
    email = (row.get("client_email") or "").strip()
    if not email:
        return

    invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
    asset = row.get("payment_currency") or row.get("payment_asset") or "crypto"
    txid = row.get("txid") or "(not recorded)"

    body = (
        f"Hello,\n\n"
        f"Your payment for {invoice_number} has now been confirmed successfully.\n\n"
        f"Crypto: {asset}\n"
        f"Transaction ID: {txid}\n\n"
        f"Thank you,\n"
        f"OTB Billing"
    )
    send_email([email], "Payment confirmed for your invoice", body)


def send_client_overpayment_email(row, overpayment_cad):
    """E-mail the client about an overpayment credit (staff are BCC'd).

    Does nothing when the row has no client e-mail address.
    """
    email = (row.get("client_email") or "").strip()
    if not email:
        return

    invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
    amount_text = f"{overpayment_cad:.2f} CAD"

    body = (
        f"Hello,\n\n"
        f"Your last invoice {invoice_number} was overpaid by {amount_text}.\n\n"
        f"{amount_text} will be credited to your account.\n\n"
        f"If this was mistaken and you would like the over payment sent back, "
        f"please email support@outsidethebox.top with the subject:\n"
        f"Overpayment return request\n\n"
        f"Please include any details in the body. As with other payment options, "
        f"the overpayment payback would go back to the address or payment source it came from after review.\n\n"
        f"Thank you,\n"
        f"OTB Billing"
    )
    send_email([email], "Overpayment credit applied to your account", body, bcc_list=URGENT_TO)


def send_urgent_crosschain(row, detected):
    """Alert staff that a payment's tx was found on an unexpected asset/network."""
    body = (
        f"Invoice: {row.get('invoice_number')}\n"
        f"Invoice ID: {row.get('invoice_id')}\n"
        f"Payment ID: {row.get('payment_id')}\n"
        f"Transaction ID: {detected.get('txid') or row.get('txid')}\n"
        f"Expected asset: {row.get('payment_currency')}\n"
        f"Detected asset: {detected.get('asset')}\n"
        f"Expected network: {row.get('payment_network')}\n"
        f"Detected network: {detected.get('network')}\n"
    )
    # Subject kept byte-for-byte: downstream mail filters may match on it.
    send_email(URGENT_TO, "URGENT crosschain transaction needs resolve immedately!", body)


def send_urgent_overpayment(row, overpayment_cad, received_cad):
    """Alert staff that a confirmed payment exceeded the expected CAD amount."""
    body = (
        f"Invoice: {row.get('invoice_number')}\n"
        f"Invoice ID: {row.get('invoice_id')}\n"
        f"Payment ID: {row.get('payment_id')}\n"
        f"Transaction ID: {row.get('txid')}\n"
        f"Expected CAD: {d(row.get('expected_amount_cad') or row.get('cad_value_at_payment')):.2f}\n"
        f"Received CAD: {received_cad:.2f}\n"
        f"Overpayment CAD: {overpayment_cad:.2f}\n"
        f"Asset: {row.get('payment_currency')}\n"
        f"Network: {row.get('payment_network')}\n"
        f"Client: {row.get('client_email') or row.get('company_name') or ''}\n"
    )
    send_email(URGENT_TO, "URGENT overpayment detected – review required", body)


def mark_invoice_paid_via(conn, invoice_id, method, asset, network):
    """Record on the invoice which method/asset/network it was paid with."""
    cur = conn.cursor()
    cur.execute("""
        UPDATE invoices
        SET paid_via_method = %s,
            paid_via_asset = %s,
            paid_via_network = %s
        WHERE id = %s
    """, (method, asset, network, invoice_id))
    conn.commit()


def recalc_invoice_amount_paid(conn, invoice_id):
    """Recompute ``amount_paid``/``status`` of an invoice from confirmed payments.

    The invoice becomes ``paid`` when the confirmed CAD total reaches a
    positive ``total_amount``; otherwise it is set to ``pending``. ``paid_at``
    is stamped only the first time the invoice turns ``paid``.
    """
    cur = conn.cursor(dictionary=True)
    cur.execute("""
        SELECT COALESCE(SUM(cad_value_at_payment), 0) AS total_paid
        FROM payments
        WHERE invoice_id = %s
          AND payment_status = 'confirmed'
    """, (invoice_id,))
    paid = d((cur.fetchone() or {}).get("total_paid"))

    cur.execute("SELECT total_amount FROM invoices WHERE id = %s", (invoice_id,))
    inv = cur.fetchone() or {}
    total = d(inv.get("total_amount"))

    status = "paid" if paid >= total and total > 0 else "pending"

    upd = conn.cursor()
    upd.execute("""
        UPDATE invoices
        SET amount_paid = %s,
            status = %s,
            paid_at = CASE WHEN %s = 'paid' AND paid_at IS NULL THEN %s ELSE paid_at END
        WHERE id = %s
    """, (str(paid), status, status, now_utc(), invoice_id))
    conn.commit()


def mark_payment_confirmed(conn, row, received_cad):
    """Mark the payment confirmed/resolved and refresh its invoice totals."""
    cur = conn.cursor()
    notes = append_note(
        row.get("notes"),
        f"[reconciliation] confirmed at {now_utc().isoformat()} received_cad={received_cad}",
    )
    cur.execute("""
        UPDATE payments
        SET payment_status = 'confirmed',
            review_status = 'resolved',
            received_amount_cad = %s,
            last_checked_at = %s,
            notes = %s
        WHERE id = %s
    """, (str(received_cad), now_utc(), notes, row["payment_id"]))
    conn.commit()

    mark_invoice_paid_via(
        conn,
        row["invoice_id"],
        row.get("payment_method") or "crypto",
        row.get("payment_currency"),
        row.get("payment_network"),
    )
    recalc_invoice_amount_paid(conn, row["invoice_id"])


def fetch_candidates(conn, daily=False):
    """Return pending/failed crypto payments joined with invoice and client.

    *daily* is accepted for interface compatibility; both run modes currently
    use the same selection.
    """
    cur = conn.cursor(dictionary=True)
    cur.execute(_CANDIDATE_SQL)
    return cur.fetchall()


def _requeue_failed_payment(conn, payment_id):
    """Move a failed payment whose tx could not be verified back to pending."""
    cur = conn.cursor()
    cur.execute("""
        UPDATE payments
        SET payment_status = 'pending',
            review_status = 'awaiting_recheck',
            review_notes = CONCAT(COALESCE(review_notes, ''), %s),
            last_checked_at = %s
        WHERE id = %s
    """, (_RECHECK_NOTE, now_utc(), payment_id))
    conn.commit()


def _settle_confirmed_payment(conn, row, verified, expected_cad, txid):
    """Handle a verified, mined transaction; return ``"flagged"`` or ``"resolved"``."""
    payment_id = row["payment_id"]
    invoice_id = row["invoice_id"]
    cur = conn.cursor()

    received_cad = d(verified.get("received_amount_cad") or expected_cad)
    detected_asset = (verified.get("asset") or row.get("payment_currency") or "").upper()
    detected_network = (verified.get("network") or row.get("payment_network") or "").lower()
    expected_asset = (row.get("payment_currency") or "").upper()
    expected_network = (row.get("payment_network") or "").lower()

    asset_mismatch = bool(detected_asset and expected_asset and detected_asset != expected_asset)
    network_mismatch = bool(
        detected_network and expected_network and detected_network != expected_network
    )
    if asset_mismatch or network_mismatch:
        # Never confirm on a mismatch; only the e-mail is de-duplicated.
        if not row.get("urgent_alert_sent_at"):
            send_urgent_crosschain(row, verified)
        create_event(conn, payment_id, invoice_id, "crosschain_mismatch", {
            "expected_asset": expected_asset,
            "detected_asset": detected_asset,
            "expected_network": expected_network,
            "detected_network": detected_network,
            "txid": txid,
        })
        cur.execute("""
            UPDATE payments
            SET review_status = 'crosschain_mismatch',
                urgent_alert_sent_at = %s
            WHERE id = %s
        """, (now_utc(), payment_id))
        conn.commit()
        return "flagged"

    mark_payment_confirmed(conn, row, received_cad)

    overpayment_cad = received_cad - expected_cad
    if overpayment_cad > Decimal("0"):
        ensure_ledger_credit(conn, invoice_id, row["client_id"], overpayment_cad, payment_id)
        create_event(conn, payment_id, invoice_id, "overpayment", {
            "expected_cad": str(expected_cad),
            "received_cad": str(received_cad),
            "overpayment_cad": str(overpayment_cad),
        })
        if not row.get("urgent_alert_sent_at"):
            send_urgent_overpayment(row, overpayment_cad, received_cad)
            cur.execute("""
                UPDATE payments
                SET urgent_alert_sent_at = %s,
                    review_status = 'overpayment_pending_review'
                WHERE id = %s
            """, (now_utc(), payment_id))
            conn.commit()
        # Clients are only e-mailed for overpayments above 1.00 CAD.
        if overpayment_cad > Decimal("1.00") and not row.get("overpayment_email_sent_at"):
            send_client_overpayment_email(row, overpayment_cad)
            cur.execute("""
                UPDATE payments
                SET overpayment_email_sent_at = %s
                WHERE id = %s
            """, (now_utc(), payment_id))
            conn.commit()

    was_alerted = row.get("alert_24_sent_at") or row.get("alert_48_sent_at")
    if was_alerted and not row.get("success_after_alert_sent_at"):
        send_client_success_after_alert(row)
        cur.execute("""
            UPDATE payments
            SET success_after_alert_sent_at = %s
            WHERE id = %s
        """, (now_utc(), payment_id))
        conn.commit()

    return "resolved"


def _send_delay_alert_if_due(conn, row, age):
    """Send the 48h or 24h delay e-mail when due; return ``"notified"`` or ``None``."""
    payment_id = row["payment_id"]
    cur = conn.cursor()

    if age >= timedelta(hours=48) and not row.get("alert_48_sent_at"):
        stamp = now_utc()
        send_client_delay_email(row, "48 hours")
        cur.execute("""
            UPDATE payments
            SET alert_48_sent_at = %s,
                alert_24_sent_at = COALESCE(alert_24_sent_at, %s),
                review_status = 'awaiting_recheck'
            WHERE id = %s
        """, (stamp, stamp, payment_id))
        conn.commit()
        return "notified"

    if (
        age >= timedelta(hours=24)
        and not row.get("alert_24_sent_at")
        and not row.get("alert_48_sent_at")
    ):
        send_client_delay_email(row, "24 hours")
        cur.execute("""
            UPDATE payments
            SET alert_24_sent_at = %s,
                review_status = 'watching_24h'
            WHERE id = %s
        """, (now_utc(), payment_id))
        conn.commit()
        return "notified"

    return None


def process_one(conn, row):
    """Reconcile a single candidate payment row.

    Returns one of ``"resolved"``, ``"flagged"``, ``"notified"`` or
    ``"scanned"`` describing what happened to the payment.
    """
    payment_id = row["payment_id"]
    invoice_id = row["invoice_id"]
    expected_cad = d(
        row.get("expected_amount_cad")
        or row.get("cad_value_at_payment")
        or row.get("invoice_total")
    )
    created_at = row.get("created_at") or now_utc()
    age = now_utc() - created_at

    cur = conn.cursor()
    cur.execute("""
        UPDATE payments
        SET first_seen_at = COALESCE(first_seen_at, %s),
            last_checked_at = %s
        WHERE id = %s
    """, (created_at, now_utc(), payment_id))
    conn.commit()

    txid = (row.get("txid") or "").strip()
    verified = verify_expected_transaction(row, txid) if txid else None

    if txid and not verified and row.get("payment_status") == "failed":
        # Done once per run so the recheck note is not appended twice.
        _requeue_failed_payment(conn, payment_id)

    if verified and verified.get("confirmed"):
        return _settle_confirmed_payment(conn, row, verified, expected_cad, txid)

    # Without a txid there is nothing to look up on the other chains.
    cross = find_cross_asset_match(row, txid) if txid else None
    if cross and not row.get("urgent_alert_sent_at"):
        send_urgent_crosschain(row, cross)
        create_event(conn, payment_id, invoice_id, "crosschain_mismatch", cross)
        cur.execute("""
            UPDATE payments
            SET review_status = 'crosschain_mismatch',
                urgent_alert_sent_at = %s,
                last_checked_at = %s
            WHERE id = %s
        """, (now_utc(), now_utc(), payment_id))
        conn.commit()
        return "flagged"

    return _send_delay_alert_if_due(conn, row, age) or "scanned"


def _finish_run(conn, run_id, scanned, resolved, flagged, notes):
    """Write the final counters and notes for a reconciliation run."""
    upd = conn.cursor()
    upd.execute("""
        UPDATE crypto_reconciliation_runs
        SET finished_at = %s,
            scanned_count = %s,
            resolved_count = %s,
            flagged_count = %s,
            notes = %s
        WHERE id = %s
    """, (now_utc(), scanned, resolved, flagged, notes, run_id))
    conn.commit()


def main():
    """Run one reconciliation pass and record it in ``crypto_reconciliation_runs``.

    Any exception during the pass is logged, stored on the run row and
    re-raised so the process exits non-zero.
    """
    run_mode = "daily" if "--daily" in sys.argv else "interval"
    conn = db()
    try:
        ins = conn.cursor()
        ins.execute("""
            INSERT INTO crypto_reconciliation_runs (run_mode, started_at)
            VALUES (%s, %s)
        """, (run_mode, now_utc()))
        conn.commit()
        run_id = ins.lastrowid

        scanned = resolved = flagged = 0
        try:
            rows = fetch_candidates(conn, daily=(run_mode == "daily"))
            for row in rows:
                scanned += 1
                result = process_one(conn, row)
                if result == "resolved":
                    resolved += 1
                elif result == "flagged":
                    flagged += 1

            _finish_run(conn, run_id, scanned, resolved, flagged, f"mode={run_mode}")
            log(
                f"crypto reconciliation complete mode={run_mode} "
                f"scanned={scanned} resolved={resolved} flagged={flagged}"
            )
        except Exception as e:
            msg = f"crypto reconciliation error mode={run_mode}: {e}\n{traceback.format_exc()}"
            log(msg)
            try:
                _finish_run(conn, run_id, scanned, resolved, flagged, msg[:60000])
            except Exception as record_exc:
                # Keep the original failure as the one that propagates.
                log(f"could not record failed run id={run_id}: {record_exc}")
            raise
    finally:
        conn.close()


if __name__ == "__main__":
    main()