billing frontend for mariadb. setup as otb_billing for outsidethebox.top accounting. also involved with outsidethedb
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

812 lines
28 KiB

#!/usr/bin/env python3
import os
import sys
import json
import smtplib
import traceback
from decimal import Decimal, InvalidOperation
from datetime import datetime, timedelta, timezone
from email.message import EmailMessage
from pathlib import Path
import mysql.connector
from dotenv import load_dotenv
BASE_DIR = Path("/home/def/otb_billing")
load_dotenv(BASE_DIR / ".env")
LOG_PATH = BASE_DIR / "logs" / "crypto_reconciliation_worker.log"
TZ_UTC = timezone.utc
URGENT_TO = [
"natural_gas_fitter@yahoo.ca",
"support@outsidethebox.top",
]
def log(msg: str) -> None:
stamp = datetime.now(TZ_UTC).isoformat()
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(LOG_PATH, "a", encoding="utf-8") as fh:
fh.write(f"[{stamp}] {msg}\n")
print(msg)
def env(name: str, default: str = "") -> str:
return os.getenv(name, default).strip()
DB_CONFIG = {
"host": env("OTB_BILLING_DB_HOST", "127.0.0.1"),
"port": int(env("OTB_BILLING_DB_PORT", "3306")),
"user": env("OTB_BILLING_DB_USER", "otb_billing"),
"password": env("OTB_BILLING_DB_PASSWORD", ""),
"database": env("OTB_BILLING_DB_NAME", "otb_billing"),
}
SMTP_CONFIG = {
"host": env("SMTP_HOST"),
"port": int(env("SMTP_PORT", "587") or "587"),
"user": env("SMTP_USER"),
"password": env("SMTP_PASS"),
"from_email": env("SMTP_FROM_EMAIL", env("BUSINESS_EMAIL")),
"from_name": env("SMTP_FROM_NAME", env("BUSINESS_NAME", "OTB Billing")),
"use_tls": env("SMTP_USE_TLS", "1") == "1",
"use_ssl": env("SMTP_USE_SSL", "0") == "1",
}
RPC_MAP = {
"ETH": [u for u in [env("RPC_ETH_URL"), env("RPC_ETH_URL_2")] if u],
"ETHO": [u for u in [env("RPC_ETHO_URL", "http://62.72.177.111:7545"), env("RPC_ETHO_URL_2", "http://192.168.0.177:6645")] if u],
"ETI": [u for u in [env("RPC_ETICA_URL"), env("RPC_ETICA_URL_2")] if u],
"USDC": [u for u in [env("RPC_ARB_URL"), env("RPC_ARB_URL_2")] if u],
}
def now_utc():
return datetime.now(TZ_UTC).replace(tzinfo=None)
def d(val) -> Decimal:
try:
return Decimal(str(val or "0"))
except (InvalidOperation, ValueError):
return Decimal("0")
def db():
return mysql.connector.connect(**DB_CONFIG)
def send_email(to_list, subject, body, bcc_list=None):
if not SMTP_CONFIG["host"] or not SMTP_CONFIG["from_email"] or not to_list:
log(f"email skipped: subject={subject!r} to={to_list!r}")
return False
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = f'{SMTP_CONFIG["from_name"]} <{SMTP_CONFIG["from_email"]}>' if SMTP_CONFIG["from_name"] else SMTP_CONFIG["from_email"]
msg["To"] = ", ".join(to_list)
if bcc_list:
msg["Bcc"] = ", ".join(bcc_list)
msg.set_content(body)
if SMTP_CONFIG["use_ssl"]:
with smtplib.SMTP_SSL(SMTP_CONFIG["host"], SMTP_CONFIG["port"], timeout=30) as s:
if SMTP_CONFIG["user"]:
s.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"])
s.send_message(msg)
else:
with smtplib.SMTP(SMTP_CONFIG["host"], SMTP_CONFIG["port"], timeout=30) as s:
if SMTP_CONFIG["use_tls"]:
s.starttls()
if SMTP_CONFIG["user"]:
s.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"])
s.send_message(msg)
return True
def append_note(existing, line):
existing = (existing or "").strip()
if existing:
return existing + "\n" + line
return line
def _rpc_post(rpc_url, method, params):
import json
import urllib.request
payload = json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
}).encode("utf-8")
req = urllib.request.Request(
rpc_url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=20) as resp:
data = json.loads(resp.read().decode("utf-8"))
if isinstance(data, dict) and data.get("error"):
raise RuntimeError(str(data["error"]))
return (data or {}).get("result")
def _chain_meta(symbol):
symbol = str(symbol or "").upper()
mapping = {
"ETH": {
"asset": "ETH",
"network": "ethereum",
"asset_type": "native",
"decimals": 18,
"token_contract": None,
},
"ETHO": {
"asset": "ETHO",
"network": "etho",
"asset_type": "native",
"decimals": 18,
"token_contract": None,
},
"ETI": {
"asset": "ETI",
"network": "etica",
"asset_type": "token",
"decimals": 18,
"token_contract": "0x34c61EA91bAcdA647269d4e310A86b875c09946f",
},
"USDC": {
"asset": "USDC",
"network": "arbitrum",
"asset_type": "token",
"decimals": 6,
"token_contract": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
},
}
return mapping.get(symbol)
def _to_base_units(amount_text, decimals):
amount_dec = Decimal(str(amount_text or "0"))
scale = Decimal(10) ** int(decimals)
return int((amount_dec * scale).quantize(Decimal("1")))
def _strip_0x(value):
return str(value or "").lower().replace("0x", "")
def parse_erc20_transfer_input(input_data):
data = _strip_0x(input_data)
if not data.startswith("a9059cbb"):
return None
if len(data) < 8 + 64 + 64:
return None
to_chunk = data[8:72]
amount_chunk = data[72:136]
to_addr = "0x" + to_chunk[-40:]
amount_int = int(amount_chunk, 16)
return {
"to": to_addr,
"amount": amount_int,
}
def fetch_tx_by_hash(rpc_url, txid):
tx = _rpc_post(rpc_url, "eth_getTransactionByHash", [txid])
if not tx:
return None
receipt = _rpc_post(rpc_url, "eth_getTransactionReceipt", [txid])
confirmed = bool(receipt and receipt.get("blockNumber"))
return {
"tx": tx,
"receipt": receipt,
"confirmed": confirmed,
}
def verify_expected_transaction(row, txid):
symbol = str(row.get("payment_currency") or "").upper()
meta = _chain_meta(symbol)
if not meta:
return None
rpc_urls = RPC_MAP.get(symbol, [])
if not rpc_urls:
return None
wallet_to = str(row.get("wallet_address") or "").lower()
expected_units = _to_base_units(row.get("payment_amount") or "0", meta["decimals"])
for rpc_url in rpc_urls:
try:
found = fetch_tx_by_hash(rpc_url, txid)
if not found:
continue
tx = found["tx"]
if meta["asset_type"] == "native":
tx_to = str(tx.get("to") or "").lower()
tx_value = int(tx.get("value") or "0x0", 16)
if tx_to != wallet_to:
continue
if tx_value != expected_units:
continue
else:
tx_to = str(tx.get("to") or "").lower()
contract = str(meta["token_contract"] or "").lower()
if tx_to != contract:
continue
parsed = parse_erc20_transfer_input(tx.get("input") or "")
if not parsed:
continue
if str(parsed["to"]).lower() != wallet_to:
continue
if int(parsed["amount"]) != expected_units:
continue
return {
"confirmed": found["confirmed"],
"rpc_url": rpc_url,
"asset": meta["asset"],
"network": meta["network"],
"received_amount_cad": row.get("expected_amount_cad") or row.get("cad_value_at_payment"),
"tx": tx,
"receipt": found["receipt"],
}
except Exception:
continue
return None
def find_cross_asset_match(row, txid):
expected_symbol = str(row.get("payment_currency") or "").upper()
wallet_to = str(row.get("wallet_address") or "").lower()
for symbol, rpc_urls in RPC_MAP.items():
if symbol == expected_symbol:
continue
meta = _chain_meta(symbol)
if not meta:
continue
for rpc_url in rpc_urls:
try:
found = fetch_tx_by_hash(rpc_url, txid)
if not found:
continue
tx = found["tx"]
if meta["asset_type"] == "native":
tx_to = str(tx.get("to") or "").lower()
if tx_to != wallet_to:
continue
else:
tx_to = str(tx.get("to") or "").lower()
contract = str(meta["token_contract"] or "").lower()
if tx_to != contract:
continue
parsed = parse_erc20_transfer_input(tx.get("input") or "")
if not parsed:
continue
if str(parsed["to"]).lower() != wallet_to:
continue
return {
"confirmed": found["confirmed"],
"txid": txid,
"asset": meta["asset"],
"network": meta["network"],
"rpc_url": rpc_url,
}
except Exception:
continue
return None
def ensure_ledger_credit(conn, invoice_id, client_id, overpayment_cad, payment_id):
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT id
FROM credit_ledger
WHERE client_id = %s
AND invoice_id = %s
AND notes LIKE %s
ORDER BY id DESC
LIMIT 1
""", (client_id, invoice_id, f"%overpayment payment_id={payment_id}%"))
row = cursor.fetchone()
if row:
return row["id"]
ins = conn.cursor()
ins.execute("""
INSERT INTO credit_ledger
(client_id, invoice_id, credit_amount, credit_type, notes, created_at)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
client_id,
invoice_id,
str(overpayment_cad),
"manual_credit",
f"auto overpayment credit payment_id={payment_id}",
now_utc(),
))
conn.commit()
return ins.lastrowid
def create_event(conn, payment_id, invoice_id, event_type, details):
cur = conn.cursor()
cur.execute("""
INSERT INTO payment_reconciliation_events
(payment_id, invoice_id, event_type, event_status, details)
VALUES (%s, %s, %s, 'open', %s)
""", (payment_id, invoice_id, event_type, json.dumps(details, default=str)))
conn.commit()
def send_client_delay_email(row, milestone):
email = (row.get("client_email") or "").strip()
if not email:
return
invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
txid = row.get("txid") or "(not yet captured)"
asset = row.get("payment_currency") or row.get("payment_asset") or "crypto"
body = (
f"Hello,\n\n"
f"This is a friendly update regarding {invoice_number}.\n\n"
f"Your payment has not fully completed in our system yet. "
f"We are continuing to monitor it and will keep checking for up to 48 hours or until it completes.\n\n"
f"Current checkpoint: {milestone}\n"
f"Recorded transaction ID: {txid}\n"
f"Recorded crypto: {asset}\n\n"
f"If you have it handy, please reply with:\n"
f"- the transaction ID\n"
f"- the sending wallet address (if public)\n"
f"- the crypto/network used\n\n"
f"No action is required right now unless you would like us to verify those details sooner.\n\n"
f"Thank you,\n"
f"OTB Billing"
)
send_email([email], "Payment update for your invoice", body)
def send_client_success_after_alert(row):
email = (row.get("client_email") or "").strip()
if not email:
return
invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
asset = row.get("payment_currency") or row.get("payment_asset") or "crypto"
txid = row.get("txid") or "(not recorded)"
body = (
f"Hello,\n\n"
f"Your payment for {invoice_number} has now been confirmed successfully.\n\n"
f"Crypto: {asset}\n"
f"Transaction ID: {txid}\n\n"
f"Thank you,\n"
f"OTB Billing"
)
send_email([email], "Payment confirmed for your invoice", body)
def send_client_overpayment_email(row, overpayment_cad):
email = (row.get("client_email") or "").strip()
if not email:
return
invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
amount_text = f"{overpayment_cad:.2f} CAD"
body = (
f"Hello,\n\n"
f"Your last invoice {invoice_number} was overpaid by {amount_text}.\n\n"
f"{amount_text} will be credited to your account.\n\n"
f"If this was mistaken and you would like the over payment sent back, "
f"please email support@outsidethebox.top with the subject:\n"
f"Overpayment return request\n\n"
f"Please include any details in the body. As with other payment options, "
f"the overpayment payback would go back to the address or payment source it came from after review.\n\n"
f"Thank you,\n"
f"OTB Billing"
)
send_email([email], "Overpayment credit applied to your account", body, bcc_list=URGENT_TO)
def send_urgent_crosschain(row, detected):
body = (
f"Invoice: {row.get('invoice_number')}\n"
f"Invoice ID: {row.get('invoice_id')}\n"
f"Payment ID: {row.get('payment_id')}\n"
f"Transaction ID: {detected.get('txid') or row.get('txid')}\n"
f"Expected asset: {row.get('payment_currency')}\n"
f"Detected asset: {detected.get('asset')}\n"
f"Expected network: {row.get('payment_network')}\n"
f"Detected network: {detected.get('network')}\n"
)
send_email(URGENT_TO, "URGENT crosschain transaction needs resolve immedately!", body)
def send_urgent_overpayment(row, overpayment_cad, received_cad):
body = (
f"Invoice: {row.get('invoice_number')}\n"
f"Invoice ID: {row.get('invoice_id')}\n"
f"Payment ID: {row.get('payment_id')}\n"
f"Transaction ID: {row.get('txid')}\n"
f"Expected CAD: {d(row.get('expected_amount_cad') or row.get('cad_value_at_payment')):.2f}\n"
f"Received CAD: {received_cad:.2f}\n"
f"Overpayment CAD: {overpayment_cad:.2f}\n"
f"Asset: {row.get('payment_currency')}\n"
f"Network: {row.get('payment_network')}\n"
f"Client: {row.get('client_email') or row.get('company_name') or ''}\n"
)
send_email(URGENT_TO, "URGENT overpayment detected – review required", body)
def mark_invoice_paid_via(conn, invoice_id, method, asset, network):
cur = conn.cursor()
cur.execute("""
UPDATE invoices
SET paid_via_method = %s,
paid_via_asset = %s,
paid_via_network = %s
WHERE id = %s
""", (method, asset, network, invoice_id))
conn.commit()
def recalc_invoice_amount_paid(conn, invoice_id):
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT COALESCE(SUM(cad_value_at_payment), 0) AS total_paid
FROM payments
WHERE invoice_id = %s
AND payment_status = 'confirmed'
""", (invoice_id,))
paid = d((cur.fetchone() or {}).get("total_paid"))
cur.execute("SELECT total_amount FROM invoices WHERE id = %s", (invoice_id,))
inv = cur.fetchone() or {}
total = d(inv.get("total_amount"))
status = "paid" if paid >= total and total > 0 else "pending"
upd = conn.cursor()
upd.execute("""
UPDATE invoices
SET amount_paid = %s,
status = %s,
paid_at = CASE WHEN %s = 'paid' AND paid_at IS NULL THEN %s ELSE paid_at END
WHERE id = %s
""", (str(paid), status, status, now_utc(), invoice_id))
conn.commit()
def mark_payment_confirmed(conn, row, received_cad):
cur = conn.cursor()
notes = append_note(row.get("notes"), f"[reconciliation] confirmed at {now_utc().isoformat()} received_cad={received_cad}")
cur.execute("""
UPDATE payments
SET payment_status = 'confirmed',
review_status = 'resolved',
received_amount_cad = %s,
last_checked_at = %s,
notes = %s
WHERE id = %s
""", (str(received_cad), now_utc(), notes, row["payment_id"]))
conn.commit()
mark_invoice_paid_via(conn, row["invoice_id"], row.get("payment_method") or "crypto", row.get("payment_currency"), row.get("payment_network"))
recalc_invoice_amount_paid(conn, row["invoice_id"])
def fetch_candidates(conn, daily=False):
cur = conn.cursor(dictionary=True)
if daily:
cur.execute("""
SELECT
p.id AS payment_id,
p.invoice_id,
p.client_id,
p.payment_method,
p.payment_currency,
p.payment_network,
p.payment_asset,
p.payment_amount,
p.cad_value_at_payment,
p.expected_amount_cad,
p.received_amount_cad,
p.reference,
p.wallet_address,
p.payment_status,
p.review_status,
p.created_at,
p.updated_at,
p.first_seen_at,
p.last_checked_at,
p.alert_24_sent_at,
p.alert_48_sent_at,
p.success_after_alert_sent_at,
p.urgent_alert_sent_at,
p.overpayment_email_sent_at,
p.txid,
p.notes,
i.invoice_number,
i.status AS invoice_status,
i.total_amount AS invoice_total,
c.company_name,
c.contact_name,
c.email AS client_email
FROM payments p
JOIN invoices i ON i.id = p.invoice_id
LEFT JOIN clients c ON c.id = p.client_id
WHERE (
p.payment_method = 'crypto'
OR UPPER(p.payment_currency) IN ('ETH','ETHO','ETI','USDC')
)
AND p.payment_status IN ('pending', 'failed')
ORDER BY p.created_at ASC
""")
else:
cur.execute("""
SELECT
p.id AS payment_id,
p.invoice_id,
p.client_id,
p.payment_method,
p.payment_currency,
p.payment_network,
p.payment_asset,
p.payment_amount,
p.cad_value_at_payment,
p.expected_amount_cad,
p.received_amount_cad,
p.reference,
p.wallet_address,
p.payment_status,
p.review_status,
p.created_at,
p.updated_at,
p.first_seen_at,
p.last_checked_at,
p.alert_24_sent_at,
p.alert_48_sent_at,
p.success_after_alert_sent_at,
p.urgent_alert_sent_at,
p.overpayment_email_sent_at,
p.txid,
p.notes,
i.invoice_number,
i.status AS invoice_status,
i.total_amount AS invoice_total,
c.company_name,
c.contact_name,
c.email AS client_email
FROM payments p
JOIN invoices i ON i.id = p.invoice_id
LEFT JOIN clients c ON c.id = p.client_id
WHERE (
p.payment_method = 'crypto'
OR UPPER(p.payment_currency) IN ('ETH','ETHO','ETI','USDC')
)
AND p.payment_status IN ('pending', 'failed')
ORDER BY p.created_at ASC
""")
return cur.fetchall()
def process_one(conn, row):
payment_id = row["payment_id"]
invoice_id = row["invoice_id"]
expected_cad = d(row.get("expected_amount_cad") or row.get("cad_value_at_payment") or row.get("invoice_total"))
created_at = row.get("created_at") or now_utc()
age = now_utc() - created_at
cur = conn.cursor()
cur.execute("""
UPDATE payments
SET first_seen_at = COALESCE(first_seen_at, %s),
last_checked_at = %s
WHERE id = %s
""", (created_at, now_utc(), payment_id))
conn.commit()
txid = (row.get("txid") or "").strip()
verified = verify_expected_transaction(row, txid) if txid else None
if txid and not verified and row.get("payment_status") == "failed":
cur.execute("""
UPDATE payments
SET payment_status = 'pending',
review_status = 'awaiting_recheck',
review_notes = CONCAT(
COALESCE(review_notes, ''),
%s
),
last_checked_at = %s
WHERE id = %s
""", (
"\n[rpc recheck] tx present but not confirmed on configured RPC pool; preserving as pending for retry",
now_utc(),
payment_id
))
conn.commit()
if txid and not verified and row.get("payment_status") == "failed":
cur.execute("""
UPDATE payments
SET payment_status = 'pending',
review_status = 'awaiting_recheck',
review_notes = CONCAT(
COALESCE(review_notes, ''),
%s
),
last_checked_at = %s
WHERE id = %s
""", (
"\n[rpc recheck] tx present but not confirmed on configured RPC pool; preserving as pending for retry",
now_utc(),
payment_id
))
conn.commit()
if verified and verified.get("confirmed"):
received_cad = d(verified.get("received_amount_cad") or expected_cad)
detected_asset = (verified.get("asset") or row.get("payment_currency") or "").upper()
detected_network = (verified.get("network") or row.get("payment_network") or "").lower()
expected_asset = (row.get("payment_currency") or "").upper()
expected_network = (row.get("payment_network") or "").lower()
if (detected_asset and expected_asset and detected_asset != expected_asset) or (
detected_network and expected_network and detected_network != expected_network
):
if not row.get("urgent_alert_sent_at"):
send_urgent_crosschain(row, verified)
create_event(conn, payment_id, invoice_id, "crosschain_mismatch", {
"expected_asset": expected_asset,
"detected_asset": detected_asset,
"expected_network": expected_network,
"detected_network": detected_network,
"txid": txid,
})
cur.execute("""
UPDATE payments
SET review_status = 'crosschain_mismatch',
urgent_alert_sent_at = %s
WHERE id = %s
""", (now_utc(), payment_id))
conn.commit()
return "flagged"
mark_payment_confirmed(conn, row, received_cad)
overpayment_cad = received_cad - expected_cad
if overpayment_cad > Decimal("0"):
ensure_ledger_credit(conn, invoice_id, row["client_id"], overpayment_cad, payment_id)
create_event(conn, payment_id, invoice_id, "overpayment", {
"expected_cad": str(expected_cad),
"received_cad": str(received_cad),
"overpayment_cad": str(overpayment_cad),
})
if not row.get("urgent_alert_sent_at"):
send_urgent_overpayment(row, overpayment_cad, received_cad)
cur.execute("""
UPDATE payments
SET urgent_alert_sent_at = %s,
review_status = 'overpayment_pending_review'
WHERE id = %s
""", (now_utc(), payment_id))
conn.commit()
if overpayment_cad > Decimal("1.00") and not row.get("overpayment_email_sent_at"):
send_client_overpayment_email(row, overpayment_cad)
cur.execute("""
UPDATE payments
SET overpayment_email_sent_at = %s
WHERE id = %s
""", (now_utc(), payment_id))
conn.commit()
if (row.get("alert_24_sent_at") or row.get("alert_48_sent_at")) and not row.get("success_after_alert_sent_at"):
send_client_success_after_alert(row)
cur.execute("""
UPDATE payments
SET success_after_alert_sent_at = %s
WHERE id = %s
""", (now_utc(), payment_id))
conn.commit()
return "resolved"
cross = find_cross_asset_match(row, txid)
if cross and not row.get("urgent_alert_sent_at"):
send_urgent_crosschain(row, cross)
create_event(conn, payment_id, invoice_id, "crosschain_mismatch", cross)
cur.execute("""
UPDATE payments
SET review_status = 'crosschain_mismatch',
urgent_alert_sent_at = %s,
last_checked_at = %s
WHERE id = %s
""", (now_utc(), now_utc(), payment_id))
conn.commit()
return "flagged"
if age >= timedelta(hours=48) and not row.get("alert_48_sent_at"):
stamp = now_utc()
send_client_delay_email(row, "48 hours")
cur.execute("""
UPDATE payments
SET alert_48_sent_at = %s,
alert_24_sent_at = COALESCE(alert_24_sent_at, %s),
review_status = 'awaiting_recheck'
WHERE id = %s
""", (stamp, stamp, payment_id))
conn.commit()
return "notified"
if age >= timedelta(hours=24) and not row.get("alert_24_sent_at") and not row.get("alert_48_sent_at"):
send_client_delay_email(row, "24 hours")
cur.execute("""
UPDATE payments
SET alert_24_sent_at = %s,
review_status = 'watching_24h'
WHERE id = %s
""", (now_utc(), payment_id))
conn.commit()
return "notified"
return "scanned"
def main():
run_mode = "daily" if "--daily" in sys.argv else "interval"
conn = db()
ins = conn.cursor()
ins.execute("""
INSERT INTO crypto_reconciliation_runs (run_mode, started_at)
VALUES (%s, %s)
""", (run_mode, now_utc()))
conn.commit()
run_id = ins.lastrowid
scanned = resolved = flagged = 0
try:
rows = fetch_candidates(conn, daily=(run_mode == "daily"))
for row in rows:
scanned += 1
result = process_one(conn, row)
if result == "resolved":
resolved += 1
elif result == "flagged":
flagged += 1
upd = conn.cursor()
upd.execute("""
UPDATE crypto_reconciliation_runs
SET finished_at = %s,
scanned_count = %s,
resolved_count = %s,
flagged_count = %s,
notes = %s
WHERE id = %s
""", (now_utc(), scanned, resolved, flagged, f"mode={run_mode}", run_id))
conn.commit()
log(f"crypto reconciliation complete mode={run_mode} scanned={scanned} resolved={resolved} flagged={flagged}")
except Exception as e:
msg = f"crypto reconciliation error mode={run_mode}: {e}\n{traceback.format_exc()}"
log(msg)
upd = conn.cursor()
upd.execute("""
UPDATE crypto_reconciliation_runs
SET finished_at = %s,
scanned_count = %s,
resolved_count = %s,
flagged_count = %s,
notes = %s
WHERE id = %s
""", (now_utc(), scanned, resolved, flagged, msg[:60000], run_id))
conn.commit()
raise
finally:
conn.close()
if __name__ == "__main__":
main()