import os import time import shutil import platform from datetime import datetime, timezone from zoneinfo import ZoneInfo from flask import jsonify, redirect, render_template, request APP_START_TS = time.time() def _read_meminfo(): data = {} try: with open("/proc/meminfo", "r", encoding="utf-8") as f: for line in f: if ":" not in line: continue key, val = line.split(":", 1) data[key.strip()] = val.strip() except Exception: pass return data def _kb_to_mb(kb_value): try: return round(int(kb_value) / 1024, 2) except Exception: return None def _server_uptime_seconds(): try: with open("/proc/uptime", "r", encoding="utf-8") as f: return int(float(f.read().split()[0])) except Exception: return None def _format_duration(seconds): if seconds is None: return None seconds = int(seconds) days, rem = divmod(seconds, 86400) hours, rem = divmod(rem, 3600) minutes, secs = divmod(rem, 60) return f"{days}d {hours}h {minutes}m {secs}s" def _health_payload(app): now_utc = datetime.now(timezone.utc) now_toronto = now_utc.astimezone(ZoneInfo("America/Toronto")) load1 = load5 = load15 = None try: load1, load5, load15 = os.getloadavg() except Exception: pass meminfo = _read_meminfo() mem_total_kb = None mem_available_kb = None mem_used_kb = None mem_used_percent = None try: mem_total_kb = int(meminfo.get("MemTotal", "0 kB").split()[0]) mem_available_kb = int(meminfo.get("MemAvailable", "0 kB").split()[0]) mem_used_kb = mem_total_kb - mem_available_kb if mem_total_kb > 0: mem_used_percent = round((mem_used_kb / mem_total_kb) * 100, 2) except Exception: pass disk = shutil.disk_usage("/") db_ok = False db_error = None try: connector = app.config.get("OTB_HEALTH_DB_CONNECTOR") if callable(connector): conn = connector() cur = conn.cursor() cur.execute("SELECT 1") cur.fetchone() cur.close() conn.close() db_ok = True else: db_error = "DB connector not registered" except Exception as e: db_error = str(e) app_uptime = int(time.time() - APP_START_TS) server_uptime = _server_uptime_seconds() payload = { "status": "ok" if db_ok else "degraded", "app_name": "otb_billing", "hostname": platform.node(), "server_time_utc": now_utc.isoformat(), "server_time_toronto": now_toronto.isoformat(), "app_uptime_seconds": app_uptime, "app_uptime_human": _format_duration(app_uptime), "server_uptime_seconds": server_uptime, "server_uptime_human": _format_duration(server_uptime), "load_average": {"1m": load1, "5m": load5, "15m": load15}, "memory": { "total_mb": _kb_to_mb(mem_total_kb) if mem_total_kb is not None else None, "available_mb": _kb_to_mb(mem_available_kb) if mem_available_kb is not None else None, "used_mb": _kb_to_mb(mem_used_kb) if mem_used_kb is not None else None, "used_percent": mem_used_percent, }, "disk_root": { "total_gb": round(disk.total / (1024**3), 2), "used_gb": round(disk.used / (1024**3), 2), "free_gb": round(disk.free / (1024**3), 2), "used_percent": round((disk.used / disk.total) * 100, 2) if disk.total else None, }, "database": {"ok": db_ok, "error": db_error}, } http_code = 200 if db_ok else 503 return payload, http_code def _wallet_balance_format(raw_value, decimals): try: from decimal import Decimal, getcontext getcontext().prec = 60 value = Decimal(int(raw_value)) / (Decimal(10) ** int(decimals)) fixed = f"{value:.8f}".rstrip("0").rstrip(".") return fixed if fixed else "0" except Exception: return "error" def _split_rpc_urls(value, defaults): urls = [] if value: for item in str(value).replace(",", " ").split(): item = item.strip() if item: urls.append(item) for item in defaults: if item and item not in urls: urls.append(item) return urls def _wallet_rpc_call_one(rpc_url, method, params, timeout=5): import json import urllib.request payload = json.dumps({ "jsonrpc": "2.0", "id": 1, "method": method, "params": params, }).encode("utf-8") req = urllib.request.Request( rpc_url, data=payload, headers={ "Content-Type": "application/json", "Accept": "application/json", "User-Agent": "OTB-Billing-Health/1.0", }, method="POST", ) with urllib.request.urlopen(req, timeout=timeout) as resp: data = json.loads(resp.read().decode("utf-8")) if data.get("error"): raise RuntimeError(str(data["error"])) return data.get("result") def _wallet_rpc_call(rpc_urls, method, params, timeout=5): last_error = None if isinstance(rpc_urls, str): rpc_urls = [rpc_urls] for rpc_url in rpc_urls: try: return _wallet_rpc_call_one(rpc_url, method, params, timeout=timeout) except Exception as exc: last_error = f"{rpc_url}: {exc}" raise RuntimeError(last_error or "No RPC URLs available") def _wallet_native_balance(rpc_urls, wallet_address, decimals=18): result = _wallet_rpc_call(rpc_urls, "eth_getBalance", [wallet_address, "latest"]) return _wallet_balance_format(int(result or "0x0", 16), decimals) def _wallet_erc20_balance(rpc_urls, token_contract, wallet_address, decimals): # ERC20 balanceOf(address) selector: 0x70a08231 clean_addr = wallet_address.lower().replace("0x", "").rjust(64, "0") data = "0x70a08231" + clean_addr result = _wallet_rpc_call( rpc_urls, "eth_call", [{"to": token_contract, "data": data}, "latest"], ) return _wallet_balance_format(int(result or "0x0", 16), decimals) def _wallet_explorer_url(coin, chain, wallet_address): wallet = str(wallet_address or "").strip() if not wallet: return None coin = str(coin or "").upper() chain = str(chain or "").lower() # Native/token balances are shown by wallet address on each chain explorer. if coin == "USDC" or chain == "arbitrum": return f"https://arbiscan.io/address/{wallet}" if coin == "ETH" or chain == "ethereum": return f"https://etherscan.io/address/{wallet}" if coin == "ETHO" or chain == "etho": return f"https://explorer.ethoprotocol.com/address/{wallet}" if coin in ("EGAZ", "ETI") or chain == "etica": return f"https://explorer.etica-stats.org/address/{wallet}" return None def _wallet_balances(wallet_address=None, label=None): import os wallet = wallet_address or os.environ.get( "OTB_OPERATIONS_WALLET", "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5", ) arbitrum_rpcs = _split_rpc_urls( os.environ.get("RPC_ARBITRUM_URLS") or os.environ.get("RPC_ARBITRUM_URL"), [ "https://arb1.arbitrum.io/rpc", "https://arbitrum-one-rpc.publicnode.com", "https://arbitrum.drpc.org", ], ) ethereum_rpcs = _split_rpc_urls( os.environ.get("RPC_ETHEREUM_URLS") or os.environ.get("RPC_ETHEREUM_URL"), [ "https://ethereum-rpc.publicnode.com", "https://cloudflare-eth.com", "https://eth.llamarpc.com", ], ) etho_rpcs = _split_rpc_urls( os.environ.get("RPC_ETHO_URLS") or os.environ.get("RPC_ETHO_URL"), [ "https://rpc.ethoprotocol.com", "https://rpc4.ethoprotocol.com", ], ) etica_rpcs = _split_rpc_urls( os.environ.get("RPC_ETICA_URLS") or os.environ.get("RPC_ETICA_URL"), [ "https://rpc.etica-stats.org", "https://eticamainnet.eticaprotocol.org", ], ) assets = [ { "coin": "USDC", "chain": "Arbitrum", "kind": "erc20", "decimals": 6, "rpc": arbitrum_rpcs, "token": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831", }, { "coin": "ETH", "chain": "Ethereum", "kind": "native", "decimals": 18, "rpc": ethereum_rpcs, }, { "coin": "ETHO", "chain": "Etho", "kind": "native", "decimals": 18, "rpc": etho_rpcs, }, { "coin": "EGAZ", "chain": "Etica", "kind": "native", "decimals": 18, "rpc": etica_rpcs, }, { "coin": "ETI", "chain": "Etica", "kind": "erc20", "decimals": 18, "rpc": etica_rpcs, "token": "0x34c61EA91bAcdA647269d4e310A86b875c09946f", }, ] rows = [] for asset in assets: row = { "coin": asset["coin"], "chain": asset["chain"], "balance": "error", "ok": False, "error": None, "explorer_url": _wallet_explorer_url(asset["coin"], asset["chain"], wallet), } try: if asset["kind"] == "native": row["balance"] = _wallet_native_balance( asset["rpc"], wallet, asset["decimals"], ) else: row["balance"] = _wallet_erc20_balance( asset["rpc"], asset["token"], wallet, asset["decimals"], ) row["ok"] = True except Exception as exc: row["error"] = str(exc)[:220] rows.append(row) return { "label": label, "wallet": wallet, "assets": rows, } def _systemctl_value(args, timeout=3): import subprocess try: proc = subprocess.run( ["systemctl"] + list(args), text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, check=False, ) return { "ok": proc.returncode == 0, "code": proc.returncode, "stdout": (proc.stdout or "").strip(), "stderr": (proc.stderr or "").strip(), } except Exception as exc: return { "ok": False, "code": None, "stdout": "", "stderr": str(exc), } def _try_payment_stats_from_db(app): stats = { "available": False, "error": None, "pending": 0, "confirmed_today": 0, "stale_pending": 0, } conn = None try: from db import get_db_connection conn = get_db_connection() cur = conn.cursor(dictionary=True) cur.execute("SHOW COLUMNS FROM payments") cols = {row["Field"] for row in cur.fetchall()} crypto_filters = [] if "payment_currency" in cols: crypto_filters.append("payment_currency IN ('USDC','ETH','ETHO','EGAZ','ETI')") if "notes" in cols: crypto_filters.append("notes LIKE '%portal_crypto_intent:%'") crypto_clause = "" if crypto_filters: crypto_clause = " AND (" + " OR ".join(crypto_filters) + ")" if "payment_status" not in cols: stats["error"] = "payments.payment_status column not found" return stats cur.execute(f""" SELECT COUNT(*) AS count_value FROM payments WHERE payment_status = 'pending' {crypto_clause} """) stats["pending"] = int((cur.fetchone() or {}).get("count_value") or 0) timestamp_candidates = [ c for c in ("received_at", "updated_at", "created_at") if c in cols ] if timestamp_candidates: timestamp_expr = "COALESCE(" + ", ".join(timestamp_candidates) + ")" cur.execute(f""" SELECT COUNT(*) AS count_value FROM payments WHERE payment_status = 'confirmed' AND DATE({timestamp_expr}) = UTC_DATE() {crypto_clause} """) stats["confirmed_today"] = int((cur.fetchone() or {}).get("count_value") or 0) else: stats["confirmed_today"] = 0 stale_col = None for candidate in ("created_at", "first_seen_at", "last_checked_at"): if candidate in cols: stale_col = candidate break if stale_col: cur.execute(f""" SELECT COUNT(*) AS count_value FROM payments WHERE payment_status = 'pending' AND {stale_col} < (UTC_TIMESTAMP() - INTERVAL 30 MINUTE) {crypto_clause} """) stats["stale_pending"] = int((cur.fetchone() or {}).get("count_value") or 0) else: stats["stale_pending"] = 0 stats["available"] = True stats["error"] = None return stats except Exception as exc: stats["available"] = False stats["error"] = str(exc)[:220] return stats finally: try: if conn: conn.close() except Exception: pass def _parse_systemctl_show(stdout): parsed = {} for line in str(stdout or "").splitlines(): if "=" not in line: continue key, value = line.split("=", 1) parsed[key.strip()] = value.strip() return parsed def _crypto_reconcile_status(app): service_name = "otb-billing-crypto-reconcile.service" timer_name = "otb-billing-crypto-reconcile.timer" timer_active = _systemctl_value(["is-active", timer_name]) timer_enabled = _systemctl_value(["is-enabled", timer_name]) service_active = _systemctl_value(["is-active", service_name]) service_enabled = _systemctl_value(["is-enabled", service_name]) timer_list = _systemctl_value([ "list-timers", "--all", "--no-pager", "--plain", timer_name, ]) service_show = _systemctl_value([ "show", service_name, "--property=ActiveEnterTimestamp", "--property=InactiveEnterTimestamp", "--property=ExecMainStatus", "--property=Result", "--property=NRestarts", "--no-pager", ]) parsed = _parse_systemctl_show(service_show["stdout"]) active_entered = parsed.get("ActiveEnterTimestamp") or "" inactive_entered = parsed.get("InactiveEnterTimestamp") or "" result = parsed.get("Result") or "unknown" exec_status = parsed.get("ExecMainStatus") or "unknown" # For a oneshot timer service, InactiveEnterTimestamp is usually the useful # "last completed" timestamp. Fall back to ActiveEnterTimestamp if needed. last_run = inactive_entered if not last_run or last_run.lower() in ("n/a", "never"): last_run = active_entered if not last_run: last_run = "unknown" return { "service_name": service_name, "timer_name": timer_name, "timer_active": timer_active["stdout"] or "unknown", "timer_enabled": timer_enabled["stdout"] or "unknown", "service_active": service_active["stdout"] or "unknown", "service_enabled": service_enabled["stdout"] or "unknown", "timer_line": timer_list["stdout"], "service_details": service_show["stdout"], "last_run": last_run, "last_result": result, "last_exit_status": exec_status, "payment_stats": _try_payment_stats_from_db(app), } def register_health_routes(app): @app.route("/health", methods=["GET"]) def health_page(): health, http_code = _health_payload(app) health["operations_balances"] = _wallet_balances( os.environ.get("OTB_OPERATIONS_WALLET", "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5"), "Operations Bal", ) health["treasury_balances"] = _wallet_balances( os.environ.get("OTB_TREASURY_WALLET", "0xbe1fdc8c69f712d62cfcd3bf23f636de1dbd213f"), "Treasury Bal", ) # Backward compatible JSON key for anything already reading health.json. health["wallet_balances"] = health["operations_balances"] health["crypto_reconcile"] = _crypto_reconcile_status(app) return render_template("health.html", health=health), http_code @app.route("/health/reconcile-now", methods=["POST"]) def health_reconcile_now(): import subprocess import sys from pathlib import Path base_dir = Path("/home/def/otb_billing") worker = base_dir / "scripts" / "crypto_reconciliation_worker.py" log_path = base_dir / "logs" / "crypto_reconciliation_worker.log" if not worker.exists(): return redirect("/health?reconcile=missing-worker") try: log_path.parent.mkdir(parents=True, exist_ok=True) with open(log_path, "a", encoding="utf-8") as log_fh: log_fh.write("\\n[manual health trigger] starting crypto reconciliation worker\\n") subprocess.Popen( [sys.executable, str(worker)], cwd=str(base_dir), stdout=log_fh, stderr=subprocess.STDOUT, start_new_session=True, ) return redirect("/health?reconcile=started") except Exception as exc: return redirect("/health?reconcile=failed") @app.route("/health.json", methods=["GET"]) def health_json(): health, http_code = _health_payload(app) health["operations_balances"] = _wallet_balances( os.environ.get("OTB_OPERATIONS_WALLET", "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5"), "Operations Bal", ) health["treasury_balances"] = _wallet_balances( os.environ.get("OTB_TREASURY_WALLET", "0xbe1fdc8c69f712d62cfcd3bf23f636de1dbd213f"), "Treasury Bal", ) # Backward compatible JSON key for anything already reading health.json. health["wallet_balances"] = health["operations_balances"] health["crypto_reconcile"] = _crypto_reconcile_status(app) return jsonify(health), http_code