You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
812 lines
28 KiB
812 lines
28 KiB
#!/usr/bin/env python3 |
|
import os |
|
import sys |
|
import json |
|
import smtplib |
|
import traceback |
|
from decimal import Decimal, InvalidOperation |
|
from datetime import datetime, timedelta, timezone |
|
from email.message import EmailMessage |
|
from pathlib import Path |
|
|
|
import mysql.connector |
|
from dotenv import load_dotenv |
|
|
|
# Installation root; the .env file and the log directory are resolved from it.
BASE_DIR = Path("/home/def/otb_billing")

# Load .env before any configuration value below is read from the environment.
load_dotenv(BASE_DIR.joinpath(".env"))

# File that log() appends to.
LOG_PATH = BASE_DIR.joinpath("logs", "crypto_reconciliation_worker.log")

TZ_UTC = timezone.utc

# Recipients of urgent operator alerts (also BCC'd on client overpayment notices).
URGENT_TO = [
    "natural_gas_fitter@yahoo.ca",
    "support@outsidethebox.top",
]
|
|
|
def log(msg: str) -> None:
    """Append a UTC-timestamped line to LOG_PATH and echo *msg* to stdout."""
    timestamp = datetime.now(TZ_UTC).isoformat()
    # Create the log directory on demand so the first write cannot fail on it.
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    entry = f"[{timestamp}] {msg}\n"
    with LOG_PATH.open("a", encoding="utf-8") as handle:
        handle.write(entry)
    print(msg)
|
|
|
def env(name: str, default: str = "") -> str:
    """Return environment variable *name* (or *default* when unset), whitespace-stripped."""
    raw_value = os.environ.get(name, default)
    return raw_value.strip()
|
|
|
# MySQL connection settings, read from the environment with local defaults.
DB_CONFIG = {
    "host": env("OTB_BILLING_DB_HOST", "127.0.0.1"),
    # A variable that is set but blank yields "", which int() rejects; fall
    # back to the default port in that case, exactly as the SMTP port does.
    "port": int(env("OTB_BILLING_DB_PORT", "3306") or "3306"),
    "user": env("OTB_BILLING_DB_USER", "otb_billing"),
    "password": env("OTB_BILLING_DB_PASSWORD", ""),
    "database": env("OTB_BILLING_DB_NAME", "otb_billing"),
}

# Outgoing mail settings; send_email() skips sending when host or from_email is empty.
SMTP_CONFIG = {
    "host": env("SMTP_HOST"),
    "port": int(env("SMTP_PORT", "587") or "587"),
    "user": env("SMTP_USER"),
    "password": env("SMTP_PASS"),
    "from_email": env("SMTP_FROM_EMAIL", env("BUSINESS_EMAIL")),
    "from_name": env("SMTP_FROM_NAME", env("BUSINESS_NAME", "OTB Billing")),
    "use_tls": env("SMTP_USE_TLS", "1") == "1",
    "use_ssl": env("SMTP_USE_SSL", "0") == "1",
}

# Per-asset list of JSON-RPC endpoints, tried in order; blank entries are dropped.
RPC_MAP = {
    "ETH": [u for u in [env("RPC_ETH_URL"), env("RPC_ETH_URL_2")] if u],
    "ETHO": [u for u in [env("RPC_ETHO_URL", "http://62.72.177.111:7545"), env("RPC_ETHO_URL_2", "http://192.168.0.177:6645")] if u],
    "ETI": [u for u in [env("RPC_ETICA_URL"), env("RPC_ETICA_URL_2")] if u],
    "USDC": [u for u in [env("RPC_ARB_URL"), env("RPC_ARB_URL_2")] if u],
}
|
|
|
def now_utc():
    """Return the current UTC time as a naive datetime (tzinfo removed)."""
    aware_now = datetime.now(TZ_UTC)
    return aware_now.replace(tzinfo=None)
|
|
|
def d(val) -> Decimal:
    """Coerce *val* to a finite Decimal, falling back to 0.

    Falsy values (None, "", 0) and text that cannot be parsed yield
    ``Decimal("0")``. Non-finite results ("NaN", "Infinity") are also mapped
    to zero: they parse without error but are not valid amounts, and ordering
    comparisons against a NaN Decimal raise InvalidOperation.
    """
    try:
        amount = Decimal(str(val or "0"))
    except (InvalidOperation, ValueError):
        return Decimal("0")
    if not amount.is_finite():
        return Decimal("0")
    return amount
|
|
|
def db():
    """Open and return a new MySQL connection built from DB_CONFIG."""
    connection = mysql.connector.connect(**DB_CONFIG)
    return connection
|
|
|
def send_email(to_list, subject, body, bcc_list=None):
    """Send a plain-text email through the configured SMTP server.

    Args:
        to_list: Recipient addresses for the To header.
        subject: Subject line.
        body: Plain-text message body.
        bcc_list: Optional addresses placed in a Bcc header.

    Returns:
        False (after logging) when the SMTP host, the from address, or the
        recipient list is empty; True once the message has been handed to
        the server. SMTP and socket errors are not caught here.
    """
    from email.utils import formataddr

    host = SMTP_CONFIG["host"]
    sender = SMTP_CONFIG["from_email"]
    if not host or not sender or not to_list:
        log(f"email skipped: subject={subject!r} to={to_list!r}")
        return False

    msg = EmailMessage()
    msg["Subject"] = subject
    # formataddr quotes/escapes the display name, so a name such as
    # "Acme, Inc." is not parsed as two addresses; with an empty name it
    # returns the bare address.
    msg["From"] = formataddr((SMTP_CONFIG["from_name"], sender))
    msg["To"] = ", ".join(to_list)
    if bcc_list:
        msg["Bcc"] = ", ".join(bcc_list)
    msg.set_content(body)

    use_ssl = SMTP_CONFIG["use_ssl"]
    smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
    with smtp_cls(host, SMTP_CONFIG["port"], timeout=30) as s:
        # STARTTLS only applies to the plain connection; SMTP_SSL is already encrypted.
        if not use_ssl and SMTP_CONFIG["use_tls"]:
            s.starttls()
        if SMTP_CONFIG["user"]:
            s.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"])
        s.send_message(msg)
    return True
|
|
|
def append_note(existing, line):
    """Return *existing* (stripped) with *line* added on a new line, or just *line* when *existing* is blank."""
    current = (existing or "").strip()
    return current + "\n" + line if current else line
|
|
|
def _rpc_post(rpc_url, method, params):
    """POST one JSON-RPC 2.0 call to *rpc_url* and return its ``result`` field.

    Raises RuntimeError when the reply is an object carrying a truthy
    ``error``; network and decoding errors propagate unchanged.
    """
    import json
    import urllib.request

    envelope = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
    request = urllib.request.Request(
        rpc_url,
        data=json.dumps(envelope).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    with urllib.request.urlopen(request, timeout=20) as response:
        reply = json.loads(response.read().decode("utf-8"))

    if isinstance(reply, dict) and reply.get("error"):
        raise RuntimeError(str(reply["error"]))

    # A null/empty reply is treated like an object without a result.
    return (reply or {}).get("result")
|
|
|
|
|
def _chain_meta(symbol):
    """Return a fresh metadata dict for a supported asset symbol, or None.

    The lookup is case-insensitive. The dict has the keys ``asset``,
    ``network``, ``asset_type`` ("native" or "token"), ``decimals`` and
    ``token_contract`` (None for native assets).
    """
    wanted = str(symbol or "").upper()
    # Columns: asset, network, asset_type, decimals, token_contract
    table = (
        ("ETH", "ethereum", "native", 18, None),
        ("ETHO", "etho", "native", 18, None),
        ("ETI", "etica", "token", 18, "0x34c61EA91bAcdA647269d4e310A86b875c09946f"),
        ("USDC", "arbitrum", "token", 6, "0xaf88d065e77c8cC2239327C5EDb3A432268e5831"),
    )
    for asset, network, asset_type, decimals, token_contract in table:
        if asset == wanted:
            return {
                "asset": asset,
                "network": network,
                "asset_type": asset_type,
                "decimals": decimals,
                "token_contract": token_contract,
            }
    return None
|
|
|
|
|
def _to_base_units(amount_text, decimals):
    """Convert a decimal amount to an integer number of base units (amount x 10**decimals).

    A falsy amount counts as 0. The scaled value is rounded to a whole number
    with the Decimal context's rounding mode before conversion to int.
    """
    amount = Decimal(str(amount_text or "0"))
    multiplier = Decimal(10) ** int(decimals)
    whole_units = (amount * multiplier).quantize(Decimal("1"))
    return int(whole_units)
|
|
|
|
|
def _strip_0x(value):
    """Lower-case *value* and drop a single leading ``0x`` prefix.

    Only the prefix is removed; a later "0x" inside the text is kept so that
    malformed input is not silently shortened and its offsets shifted.
    None and other falsy values yield "".
    """
    text = str(value or "").lower()
    if text.startswith("0x"):
        text = text[2:]
    return text
|
|
|
|
|
def parse_erc20_transfer_input(input_data):
    """Decode transaction input that starts with the ``a9059cbb`` selector.

    Returns ``{"to": "0x" + <last 40 hex chars of the first 32-byte word>,
    "amount": <second 32-byte word as int>}``, or None when the selector
    differs or the payload is shorter than selector + two words.
    """
    selector_len = 8
    word_len = 64
    payload = _strip_0x(input_data)
    if not payload.startswith("a9059cbb") or len(payload) < selector_len + 2 * word_len:
        return None

    first_word_end = selector_len + word_len
    recipient_word = payload[selector_len:first_word_end]
    amount_word = payload[first_word_end:first_word_end + word_len]

    return {
        # The address occupies the low 20 bytes of the left-padded word.
        "to": "0x" + recipient_word[-40:],
        "amount": int(amount_word, 16),
    }
|
|
|
|
|
def fetch_tx_by_hash(rpc_url, txid):
    """Fetch a transaction and its receipt from one RPC endpoint.

    Returns None when the node does not know the transaction. Otherwise
    returns ``{"tx": ..., "receipt": ..., "confirmed": bool}`` where
    ``confirmed`` is True only if the receipt has a block number AND does not
    report a failed execution (receipt ``status`` equal to 0). A reverted
    transaction is mined and has a block number, but moved no funds, so it
    must not count as a confirmed payment. Receipts without a ``status``
    field are judged on the block number alone.
    """
    tx = _rpc_post(rpc_url, "eth_getTransactionByHash", [txid])
    if not tx:
        return None

    receipt = _rpc_post(rpc_url, "eth_getTransactionReceipt", [txid])
    mined = bool(receipt and receipt.get("blockNumber"))

    reverted = False
    if mined:
        status = receipt.get("status")
        if status is not None:
            try:
                # Hex quantity: "0x1" = success, "0x0" = execution failed.
                reverted = int(str(status), 16) == 0
            except ValueError:
                # Unparseable status: keep the block-number-only verdict.
                reverted = False

    return {
        "tx": tx,
        "receipt": receipt,
        "confirmed": mined and not reverted,
    }
|
|
|
|
|
def verify_expected_transaction(row, txid):
    """Check that *txid* pays exactly the expected amount to the expected wallet.

    The asset is taken from ``row["payment_currency"]``; each RPC endpoint
    configured for it is tried in order. For native assets the transaction's
    ``to`` and ``value`` must match; for tokens the transaction must call the
    token contract with transfer calldata naming the wallet and the amount.

    Returns:
        None when the asset is unsupported, no endpoint is configured, the
        txid/wallet/amount is unusable, or no endpoint yields a matching
        transaction. Otherwise a dict with ``confirmed``, ``rpc_url``,
        ``asset``, ``network``, ``received_amount_cad``, ``tx`` and ``receipt``.
    """
    if not txid:
        return None

    symbol = str(row.get("payment_currency") or "").upper()
    meta = _chain_meta(symbol)
    if not meta:
        return None

    rpc_urls = RPC_MAP.get(symbol, [])
    if not rpc_urls:
        return None

    wallet_to = str(row.get("wallet_address") or "").lower()
    if not wallet_to:
        # Without a wallet nothing can be verified; an empty string would
        # otherwise equal the empty "to" of a contract-creation transaction.
        return None

    try:
        expected_units = _to_base_units(row.get("payment_amount") or "0", meta["decimals"])
    except (InvalidOperation, ValueError, TypeError) as exc:
        # One unparseable amount must not abort the whole run.
        log(f"verify_expected_transaction: bad payment_amount for txid={txid}: {exc!r}")
        return None

    for index, rpc_url in enumerate(rpc_urls):
        try:
            found = fetch_tx_by_hash(rpc_url, txid)
            if not found:
                continue

            tx = found["tx"]
            tx_to = str(tx.get("to") or "").lower()

            if meta["asset_type"] == "native":
                if tx_to != wallet_to:
                    continue
                if int(tx.get("value") or "0x0", 16) != expected_units:
                    continue
            else:
                if tx_to != str(meta["token_contract"] or "").lower():
                    continue
                parsed = parse_erc20_transfer_input(tx.get("input") or "")
                if not parsed:
                    continue
                if str(parsed["to"]).lower() != wallet_to:
                    continue
                if int(parsed["amount"]) != expected_units:
                    continue

            return {
                "confirmed": found["confirmed"],
                "rpc_url": rpc_url,
                "asset": meta["asset"],
                "network": meta["network"],
                "received_amount_cad": row.get("expected_amount_cad") or row.get("cad_value_at_payment"),
                "tx": tx,
                "receipt": found["receipt"],
            }
        except Exception as exc:
            # Best effort: record the failure and fall through to the next
            # endpoint. The URL is not logged because it may embed an API key.
            log(f"verify_expected_transaction: {symbol} rpc #{index} failed for txid={txid}: {exc!r}")
            continue

    return None
|
|
|
|
|
def find_cross_asset_match(row, txid):
    """Look for *txid* on every configured chain other than the expected one.

    A hit is a transaction sent to the row's wallet (native assets) or a token
    transfer to that wallet through the asset's contract; the amount is not
    compared.

    Returns:
        None when *txid* or the wallet address is empty or nothing matches;
        otherwise a dict with ``confirmed``, ``txid``, ``asset``, ``network``
        and ``rpc_url``.
    """
    wallet_to = str(row.get("wallet_address") or "").lower()
    if not txid or not wallet_to:
        # Nothing to look up: avoids one pointless RPC round trip per endpoint
        # and a false match between an empty wallet and an empty "to".
        return None

    expected_symbol = str(row.get("payment_currency") or "").upper()

    for symbol, rpc_urls in RPC_MAP.items():
        if symbol == expected_symbol:
            continue

        meta = _chain_meta(symbol)
        if not meta:
            continue

        for index, rpc_url in enumerate(rpc_urls):
            try:
                found = fetch_tx_by_hash(rpc_url, txid)
                if not found:
                    continue

                tx = found["tx"]
                tx_to = str(tx.get("to") or "").lower()

                if meta["asset_type"] == "native":
                    if tx_to != wallet_to:
                        continue
                else:
                    if tx_to != str(meta["token_contract"] or "").lower():
                        continue
                    parsed = parse_erc20_transfer_input(tx.get("input") or "")
                    if not parsed:
                        continue
                    if str(parsed["to"]).lower() != wallet_to:
                        continue

                return {
                    "confirmed": found["confirmed"],
                    "txid": txid,
                    "asset": meta["asset"],
                    "network": meta["network"],
                    "rpc_url": rpc_url,
                }
            except Exception as exc:
                # Best effort: record the failure and try the next endpoint.
                # The URL is not logged because it may embed an API key.
                log(f"find_cross_asset_match: {symbol} rpc #{index} failed for txid={txid}: {exc!r}")
                continue

    return None
|
|
|
|
|
def ensure_ledger_credit(conn, invoice_id, client_id, overpayment_cad, payment_id):
    """Create the overpayment credit for a payment unless one already exists.

    An existing credit is recognised by the note this function itself writes
    ("auto overpayment credit payment_id=<id>"). The previous lookup searched
    for "overpayment payment_id=<id>", which never occurs in that note, so the
    duplicate check could not succeed.

    Returns:
        The id of the existing or newly inserted credit_ledger row. The
        insert is committed before returning.
    """
    import re

    note = f"auto overpayment credit payment_id={payment_id}"
    # The lookahead stops payment_id=1 from matching a note for payment_id=12.
    exact_marker = re.compile(re.escape(note) + r"(?!\d)")

    cursor = conn.cursor(dictionary=True)
    cursor.execute("""
        SELECT id, notes
        FROM credit_ledger
        WHERE client_id = %s
          AND invoice_id = %s
          AND notes LIKE %s
        ORDER BY id DESC
    """, (client_id, invoice_id, f"%{note}%"))
    # LIKE only narrows the candidates; the exact check happens here.
    for existing in cursor.fetchall():
        if exact_marker.search(existing.get("notes") or ""):
            return existing["id"]

    ins = conn.cursor()
    ins.execute("""
        INSERT INTO credit_ledger
            (client_id, invoice_id, credit_amount, credit_type, notes, created_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        client_id,
        invoice_id,
        str(overpayment_cad),
        "manual_credit",
        note,
        now_utc(),
    ))
    conn.commit()
    return ins.lastrowid
|
|
|
def create_event(conn, payment_id, invoice_id, event_type, details):
    """Insert an 'open' payment_reconciliation_events row and commit.

    *details* is stored as JSON; values json cannot encode natively are
    converted with ``str``.
    """
    insert_sql = """
        INSERT INTO payment_reconciliation_events
            (payment_id, invoice_id, event_type, event_status, details)
        VALUES (%s, %s, %s, 'open', %s)
    """
    writer = conn.cursor()
    details_json = json.dumps(details, default=str)
    writer.execute(insert_sql, (payment_id, invoice_id, event_type, details_json))
    conn.commit()
|
|
|
def send_client_delay_email(row, milestone):
    """Email the client that their payment is still pending at *milestone*.

    Does nothing when the row carries no client email address.
    """
    recipient = (row.get("client_email") or "").strip()
    if not recipient:
        return

    invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
    txid = row.get("txid") or "(not yet captured)"
    asset = row.get("payment_currency") or row.get("payment_asset") or "crypto"

    lines = [
        "Hello,",
        "",
        f"This is a friendly update regarding {invoice_number}.",
        "",
        (
            "Your payment has not fully completed in our system yet. "
            "We are continuing to monitor it and will keep checking for up to 48 hours or until it completes."
        ),
        "",
        f"Current checkpoint: {milestone}",
        f"Recorded transaction ID: {txid}",
        f"Recorded crypto: {asset}",
        "",
        "If you have it handy, please reply with:",
        "- the transaction ID",
        "- the sending wallet address (if public)",
        "- the crypto/network used",
        "",
        "No action is required right now unless you would like us to verify those details sooner.",
        "",
        "Thank you,",
        "OTB Billing",
    ]
    send_email([recipient], "Payment update for your invoice", "\n".join(lines))
|
|
|
def send_client_success_after_alert(row):
    """Email the client that their payment has now been confirmed.

    Does nothing when the row carries no client email address.
    """
    recipient = (row.get("client_email") or "").strip()
    if not recipient:
        return

    invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
    asset = row.get("payment_currency") or row.get("payment_asset") or "crypto"
    txid = row.get("txid") or "(not recorded)"

    lines = [
        "Hello,",
        "",
        f"Your payment for {invoice_number} has now been confirmed successfully.",
        "",
        f"Crypto: {asset}",
        f"Transaction ID: {txid}",
        "",
        "Thank you,",
        "OTB Billing",
    ]
    send_email([recipient], "Payment confirmed for your invoice", "\n".join(lines))
|
|
|
def send_client_overpayment_email(row, overpayment_cad):
    """Tell the client their invoice was overpaid and the surplus will be credited.

    URGENT_TO is BCC'd. Does nothing when the row carries no client email address.
    """
    recipient = (row.get("client_email") or "").strip()
    if not recipient:
        return

    invoice_number = row.get("invoice_number") or f"Invoice #{row.get('invoice_id')}"
    amount_text = f"{overpayment_cad:.2f} CAD"

    lines = [
        "Hello,",
        "",
        f"Your last invoice {invoice_number} was overpaid by {amount_text}.",
        "",
        f"{amount_text} will be credited to your account.",
        "",
        (
            "If this was mistaken and you would like the over payment sent back, "
            "please email support@outsidethebox.top with the subject:"
        ),
        "Overpayment return request",
        "",
        (
            "Please include any details in the body. As with other payment options, "
            "the overpayment payback would go back to the address or payment source it came from after review."
        ),
        "",
        "Thank you,",
        "OTB Billing",
    ]
    send_email(
        [recipient],
        "Overpayment credit applied to your account",
        "\n".join(lines),
        bcc_list=URGENT_TO,
    )
|
|
|
def send_urgent_crosschain(row, detected):
    """Alert URGENT_TO that a payment's transaction was detected on a different asset/network."""
    fields = [
        ("Invoice", row.get("invoice_number")),
        ("Invoice ID", row.get("invoice_id")),
        ("Payment ID", row.get("payment_id")),
        # Prefer the txid reported by the detector; fall back to the stored one.
        ("Transaction ID", detected.get("txid") or row.get("txid")),
        ("Expected asset", row.get("payment_currency")),
        ("Detected asset", detected.get("asset")),
        ("Expected network", row.get("payment_network")),
        ("Detected network", detected.get("network")),
    ]
    body = "".join(f"{label}: {value}\n" for label, value in fields)
    send_email(URGENT_TO, "URGENT crosschain transaction needs resolve immedately!", body)
|
|
|
def send_urgent_overpayment(row, overpayment_cad, received_cad):
    """Alert URGENT_TO about an overpayment, listing expected, received and surplus CAD."""
    fields = [
        ("Invoice", row.get("invoice_number")),
        ("Invoice ID", row.get("invoice_id")),
        ("Payment ID", row.get("payment_id")),
        ("Transaction ID", row.get("txid")),
        ("Expected CAD", f"{d(row.get('expected_amount_cad') or row.get('cad_value_at_payment')):.2f}"),
        ("Received CAD", f"{received_cad:.2f}"),
        ("Overpayment CAD", f"{overpayment_cad:.2f}"),
        ("Asset", row.get("payment_currency")),
        ("Network", row.get("payment_network")),
        ("Client", row.get("client_email") or row.get("company_name") or ""),
    ]
    body = "".join(f"{label}: {value}\n" for label, value in fields)
    send_email(URGENT_TO, "URGENT overpayment detected – review required", body)
|
|
|
def mark_invoice_paid_via(conn, invoice_id, method, asset, network):
    """Store on the invoice which method, asset and network it was paid with, then commit."""
    update_sql = """
        UPDATE invoices
        SET paid_via_method = %s,
            paid_via_asset = %s,
            paid_via_network = %s
        WHERE id = %s
    """
    bindings = (method, asset, network, invoice_id)
    writer = conn.cursor()
    writer.execute(update_sql, bindings)
    conn.commit()
|
|
|
def recalc_invoice_amount_paid(conn, invoice_id):
    """Recompute an invoice's amount_paid and status from its confirmed payments.

    The status becomes "paid" when the confirmed CAD total reaches a positive
    invoice total, otherwise "pending"; paid_at is stamped the first time the
    invoice turns paid. The change is committed.
    """
    reader = conn.cursor(dictionary=True)

    reader.execute("""
        SELECT COALESCE(SUM(cad_value_at_payment), 0) AS total_paid
        FROM payments
        WHERE invoice_id = %s
          AND payment_status = 'confirmed'
    """, (invoice_id,))
    sum_row = reader.fetchone() or {}
    paid = d(sum_row.get("total_paid"))

    reader.execute("SELECT total_amount FROM invoices WHERE id = %s", (invoice_id,))
    invoice_row = reader.fetchone() or {}
    total = d(invoice_row.get("total_amount"))

    if paid >= total and total > 0:
        status = "paid"
    else:
        status = "pending"

    writer = conn.cursor()
    writer.execute("""
        UPDATE invoices
        SET amount_paid = %s,
            status = %s,
            paid_at = CASE WHEN %s = 'paid' AND paid_at IS NULL THEN %s ELSE paid_at END
        WHERE id = %s
    """, (str(paid), status, status, now_utc(), invoice_id))
    conn.commit()
|
|
|
def mark_payment_confirmed(conn, row, received_cad):
    """Mark the payment confirmed/resolved, then refresh its invoice.

    Appends a reconciliation line to the payment notes, stores the received
    CAD amount, commits, records how the invoice was paid and recalculates
    the invoice totals.
    """
    confirmation_line = f"[reconciliation] confirmed at {now_utc().isoformat()} received_cad={received_cad}"
    writer = conn.cursor()
    updated_notes = append_note(row.get("notes"), confirmation_line)
    writer.execute("""
        UPDATE payments
        SET payment_status = 'confirmed',
            review_status = 'resolved',
            received_amount_cad = %s,
            last_checked_at = %s,
            notes = %s
        WHERE id = %s
    """, (str(received_cad), now_utc(), updated_notes, row["payment_id"]))
    conn.commit()

    invoice_id = row["invoice_id"]
    mark_invoice_paid_via(
        conn,
        invoice_id,
        row.get("payment_method") or "crypto",
        row.get("payment_currency"),
        row.get("payment_network"),
    )
    recalc_invoice_amount_paid(conn, invoice_id)
|
|
|
def fetch_candidates(conn, daily=False):
    """Return the crypto payments that still need reconciliation, oldest first.

    A candidate is a payment whose method is 'crypto' or whose currency is one
    of ETH/ETHO/ETI/USDC, with status 'pending' or 'failed'. Each row is a
    dict joining the payment with its invoice and, when present, its client.

    Args:
        conn: Open database connection.
        daily: Accepted for interface compatibility. Both run modes used the
            same query (the two former branches were identical), so the flag
            does not change the result.
    """
    cur = conn.cursor(dictionary=True)
    cur.execute("""
        SELECT
            p.id AS payment_id,
            p.invoice_id,
            p.client_id,
            p.payment_method,
            p.payment_currency,
            p.payment_network,
            p.payment_asset,
            p.payment_amount,
            p.cad_value_at_payment,
            p.expected_amount_cad,
            p.received_amount_cad,
            p.reference,
            p.wallet_address,
            p.payment_status,
            p.review_status,
            p.created_at,
            p.updated_at,
            p.first_seen_at,
            p.last_checked_at,
            p.alert_24_sent_at,
            p.alert_48_sent_at,
            p.success_after_alert_sent_at,
            p.urgent_alert_sent_at,
            p.overpayment_email_sent_at,
            p.txid,
            p.notes,
            i.invoice_number,
            i.status AS invoice_status,
            i.total_amount AS invoice_total,
            c.company_name,
            c.contact_name,
            c.email AS client_email
        FROM payments p
        JOIN invoices i ON i.id = p.invoice_id
        LEFT JOIN clients c ON c.id = p.client_id
        WHERE (
            p.payment_method = 'crypto'
            OR UPPER(p.payment_currency) IN ('ETH','ETHO','ETI','USDC')
        )
          AND p.payment_status IN ('pending', 'failed')
        ORDER BY p.created_at ASC
    """)
    return cur.fetchall()
|
|
|
def process_one(conn, row):
    """Reconcile one candidate payment and report the outcome.

    Steps, in order:
      1. Stamp first_seen_at / last_checked_at.
      2. Verify the recorded txid (if any) against the expected asset.
      3. A 'failed' payment with a txid that could not be verified is put
         back to 'pending' for a later retry (done once; the source held this
         block twice, which appended the review note twice).
      4. A confirmed match is either flagged as an asset/network mismatch or
         marked confirmed, with overpayment credit, alerts and the
         "success after alert" email handled as needed.
      5. Otherwise the txid is searched on the other chains; a hit is flagged.
      6. Otherwise the 48h / 24h client delay notices are sent when due.

    Returns:
        "resolved", "flagged", "notified" or "scanned".
    """
    payment_id = row["payment_id"]
    invoice_id = row["invoice_id"]
    expected_cad = d(
        row.get("expected_amount_cad")
        or row.get("cad_value_at_payment")
        or row.get("invoice_total")
    )
    created_at = row.get("created_at") or now_utc()
    age = now_utc() - created_at

    cur = conn.cursor()
    cur.execute("""
        UPDATE payments
        SET first_seen_at = COALESCE(first_seen_at, %s),
            last_checked_at = %s
        WHERE id = %s
    """, (created_at, now_utc(), payment_id))
    conn.commit()

    txid = (row.get("txid") or "").strip()
    verified = verify_expected_transaction(row, txid) if txid else None

    if txid and not verified and row.get("payment_status") == "failed":
        cur.execute("""
            UPDATE payments
            SET payment_status = 'pending',
                review_status = 'awaiting_recheck',
                review_notes = CONCAT(
                    COALESCE(review_notes, ''),
                    %s
                ),
                last_checked_at = %s
            WHERE id = %s
        """, (
            "\n[rpc recheck] tx present but not confirmed on configured RPC pool; preserving as pending for retry",
            now_utc(),
            payment_id,
        ))
        conn.commit()

    if verified and verified.get("confirmed"):
        received_cad = d(verified.get("received_amount_cad") or expected_cad)
        detected_asset = (verified.get("asset") or row.get("payment_currency") or "").upper()
        detected_network = (verified.get("network") or row.get("payment_network") or "").lower()
        expected_asset = (row.get("payment_currency") or "").upper()
        expected_network = (row.get("payment_network") or "").lower()

        asset_differs = bool(detected_asset and expected_asset and detected_asset != expected_asset)
        network_differs = bool(
            detected_network and expected_network and detected_network != expected_network
        )
        if asset_differs or network_differs:
            # NOTE(review): the source's indentation was lost. Only the email
            # is taken to be guarded by urgent_alert_sent_at; the event, the
            # status update and the "flagged" return apply to every mismatch.
            # Confirm against the original file.
            if not row.get("urgent_alert_sent_at"):
                send_urgent_crosschain(row, verified)
            create_event(conn, payment_id, invoice_id, "crosschain_mismatch", {
                "expected_asset": expected_asset,
                "detected_asset": detected_asset,
                "expected_network": expected_network,
                "detected_network": detected_network,
                "txid": txid,
            })
            cur.execute("""
                UPDATE payments
                SET review_status = 'crosschain_mismatch',
                    urgent_alert_sent_at = %s
                WHERE id = %s
            """, (now_utc(), payment_id))
            conn.commit()
            return "flagged"

        mark_payment_confirmed(conn, row, received_cad)

        overpayment_cad = received_cad - expected_cad
        if overpayment_cad > Decimal("0"):
            ensure_ledger_credit(conn, invoice_id, row["client_id"], overpayment_cad, payment_id)
            create_event(conn, payment_id, invoice_id, "overpayment", {
                "expected_cad": str(expected_cad),
                "received_cad": str(received_cad),
                "overpayment_cad": str(overpayment_cad),
            })
            # NOTE(review): as above, only the email is taken to be guarded;
            # the review-status update is applied to every overpayment.
            # Confirm against the original file.
            if not row.get("urgent_alert_sent_at"):
                send_urgent_overpayment(row, overpayment_cad, received_cad)
            cur.execute("""
                UPDATE payments
                SET urgent_alert_sent_at = %s,
                    review_status = 'overpayment_pending_review'
                WHERE id = %s
            """, (now_utc(), payment_id))
            conn.commit()

            # Clients are only emailed for surpluses above one dollar.
            if overpayment_cad > Decimal("1.00") and not row.get("overpayment_email_sent_at"):
                send_client_overpayment_email(row, overpayment_cad)
                cur.execute("""
                    UPDATE payments
                    SET overpayment_email_sent_at = %s
                    WHERE id = %s
                """, (now_utc(), payment_id))
                conn.commit()

        # Follow up with good news only if a delay notice went out earlier.
        delay_notice_sent = row.get("alert_24_sent_at") or row.get("alert_48_sent_at")
        if delay_notice_sent and not row.get("success_after_alert_sent_at"):
            send_client_success_after_alert(row)
            cur.execute("""
                UPDATE payments
                SET success_after_alert_sent_at = %s
                WHERE id = %s
            """, (now_utc(), payment_id))
            conn.commit()

        return "resolved"

    # Without a txid there is nothing to look up on the other chains.
    cross = find_cross_asset_match(row, txid) if txid else None
    if cross and not row.get("urgent_alert_sent_at"):
        send_urgent_crosschain(row, cross)
        create_event(conn, payment_id, invoice_id, "crosschain_mismatch", cross)
        cur.execute("""
            UPDATE payments
            SET review_status = 'crosschain_mismatch',
                urgent_alert_sent_at = %s,
                last_checked_at = %s
            WHERE id = %s
        """, (now_utc(), now_utc(), payment_id))
        conn.commit()
        return "flagged"

    if age >= timedelta(hours=48) and not row.get("alert_48_sent_at"):
        stamp = now_utc()
        send_client_delay_email(row, "48 hours")
        # Backfill the 24h marker too so the 24h notice is never sent afterwards.
        cur.execute("""
            UPDATE payments
            SET alert_48_sent_at = %s,
                alert_24_sent_at = COALESCE(alert_24_sent_at, %s),
                review_status = 'awaiting_recheck'
            WHERE id = %s
        """, (stamp, stamp, payment_id))
        conn.commit()
        return "notified"

    if age >= timedelta(hours=24) and not row.get("alert_24_sent_at") and not row.get("alert_48_sent_at"):
        send_client_delay_email(row, "24 hours")
        cur.execute("""
            UPDATE payments
            SET alert_24_sent_at = %s,
                review_status = 'watching_24h'
            WHERE id = %s
        """, (now_utc(), payment_id))
        conn.commit()
        return "notified"

    return "scanned"
|
|
|
def _record_run_finish(conn, run_id, scanned, resolved, flagged, notes):
    """Write the final counters and notes onto a crypto_reconciliation_runs row and commit."""
    upd = conn.cursor()
    upd.execute("""
        UPDATE crypto_reconciliation_runs
        SET finished_at = %s,
            scanned_count = %s,
            resolved_count = %s,
            flagged_count = %s,
            notes = %s
        WHERE id = %s
    """, (now_utc(), scanned, resolved, flagged, notes, run_id))
    conn.commit()


def main():
    """Run one reconciliation pass over all candidate payments.

    The mode is "daily" when ``--daily`` is on the command line, otherwise
    "interval". A crypto_reconciliation_runs row is opened at the start and
    closed with the counters at the end. On any error the traceback is
    logged, stored (truncated) in the run's notes, and the error is
    re-raised. The connection is always closed.
    """
    run_mode = "daily" if "--daily" in sys.argv else "interval"
    conn = db()

    ins = conn.cursor()
    ins.execute("""
        INSERT INTO crypto_reconciliation_runs (run_mode, started_at)
        VALUES (%s, %s)
    """, (run_mode, now_utc()))
    conn.commit()
    run_id = ins.lastrowid

    scanned = resolved = flagged = 0

    try:
        rows = fetch_candidates(conn, daily=(run_mode == "daily"))
        for row in rows:
            scanned += 1
            result = process_one(conn, row)
            if result == "resolved":
                resolved += 1
            elif result == "flagged":
                flagged += 1

        _record_run_finish(conn, run_id, scanned, resolved, flagged, f"mode={run_mode}")
        log(f"crypto reconciliation complete mode={run_mode} scanned={scanned} resolved={resolved} flagged={flagged}")
    except Exception as e:
        msg = f"crypto reconciliation error mode={run_mode}: {e}\n{traceback.format_exc()}"
        log(msg)
        try:
            _record_run_finish(conn, run_id, scanned, resolved, flagged, msg[:60000])
        except Exception as record_err:
            # The database may be the thing that broke; do not let this
            # secondary failure replace the original error raised below.
            log(f"crypto reconciliation: could not record failure for run_id={run_id}: {record_err!r}")
        raise
    finally:
        conn.close()
|
|
|
# Script entry point: run one reconciliation pass when executed directly.
if __name__ == "__main__":
    main()
|
|
|