Billing frontend for MariaDB. Set up as otb_billing for outsidethebox.top accounting; also used with outsidethedb.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

626 lines
18 KiB

import os
import time
import shutil
import platform
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from flask import jsonify, redirect, render_template, request
# Wall-clock time captured when this module is imported; the health payload
# subtracts it from the current time to report application uptime.
APP_START_TS = time.time()
def _read_meminfo():
data = {}
try:
with open("/proc/meminfo", "r", encoding="utf-8") as f:
for line in f:
if ":" not in line:
continue
key, val = line.split(":", 1)
data[key.strip()] = val.strip()
except Exception:
pass
return data
def _kb_to_mb(kb_value):
try:
return round(int(kb_value) / 1024, 2)
except Exception:
return None
def _server_uptime_seconds():
try:
with open("/proc/uptime", "r", encoding="utf-8") as f:
return int(float(f.read().split()[0]))
except Exception:
return None
def _format_duration(seconds):
if seconds is None:
return None
seconds = int(seconds)
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, secs = divmod(rem, 60)
return f"{days}d {hours}h {minutes}m {secs}s"
def _health_payload(app):
    """Collect host, memory, disk and database health into one dict.

    Args:
        app: Application object whose ``config`` mapping may hold, under
            ``OTB_HEALTH_DB_CONNECTOR``, a zero-argument callable returning a
            connection with ``cursor()`` and ``close()``.

    Returns:
        tuple: ``(payload, http_code)``. ``payload["status"]`` is ``"ok"`` and
        ``http_code`` is 200 when the ``SELECT 1`` probe succeeded; otherwise
        they are ``"degraded"`` and 503. Individual metrics that cannot be
        read are reported as ``None`` rather than raising.
    """
    now_utc = datetime.now(timezone.utc)
    now_toronto = now_utc.astimezone(ZoneInfo("America/Toronto"))

    # Load average is unavailable on some platforms; report None there.
    load1 = load5 = load15 = None
    try:
        load1, load5, load15 = os.getloadavg()
    except (AttributeError, OSError):
        pass

    meminfo = _read_meminfo()
    mem_total_kb = None
    mem_available_kb = None
    mem_used_kb = None
    mem_used_percent = None
    try:
        mem_total_kb = int(meminfo.get("MemTotal", "0 kB").split()[0])
        mem_available_kb = int(meminfo.get("MemAvailable", "0 kB").split()[0])
        mem_used_kb = mem_total_kb - mem_available_kb
        if mem_total_kb > 0:
            mem_used_percent = round((mem_used_kb / mem_total_kb) * 100, 2)
    except (ValueError, IndexError):
        # Malformed meminfo values: keep whatever was parsed so far.
        pass

    # A failing disk probe must not take the whole health endpoint down.
    disk_root = {
        "total_gb": None,
        "used_gb": None,
        "free_gb": None,
        "used_percent": None,
    }
    try:
        disk = shutil.disk_usage("/")
    except OSError:
        disk = None
    if disk is not None:
        gib = 1024**3
        disk_root = {
            "total_gb": round(disk.total / gib, 2),
            "used_gb": round(disk.used / gib, 2),
            "free_gb": round(disk.free / gib, 2),
            "used_percent": round((disk.used / disk.total) * 100, 2) if disk.total else None,
        }

    db_ok = False
    db_error = None
    conn = None
    try:
        connector = app.config.get("OTB_HEALTH_DB_CONNECTOR")
        if callable(connector):
            conn = connector()
            cur = conn.cursor()
            try:
                cur.execute("SELECT 1")
                cur.fetchone()
            finally:
                # Always release the cursor, even when the probe query fails.
                cur.close()
            # Hand the connection off before closing so the cleanup below
            # does not attempt a second close; a failing close still counts
            # as a failed probe.
            closing, conn = conn, None
            closing.close()
            db_ok = True
        else:
            db_error = "DB connector not registered"
    except Exception as e:
        db_error = str(e)
    finally:
        # Reached with a live connection only when the probe failed part-way;
        # close it best-effort so failed probes do not leak connections.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass

    app_uptime = int(time.time() - APP_START_TS)
    server_uptime = _server_uptime_seconds()

    payload = {
        "status": "ok" if db_ok else "degraded",
        "app_name": "otb_billing",
        "hostname": platform.node(),
        "server_time_utc": now_utc.isoformat(),
        "server_time_toronto": now_toronto.isoformat(),
        "app_uptime_seconds": app_uptime,
        "app_uptime_human": _format_duration(app_uptime),
        "server_uptime_seconds": server_uptime,
        "server_uptime_human": _format_duration(server_uptime),
        "load_average": {"1m": load1, "5m": load5, "15m": load15},
        "memory": {
            "total_mb": _kb_to_mb(mem_total_kb) if mem_total_kb is not None else None,
            "available_mb": _kb_to_mb(mem_available_kb) if mem_available_kb is not None else None,
            "used_mb": _kb_to_mb(mem_used_kb) if mem_used_kb is not None else None,
            "used_percent": mem_used_percent,
        },
        "disk_root": disk_root,
        "database": {"ok": db_ok, "error": db_error},
    }
    http_code = 200 if db_ok else 503
    return payload, http_code
def _wallet_balance_format(raw_value, decimals):
try:
from decimal import Decimal, getcontext
getcontext().prec = 60
value = Decimal(int(raw_value)) / (Decimal(10) ** int(decimals))
fixed = f"{value:.8f}".rstrip("0").rstrip(".")
return fixed if fixed else "0"
except Exception:
return "error"
def _split_rpc_urls(value, defaults):
urls = []
if value:
for item in str(value).replace(",", " ").split():
item = item.strip()
if item:
urls.append(item)
for item in defaults:
if item and item not in urls:
urls.append(item)
return urls
def _wallet_rpc_call_one(rpc_url, method, params, timeout=5):
import json
import urllib.request
payload = json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
}).encode("utf-8")
req = urllib.request.Request(
rpc_url,
data=payload,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": "OTB-Billing-Health/1.0",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = json.loads(resp.read().decode("utf-8"))
if data.get("error"):
raise RuntimeError(str(data["error"]))
return data.get("result")
def _wallet_rpc_call(rpc_urls, method, params, timeout=5):
last_error = None
if isinstance(rpc_urls, str):
rpc_urls = [rpc_urls]
for rpc_url in rpc_urls:
try:
return _wallet_rpc_call_one(rpc_url, method, params, timeout=timeout)
except Exception as exc:
last_error = f"{rpc_url}: {exc}"
raise RuntimeError(last_error or "No RPC URLs available")
def _wallet_native_balance(rpc_urls, wallet_address, decimals=18):
    """Fetch ``eth_getBalance`` for *wallet_address* at ``latest`` and format it.

    The hex result (treated as ``0x0`` when empty) is parsed base-16 and
    passed to ``_wallet_balance_format`` with *decimals*.
    """
    hex_balance = _wallet_rpc_call(
        rpc_urls,
        "eth_getBalance",
        [wallet_address, "latest"],
    )
    raw_units = int(hex_balance or "0x0", 16)
    return _wallet_balance_format(raw_units, decimals)
def _wallet_erc20_balance(rpc_urls, token_contract, wallet_address, decimals):
    """Query ``balanceOf(wallet_address)`` on *token_contract* via ``eth_call``.

    The hex result (treated as ``0x0`` when empty) is parsed base-16 and
    passed to ``_wallet_balance_format`` with *decimals*.
    """
    # ERC20 balanceOf(address) selector: 0x70a08231, followed by the address
    # lower-cased, stripped of "0x" and left-padded with zeros to 64 hex chars.
    padded_owner = wallet_address.lower().replace("0x", "").rjust(64, "0")
    call_object = {"to": token_contract, "data": "0x70a08231" + padded_owner}
    raw_hex = _wallet_rpc_call(rpc_urls, "eth_call", [call_object, "latest"])
    raw_units = int(raw_hex or "0x0", 16)
    return _wallet_balance_format(raw_units, decimals)
def _wallet_explorer_url(coin, chain, wallet_address):
wallet = str(wallet_address or "").strip()
if not wallet:
return None
coin = str(coin or "").upper()
chain = str(chain or "").lower()
# Native/token balances are shown by wallet address on each chain explorer.
if coin == "USDC" or chain == "arbitrum":
return f"https://arbiscan.io/address/{wallet}"
if coin == "ETH" or chain == "ethereum":
return f"https://etherscan.io/address/{wallet}"
if coin == "ETHO" or chain == "etho":
return f"https://explorer.ethoprotocol.com/address/{wallet}"
if coin in ("EGAZ", "ETI") or chain == "etica":
return f"https://explorer.etica-stats.org/address/{wallet}"
return None
def _wallet_balances(wallet_address=None, label=None):
    """Look up the tracked asset balances for one wallet.

    Args:
        wallet_address: Wallet to query. When falsy, the
            ``OTB_OPERATIONS_WALLET`` environment variable is used, and a
            built-in address if that is unset.
        label: Passed through unchanged in the result.

    Returns:
        dict: ``{"label", "wallet", "assets"}`` where ``assets`` holds one row
        per tracked coin with ``coin``, ``chain``, ``balance``, ``ok``,
        ``error`` and ``explorer_url``. A failed lookup leaves ``balance`` as
        ``"error"``, ``ok`` as ``False`` and stores the exception text
        (truncated to 220 characters) in ``error``.
    """
    import os

    env = os.environ
    wallet = wallet_address or env.get(
        "OTB_OPERATIONS_WALLET",
        "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5",
    )

    # chain key -> (plural env var, singular env var, fallback public RPCs)
    rpc_sources = {
        "arbitrum": (
            "RPC_ARBITRUM_URLS",
            "RPC_ARBITRUM_URL",
            [
                "https://arb1.arbitrum.io/rpc",
                "https://arbitrum-one-rpc.publicnode.com",
                "https://arbitrum.drpc.org",
            ],
        ),
        "ethereum": (
            "RPC_ETHEREUM_URLS",
            "RPC_ETHEREUM_URL",
            [
                "https://ethereum-rpc.publicnode.com",
                "https://cloudflare-eth.com",
                "https://eth.llamarpc.com",
            ],
        ),
        "etho": (
            "RPC_ETHO_URLS",
            "RPC_ETHO_URL",
            [
                "https://rpc.ethoprotocol.com",
                "https://rpc4.ethoprotocol.com",
            ],
        ),
        "etica": (
            "RPC_ETICA_URLS",
            "RPC_ETICA_URL",
            [
                "https://rpc.etica-stats.org",
                "https://eticamainnet.eticaprotocol.org",
            ],
        ),
    }
    rpcs = {
        chain_key: _split_rpc_urls(env.get(plural) or env.get(singular), fallbacks)
        for chain_key, (plural, singular, fallbacks) in rpc_sources.items()
    }

    # (coin, chain label, kind, decimals, rpc key, token contract or None)
    tracked = (
        ("USDC", "Arbitrum", "erc20", 6, "arbitrum", "0xaf88d065e77c8cC2239327C5EDb3A432268e5831"),
        ("ETH", "Ethereum", "native", 18, "ethereum", None),
        ("ETHO", "Etho", "native", 18, "etho", None),
        ("EGAZ", "Etica", "native", 18, "etica", None),
        ("ETI", "Etica", "erc20", 18, "etica", "0x34c61EA91bAcdA647269d4e310A86b875c09946f"),
    )

    rows = []
    for coin, chain, kind, decimals, rpc_key, token in tracked:
        row = {
            "coin": coin,
            "chain": chain,
            "balance": "error",
            "ok": False,
            "error": None,
            "explorer_url": _wallet_explorer_url(coin, chain, wallet),
        }
        try:
            if kind == "native":
                balance = _wallet_native_balance(rpcs[rpc_key], wallet, decimals)
            else:
                balance = _wallet_erc20_balance(rpcs[rpc_key], token, wallet, decimals)
        except Exception as exc:
            row["error"] = str(exc)[:220]
        else:
            row["balance"] = balance
            row["ok"] = True
        rows.append(row)

    return {
        "label": label,
        "wallet": wallet,
        "assets": rows,
    }
def _systemctl_value(args, timeout=3):
import subprocess
try:
proc = subprocess.run(
["systemctl"] + list(args),
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=False,
)
return {
"ok": proc.returncode == 0,
"code": proc.returncode,
"stdout": (proc.stdout or "").strip(),
"stderr": (proc.stderr or "").strip(),
}
except Exception as exc:
return {
"ok": False,
"code": None,
"stdout": "",
"stderr": str(exc),
}
def _try_payment_stats_from_db(app):
stats = {
"available": False,
"error": None,
"pending": 0,
"confirmed_today": 0,
"stale_pending": 0,
}
conn = None
try:
from db import get_db_connection
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
cur.execute("SHOW COLUMNS FROM payments")
cols = {row["Field"] for row in cur.fetchall()}
crypto_filters = []
if "payment_currency" in cols:
crypto_filters.append("payment_currency IN ('USDC','ETH','ETHO','EGAZ','ETI')")
if "notes" in cols:
crypto_filters.append("notes LIKE '%portal_crypto_intent:%'")
crypto_clause = ""
if crypto_filters:
crypto_clause = " AND (" + " OR ".join(crypto_filters) + ")"
if "payment_status" not in cols:
stats["error"] = "payments.payment_status column not found"
return stats
cur.execute(f"""
SELECT COUNT(*) AS count_value
FROM payments
WHERE payment_status = 'pending'
{crypto_clause}
""")
stats["pending"] = int((cur.fetchone() or {}).get("count_value") or 0)
timestamp_candidates = [
c for c in ("received_at", "updated_at", "created_at")
if c in cols
]
if timestamp_candidates:
timestamp_expr = "COALESCE(" + ", ".join(timestamp_candidates) + ")"
cur.execute(f"""
SELECT COUNT(*) AS count_value
FROM payments
WHERE payment_status = 'confirmed'
AND DATE({timestamp_expr}) = UTC_DATE()
{crypto_clause}
""")
stats["confirmed_today"] = int((cur.fetchone() or {}).get("count_value") or 0)
else:
stats["confirmed_today"] = 0
stale_col = None
for candidate in ("created_at", "first_seen_at", "last_checked_at"):
if candidate in cols:
stale_col = candidate
break
if stale_col:
cur.execute(f"""
SELECT COUNT(*) AS count_value
FROM payments
WHERE payment_status = 'pending'
AND {stale_col} < (UTC_TIMESTAMP() - INTERVAL 30 MINUTE)
{crypto_clause}
""")
stats["stale_pending"] = int((cur.fetchone() or {}).get("count_value") or 0)
else:
stats["stale_pending"] = 0
stats["available"] = True
stats["error"] = None
return stats
except Exception as exc:
stats["available"] = False
stats["error"] = str(exc)[:220]
return stats
finally:
try:
if conn:
conn.close()
except Exception:
pass
def _parse_systemctl_show(stdout):
parsed = {}
for line in str(stdout or "").splitlines():
if "=" not in line:
continue
key, value = line.split("=", 1)
parsed[key.strip()] = value.strip()
return parsed
def _crypto_reconcile_status(app):
    """Summarise the crypto reconcile systemd timer/service and payment stats.

    Queries ``systemctl`` for the active/enabled state of both units, the
    timer listing, and selected service properties, then adds the result of
    ``_try_payment_stats_from_db(app)``. Any state that comes back empty is
    reported as ``"unknown"``.
    """
    service_name = "otb-billing-crypto-reconcile.service"
    timer_name = "otb-billing-crypto-reconcile.timer"

    def unit_state(verb, unit):
        return _systemctl_value([verb, unit])["stdout"] or "unknown"

    timer_active = unit_state("is-active", timer_name)
    timer_enabled = unit_state("is-enabled", timer_name)
    service_active = unit_state("is-active", service_name)
    service_enabled = unit_state("is-enabled", service_name)

    timer_listing = _systemctl_value(
        ["list-timers", "--all", "--no-pager", "--plain", timer_name]
    )["stdout"]

    show_properties = (
        "ActiveEnterTimestamp",
        "InactiveEnterTimestamp",
        "ExecMainStatus",
        "Result",
        "NRestarts",
    )
    show_args = ["show", service_name]
    show_args += [f"--property={prop}" for prop in show_properties]
    show_args.append("--no-pager")
    service_details = _systemctl_value(show_args)["stdout"]

    props = _parse_systemctl_show(service_details)
    active_entered = props.get("ActiveEnterTimestamp") or ""
    inactive_entered = props.get("InactiveEnterTimestamp") or ""

    # For a oneshot timer service, InactiveEnterTimestamp is usually the useful
    # "last completed" timestamp. Fall back to ActiveEnterTimestamp if needed.
    inactive_usable = bool(inactive_entered) and inactive_entered.lower() not in ("n/a", "never")
    last_run = (inactive_entered if inactive_usable else active_entered) or "unknown"

    return {
        "service_name": service_name,
        "timer_name": timer_name,
        "timer_active": timer_active,
        "timer_enabled": timer_enabled,
        "service_active": service_active,
        "service_enabled": service_enabled,
        "timer_line": timer_listing,
        "service_details": service_details,
        "last_run": last_run,
        "last_result": props.get("Result") or "unknown",
        "last_exit_status": props.get("ExecMainStatus") or "unknown",
        "payment_stats": _try_payment_stats_from_db(app),
    }
def register_health_routes(app):
    """Register the health routes on *app*.

    Routes:
        ``GET /health``: HTML dashboard rendered from ``health.html``.
        ``GET /health.json``: the same payload as JSON.
        ``POST /health/reconcile-now``: starts the crypto reconciliation
            worker as a detached process and redirects back to ``/health``
            with a ``reconcile`` query flag (``started``, ``failed`` or
            ``missing-worker``).

    Both GET routes answer with the status code chosen by
    ``_health_payload`` (200, or 503 when the database probe fails).
    """

    def _full_health():
        # Single place that assembles the payload so the HTML and JSON views
        # cannot drift apart.
        health, http_code = _health_payload(app)
        health["operations_balances"] = _wallet_balances(
            os.environ.get("OTB_OPERATIONS_WALLET", "0x44f6c44C42e6ae0392E7289F032384C0d37F56D5"),
            "Operations Bal",
        )
        health["treasury_balances"] = _wallet_balances(
            os.environ.get("OTB_TREASURY_WALLET", "0xbe1fdc8c69f712d62cfcd3bf23f636de1dbd213f"),
            "Treasury Bal",
        )
        # Backward compatible JSON key for anything already reading health.json.
        health["wallet_balances"] = health["operations_balances"]
        health["crypto_reconcile"] = _crypto_reconcile_status(app)
        return health, http_code

    @app.route("/health", methods=["GET"])
    def health_page():
        health, http_code = _full_health()
        return render_template("health.html", health=health), http_code

    # NOTE(review): nothing in this block authenticates or rate-limits this
    # endpoint, and each POST spawns a new worker process -- confirm access is
    # restricted elsewhere (reverse proxy, before_request hook, etc.).
    @app.route("/health/reconcile-now", methods=["POST"])
    def health_reconcile_now():
        import subprocess
        import sys
        from pathlib import Path

        base_dir = Path("/home/def/otb_billing")
        worker = base_dir / "scripts" / "crypto_reconciliation_worker.py"
        log_path = base_dir / "logs" / "crypto_reconciliation_worker.log"

        if not worker.exists():
            return redirect("/health?reconcile=missing-worker")

        try:
            log_path.parent.mkdir(parents=True, exist_ok=True)
            with open(log_path, "a", encoding="utf-8") as log_fh:
                # Real newlines (the escaped form wrote a literal backslash-n
                # into the log), flushed so the header lands before any output
                # from the child that inherits this file descriptor.
                log_fh.write("\n[manual health trigger] starting crypto reconciliation worker\n")
                log_fh.flush()
                subprocess.Popen(
                    [sys.executable, str(worker)],
                    cwd=str(base_dir),
                    stdout=log_fh,
                    stderr=subprocess.STDOUT,
                    start_new_session=True,
                )
            return redirect("/health?reconcile=started")
        except Exception:
            # Keep the user-facing redirect, but record why the launch failed.
            app.logger.exception("manual crypto reconciliation trigger failed")
            return redirect("/health?reconcile=failed")

    @app.route("/health.json", methods=["GET"])
    def health_json():
        health, http_code = _full_health()
        return jsonify(health), http_code