You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
626 lines
18 KiB
626 lines
18 KiB
import os |
|
import time |
|
import shutil |
|
import platform |
|
from datetime import datetime, timezone |
|
from zoneinfo import ZoneInfo |
|
|
|
from flask import jsonify, redirect, render_template, request |
|
|
|
# Wall-clock timestamp taken when this module is imported; the health payload
# subtracts it from the current time to report application uptime.
APP_START_TS = time.time()
|
|
|
|
|
def _read_meminfo(): |
|
data = {} |
|
try: |
|
with open("/proc/meminfo", "r", encoding="utf-8") as f: |
|
for line in f: |
|
if ":" not in line: |
|
continue |
|
key, val = line.split(":", 1) |
|
data[key.strip()] = val.strip() |
|
except Exception: |
|
pass |
|
return data |
|
|
|
|
|
def _kb_to_mb(kb_value): |
|
try: |
|
return round(int(kb_value) / 1024, 2) |
|
except Exception: |
|
return None |
|
|
|
|
|
def _server_uptime_seconds(): |
|
try: |
|
with open("/proc/uptime", "r", encoding="utf-8") as f: |
|
return int(float(f.read().split()[0])) |
|
except Exception: |
|
return None |
|
|
|
|
|
def _format_duration(seconds): |
|
if seconds is None: |
|
return None |
|
seconds = int(seconds) |
|
days, rem = divmod(seconds, 86400) |
|
hours, rem = divmod(rem, 3600) |
|
minutes, secs = divmod(rem, 60) |
|
return f"{days}d {hours}h {minutes}m {secs}s" |
|
|
|
|
|
def _health_payload(app):
    """Collect host, memory, disk and database health into a JSON-ready dict.

    Args:
        app: Object exposing a ``config`` mapping.  The entry
            ``OTB_HEALTH_DB_CONNECTOR`` is looked up there and, when it is
            callable, invoked to obtain a connection used for a ``SELECT 1``
            probe.

    Returns:
        ``(payload, http_code)``.  ``http_code`` is 200 when the database
        probe succeeded and 503 otherwise; ``payload["status"]`` mirrors that
        as ``"ok"`` / ``"degraded"``.

    Note:
        The load-average, memory and database probes swallow their own
        errors.  The timezone conversion and ``shutil.disk_usage("/")`` are
        not guarded and will propagate any exception they raise.
    """
    now_utc = datetime.now(timezone.utc)
    now_toronto = now_utc.astimezone(ZoneInfo("America/Toronto"))

    load1 = load5 = load15 = None
    try:
        load1, load5, load15 = os.getloadavg()
    except Exception:
        # Load averages are optional; they stay None when unavailable.
        pass

    meminfo = _read_meminfo()
    mem_total_kb = None
    mem_available_kb = None
    mem_used_kb = None
    mem_used_percent = None
    try:
        # Values look like "<number> <unit>"; only the number is used.
        mem_total_kb = int(meminfo.get("MemTotal", "0 kB").split()[0])
        mem_available_kb = int(meminfo.get("MemAvailable", "0 kB").split()[0])
        mem_used_kb = mem_total_kb - mem_available_kb
        if mem_total_kb > 0:
            mem_used_percent = round((mem_used_kb / mem_total_kb) * 100, 2)
    except Exception:
        pass

    disk = shutil.disk_usage("/")

    db_ok = False
    db_error = None
    try:
        connector = app.config.get("OTB_HEALTH_DB_CONNECTOR")
        if callable(connector):
            conn = connector()
            # try/finally guarantees the cursor and the connection are
            # released even when the probe query itself fails.
            try:
                cur = conn.cursor()
                try:
                    cur.execute("SELECT 1")
                    cur.fetchone()
                finally:
                    cur.close()
            finally:
                conn.close()
            db_ok = True
        else:
            db_error = "DB connector not registered"
    except Exception as e:
        db_error = str(e)

    app_uptime = int(time.time() - APP_START_TS)
    server_uptime = _server_uptime_seconds()

    payload = {
        "status": "ok" if db_ok else "degraded",
        "app_name": "otb_billing",
        "hostname": platform.node(),
        "server_time_utc": now_utc.isoformat(),
        "server_time_toronto": now_toronto.isoformat(),
        "app_uptime_seconds": app_uptime,
        "app_uptime_human": _format_duration(app_uptime),
        "server_uptime_seconds": server_uptime,
        "server_uptime_human": _format_duration(server_uptime),
        "load_average": {"1m": load1, "5m": load5, "15m": load15},
        "memory": {
            "total_mb": _kb_to_mb(mem_total_kb) if mem_total_kb is not None else None,
            "available_mb": _kb_to_mb(mem_available_kb) if mem_available_kb is not None else None,
            "used_mb": _kb_to_mb(mem_used_kb) if mem_used_kb is not None else None,
            "used_percent": mem_used_percent,
        },
        "disk_root": {
            "total_gb": round(disk.total / (1024**3), 2),
            "used_gb": round(disk.used / (1024**3), 2),
            "free_gb": round(disk.free / (1024**3), 2),
            "used_percent": round((disk.used / disk.total) * 100, 2) if disk.total else None,
        },
        "database": {"ok": db_ok, "error": db_error},
    }

    http_code = 200 if db_ok else 503
    return payload, http_code
|
|
|
|
|
|
|
|
|
def _wallet_balance_format(raw_value, decimals): |
|
try: |
|
from decimal import Decimal, getcontext |
|
getcontext().prec = 60 |
|
value = Decimal(int(raw_value)) / (Decimal(10) ** int(decimals)) |
|
fixed = f"{value:.8f}".rstrip("0").rstrip(".") |
|
return fixed if fixed else "0" |
|
except Exception: |
|
return "error" |
|
|
|
|
|
def _split_rpc_urls(value, defaults): |
|
urls = [] |
|
|
|
if value: |
|
for item in str(value).replace(",", " ").split(): |
|
item = item.strip() |
|
if item: |
|
urls.append(item) |
|
|
|
for item in defaults: |
|
if item and item not in urls: |
|
urls.append(item) |
|
|
|
return urls |
|
|
|
|
|
def _wallet_rpc_call_one(rpc_url, method, params, timeout=5):
    """POST a single JSON-RPC 2.0 request to ``rpc_url``.

    Returns the ``result`` member of the decoded response.  Raises
    ``RuntimeError`` when the response carries a truthy ``error`` member;
    network, HTTP and JSON decoding errors propagate unchanged.
    """
    import json
    import urllib.request

    envelope = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params,
    }
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "OTB-Billing-Health/1.0",
    }
    http_request = urllib.request.Request(
        rpc_url,
        data=json.dumps(envelope).encode("utf-8"),
        headers=request_headers,
        method="POST",
    )

    with urllib.request.urlopen(http_request, timeout=timeout) as response:
        raw_body = response.read()
    reply = json.loads(raw_body.decode("utf-8"))

    rpc_error = reply.get("error")
    if rpc_error:
        raise RuntimeError(str(rpc_error))
    return reply.get("result")
|
|
|
|
|
def _wallet_rpc_call(rpc_urls, method, params, timeout=5): |
|
last_error = None |
|
|
|
if isinstance(rpc_urls, str): |
|
rpc_urls = [rpc_urls] |
|
|
|
for rpc_url in rpc_urls: |
|
try: |
|
return _wallet_rpc_call_one(rpc_url, method, params, timeout=timeout) |
|
except Exception as exc: |
|
last_error = f"{rpc_url}: {exc}" |
|
|
|
raise RuntimeError(last_error or "No RPC URLs available") |
|
|
|
|
|
def _wallet_native_balance(rpc_urls, wallet_address, decimals=18):
    """Fetch an address's native-coin balance via ``eth_getBalance``.

    The hex result (an empty/missing result is treated as ``"0x0"``) is
    parsed as base 16 and formatted with ``_wallet_balance_format``.
    """
    hex_balance = _wallet_rpc_call(
        rpc_urls,
        "eth_getBalance",
        [wallet_address, "latest"],
    )
    if not hex_balance:
        hex_balance = "0x0"
    raw_units = int(hex_balance, 16)
    return _wallet_balance_format(raw_units, decimals)
|
|
|
|
|
def _wallet_erc20_balance(rpc_urls, token_contract, wallet_address, decimals):
    """Fetch a token balance by calling ``balanceOf(address)`` via ``eth_call``.

    The call data is the selector ``0x70a08231`` followed by the lower-cased
    wallet address, stripped of ``0x`` and left-padded with zeros to 64 hex
    characters.  The hex result (an empty/missing result is treated as
    ``"0x0"``) is formatted with ``_wallet_balance_format``.
    """
    # ERC20 balanceOf(address) selector: 0x70a08231
    owner_word = wallet_address.lower().replace("0x", "").rjust(64, "0")
    call_object = {"to": token_contract, "data": "0x70a08231" + owner_word}

    hex_balance = _wallet_rpc_call(rpc_urls, "eth_call", [call_object, "latest"])
    if not hex_balance:
        hex_balance = "0x0"
    return _wallet_balance_format(int(hex_balance, 16), decimals)
|
|
|
|
|
|
|
|
|
def _wallet_explorer_url(coin, chain, wallet_address): |
|
wallet = str(wallet_address or "").strip() |
|
if not wallet: |
|
return None |
|
|
|
coin = str(coin or "").upper() |
|
chain = str(chain or "").lower() |
|
|
|
# Native/token balances are shown by wallet address on each chain explorer. |
|
if coin == "USDC" or chain == "arbitrum": |
|
return f"https://arbiscan.io/address/{wallet}" |
|
|
|
if coin == "ETH" or chain == "ethereum": |
|
return f"https://etherscan.io/address/{wallet}" |
|
|
|
if coin == "ETHO" or chain == "etho": |
|
return f"https://explorer.ethoprotocol.com/address/{wallet}" |
|
|
|
if coin in ("EGAZ", "ETI") or chain == "etica": |
|
return f"https://explorer.etica-stats.org/address/{wallet}" |
|
|
|
return None |
|
|
|
|
|
def _wallet_balances(wallet_address=None, label=None):
    """Query the balance of every tracked asset for one wallet.

    Args:
        wallet_address: Address to inspect.  When falsy, the
            ``OTB_OPERATIONS_WALLET`` environment variable (or its built-in
            default) is used.
        label: Passed through unchanged into the result.

    Returns:
        ``{"label": ..., "wallet": ..., "assets": [...]}`` where each asset
        row carries ``coin``, ``chain``, ``balance``, ``ok``, ``error`` and
        ``explorer_url``.  A failed lookup leaves ``balance`` as ``"error"``,
        ``ok`` as ``False`` and stores the message (truncated to 220
        characters) in ``error``.

    RPC endpoints per chain come from ``RPC_<CHAIN>_URLS`` (falling back to
    ``RPC_<CHAIN>_URL``) followed by the built-in public defaults.
    """
    import os

    env = os.environ

    wallet = wallet_address or env.get(
        "OTB_OPERATIONS_WALLET",
        "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5",
    )

    arbitrum_rpcs = _split_rpc_urls(
        env.get("RPC_ARBITRUM_URLS") or env.get("RPC_ARBITRUM_URL"),
        [
            "https://arb1.arbitrum.io/rpc",
            "https://arbitrum-one-rpc.publicnode.com",
            "https://arbitrum.drpc.org",
        ],
    )
    ethereum_rpcs = _split_rpc_urls(
        env.get("RPC_ETHEREUM_URLS") or env.get("RPC_ETHEREUM_URL"),
        [
            "https://ethereum-rpc.publicnode.com",
            "https://cloudflare-eth.com",
            "https://eth.llamarpc.com",
        ],
    )
    etho_rpcs = _split_rpc_urls(
        env.get("RPC_ETHO_URLS") or env.get("RPC_ETHO_URL"),
        [
            "https://rpc.ethoprotocol.com",
            "https://rpc4.ethoprotocol.com",
        ],
    )
    etica_rpcs = _split_rpc_urls(
        env.get("RPC_ETICA_URLS") or env.get("RPC_ETICA_URL"),
        [
            "https://rpc.etica-stats.org",
            "https://eticamainnet.eticaprotocol.org",
        ],
    )

    tracked = [
        {
            "coin": "USDC",
            "chain": "Arbitrum",
            "kind": "erc20",
            "decimals": 6,
            "rpc": arbitrum_rpcs,
            "token": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
        },
        {
            "coin": "ETH",
            "chain": "Ethereum",
            "kind": "native",
            "decimals": 18,
            "rpc": ethereum_rpcs,
        },
        {
            "coin": "ETHO",
            "chain": "Etho",
            "kind": "native",
            "decimals": 18,
            "rpc": etho_rpcs,
        },
        {
            "coin": "EGAZ",
            "chain": "Etica",
            "kind": "native",
            "decimals": 18,
            "rpc": etica_rpcs,
        },
        {
            "coin": "ETI",
            "chain": "Etica",
            "kind": "erc20",
            "decimals": 18,
            "rpc": etica_rpcs,
            "token": "0x34c61EA91bAcdA647269d4e310A86b875c09946f",
        },
    ]

    def lookup(entry):
        # Anything that is not "native" is queried as an ERC20 token.
        if entry["kind"] == "native":
            return _wallet_native_balance(entry["rpc"], wallet, entry["decimals"])
        return _wallet_erc20_balance(
            entry["rpc"],
            entry["token"],
            wallet,
            entry["decimals"],
        )

    rows = []
    for entry in tracked:
        row = {
            "coin": entry["coin"],
            "chain": entry["chain"],
            "balance": "error",
            "ok": False,
            "error": None,
            "explorer_url": _wallet_explorer_url(entry["coin"], entry["chain"], wallet),
        }
        try:
            balance = lookup(entry)
        except Exception as exc:
            row["error"] = str(exc)[:220]
        else:
            row["balance"] = balance
            row["ok"] = True
        rows.append(row)

    return {
        "label": label,
        "wallet": wallet,
        "assets": rows,
    }
|
|
|
|
|
|
|
|
|
|
|
def _systemctl_value(args, timeout=3): |
|
import subprocess |
|
|
|
try: |
|
proc = subprocess.run( |
|
["systemctl"] + list(args), |
|
text=True, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
timeout=timeout, |
|
check=False, |
|
) |
|
return { |
|
"ok": proc.returncode == 0, |
|
"code": proc.returncode, |
|
"stdout": (proc.stdout or "").strip(), |
|
"stderr": (proc.stderr or "").strip(), |
|
} |
|
except Exception as exc: |
|
return { |
|
"ok": False, |
|
"code": None, |
|
"stdout": "", |
|
"stderr": str(exc), |
|
} |
|
|
|
|
|
def _try_payment_stats_from_db(app): |
|
stats = { |
|
"available": False, |
|
"error": None, |
|
"pending": 0, |
|
"confirmed_today": 0, |
|
"stale_pending": 0, |
|
} |
|
|
|
conn = None |
|
|
|
try: |
|
from db import get_db_connection |
|
|
|
conn = get_db_connection() |
|
cur = conn.cursor(dictionary=True) |
|
|
|
cur.execute("SHOW COLUMNS FROM payments") |
|
cols = {row["Field"] for row in cur.fetchall()} |
|
|
|
crypto_filters = [] |
|
|
|
if "payment_currency" in cols: |
|
crypto_filters.append("payment_currency IN ('USDC','ETH','ETHO','EGAZ','ETI')") |
|
|
|
if "notes" in cols: |
|
crypto_filters.append("notes LIKE '%portal_crypto_intent:%'") |
|
|
|
crypto_clause = "" |
|
if crypto_filters: |
|
crypto_clause = " AND (" + " OR ".join(crypto_filters) + ")" |
|
|
|
if "payment_status" not in cols: |
|
stats["error"] = "payments.payment_status column not found" |
|
return stats |
|
|
|
cur.execute(f""" |
|
SELECT COUNT(*) AS count_value |
|
FROM payments |
|
WHERE payment_status = 'pending' |
|
{crypto_clause} |
|
""") |
|
stats["pending"] = int((cur.fetchone() or {}).get("count_value") or 0) |
|
|
|
timestamp_candidates = [ |
|
c for c in ("received_at", "updated_at", "created_at") |
|
if c in cols |
|
] |
|
|
|
if timestamp_candidates: |
|
timestamp_expr = "COALESCE(" + ", ".join(timestamp_candidates) + ")" |
|
cur.execute(f""" |
|
SELECT COUNT(*) AS count_value |
|
FROM payments |
|
WHERE payment_status = 'confirmed' |
|
AND DATE({timestamp_expr}) = UTC_DATE() |
|
{crypto_clause} |
|
""") |
|
stats["confirmed_today"] = int((cur.fetchone() or {}).get("count_value") or 0) |
|
else: |
|
stats["confirmed_today"] = 0 |
|
|
|
stale_col = None |
|
for candidate in ("created_at", "first_seen_at", "last_checked_at"): |
|
if candidate in cols: |
|
stale_col = candidate |
|
break |
|
|
|
if stale_col: |
|
cur.execute(f""" |
|
SELECT COUNT(*) AS count_value |
|
FROM payments |
|
WHERE payment_status = 'pending' |
|
AND {stale_col} < (UTC_TIMESTAMP() - INTERVAL 30 MINUTE) |
|
{crypto_clause} |
|
""") |
|
stats["stale_pending"] = int((cur.fetchone() or {}).get("count_value") or 0) |
|
else: |
|
stats["stale_pending"] = 0 |
|
|
|
stats["available"] = True |
|
stats["error"] = None |
|
return stats |
|
|
|
except Exception as exc: |
|
stats["available"] = False |
|
stats["error"] = str(exc)[:220] |
|
return stats |
|
|
|
finally: |
|
try: |
|
if conn: |
|
conn.close() |
|
except Exception: |
|
pass |
|
|
|
|
|
def _parse_systemctl_show(stdout): |
|
parsed = {} |
|
|
|
for line in str(stdout or "").splitlines(): |
|
if "=" not in line: |
|
continue |
|
key, value = line.split("=", 1) |
|
parsed[key.strip()] = value.strip() |
|
|
|
return parsed |
|
|
|
|
|
def _crypto_reconcile_status(app):
    """Summarise the crypto reconciliation systemd units and payment stats.

    Queries ``systemctl`` for the active/enabled state of the timer and the
    service, the timer's ``list-timers`` line and selected ``show``
    properties of the service, then adds the payment counters from
    ``_try_payment_stats_from_db(app)``.  States that produce no output are
    reported as ``"unknown"``.
    """
    service_name = "otb-billing-crypto-reconcile.service"
    timer_name = "otb-billing-crypto-reconcile.timer"

    def unit_state(verb, unit):
        # systemctl prints the state on stdout; empty output means unknown.
        return _systemctl_value([verb, unit])["stdout"] or "unknown"

    timer_active = unit_state("is-active", timer_name)
    timer_enabled = unit_state("is-enabled", timer_name)
    service_active = unit_state("is-active", service_name)
    service_enabled = unit_state("is-enabled", service_name)

    timer_listing = _systemctl_value(
        ["list-timers", "--all", "--no-pager", "--plain", timer_name]
    )["stdout"]

    show_output = _systemctl_value(
        [
            "show",
            service_name,
            "--property=ActiveEnterTimestamp",
            "--property=InactiveEnterTimestamp",
            "--property=ExecMainStatus",
            "--property=Result",
            "--property=NRestarts",
            "--no-pager",
        ]
    )["stdout"]
    details = _parse_systemctl_show(show_output)

    # For a oneshot timer service, InactiveEnterTimestamp is usually the useful
    # "last completed" timestamp. Fall back to ActiveEnterTimestamp if needed.
    completed_at = details.get("InactiveEnterTimestamp") or ""
    if completed_at and completed_at.lower() not in ("n/a", "never"):
        last_run = completed_at
    else:
        last_run = details.get("ActiveEnterTimestamp") or "unknown"

    return {
        "service_name": service_name,
        "timer_name": timer_name,
        "timer_active": timer_active,
        "timer_enabled": timer_enabled,
        "service_active": service_active,
        "service_enabled": service_enabled,
        "timer_line": timer_listing,
        "service_details": show_output,
        "last_run": last_run,
        "last_result": details.get("Result") or "unknown",
        "last_exit_status": details.get("ExecMainStatus") or "unknown",
        "payment_stats": _try_payment_stats_from_db(app),
    }
|
|
|
|
|
def register_health_routes(app):
    """Attach the health endpoints to ``app``.

    Routes registered:
        GET  /health               -- HTML page rendered from ``health.html``.
        POST /health/reconcile-now -- launches the reconciliation worker
                                      script in the background and redirects
                                      back to ``/health`` with a
                                      ``reconcile=`` status flag.
        GET  /health.json          -- the same data as ``/health``, as JSON.

    Both GET routes answer with the HTTP status computed by
    ``_health_payload`` (200 when the database probe passes, 503 otherwise).
    """

    def _collect_health():
        """Build the health dict shared by the HTML and JSON views."""
        health, http_code = _health_payload(app)
        health["operations_balances"] = _wallet_balances(
            os.environ.get("OTB_OPERATIONS_WALLET", "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5"),
            "Operations Bal",
        )
        health["treasury_balances"] = _wallet_balances(
            os.environ.get("OTB_TREASURY_WALLET", "0xbe1fdc8c69f712d62cfcd3bf23f636de1dbd213f"),
            "Treasury Bal",
        )
        # Backward compatible JSON key for anything already reading health.json.
        health["wallet_balances"] = health["operations_balances"]
        health["crypto_reconcile"] = _crypto_reconcile_status(app)
        return health, http_code

    @app.route("/health", methods=["GET"])
    def health_page():
        health, http_code = _collect_health()
        return render_template("health.html", health=health), http_code

    @app.route("/health/reconcile-now", methods=["POST"])
    def health_reconcile_now():
        # NOTE(review): nothing in this handler checks authentication or
        # CSRF; confirm access to this endpoint is restricted elsewhere.
        import subprocess
        import sys
        from pathlib import Path

        base_dir = Path("/home/def/otb_billing")
        worker = base_dir / "scripts" / "crypto_reconciliation_worker.py"
        log_path = base_dir / "logs" / "crypto_reconciliation_worker.log"

        if not worker.exists():
            return redirect("/health?reconcile=missing-worker")

        try:
            log_path.parent.mkdir(parents=True, exist_ok=True)
            with open(log_path, "a", encoding="utf-8") as log_fh:
                log_fh.write("\n[manual health trigger] starting crypto reconciliation worker\n")
                # Flush before the child inherits the handle so the marker
                # is written ahead of the worker's own output.
                log_fh.flush()
                # Detached session: the worker keeps running after this
                # request returns; it is intentionally not waited on.
                subprocess.Popen(
                    [sys.executable, str(worker)],
                    cwd=str(base_dir),
                    stdout=log_fh,
                    stderr=subprocess.STDOUT,
                    start_new_session=True,
                )
            return redirect("/health?reconcile=started")
        except Exception:
            return redirect("/health?reconcile=failed")

    @app.route("/health.json", methods=["GET"])
    def health_json():
        health, http_code = _collect_health()
        return jsonify(health), http_code
|
|
|