import 'dotenv/config'; import axios from 'axios'; import pg from 'pg'; import { ethers } from 'ethers'; // ---------- Config ---------- const BACKEND_BASE = process.env.BACKEND_BASE || 'http://127.0.0.1:4310'; const DATABASE_URL = process.env.DATABASE_URL; const ETICA_HTTP_LIST = (process.env.ETICA_HTTP || '').split(',').map(s => s.trim()).filter(Boolean); const ETICA_WS_LIST = (process.env.ETICA_WS || '').split(',').map(s => s.trim()).filter(Boolean); const REQUIRED_CONFIRMATIONS = Number(process.env.REQUIRED_CONFIRMATIONS || 5); const POLL_INTERVAL_MS = Number(process.env.POLL_INTERVAL_MS || 15000); const MAX_ROWS = Number(process.env.MAX_ROWS || 200); if (!DATABASE_URL) { console.error('[watcher] DATABASE_URL is required'); process.exit(1); } if (ETICA_HTTP_LIST.length === 0 && ETICA_WS_LIST.length === 0) { console.error('[watcher] Provide at least one ETICA_HTTP or ETICA_WS endpoint (comma-separated)'); process.exit(1); } // ---------- DB ---------- const pool = new pg.Pool({ connectionString: DATABASE_URL }); // ---------- Providers (array with rotation & basic penalties) ---------- const httpProviders = ETICA_HTTP_LIST.map(url => ({ url, p: new ethers.JsonRpcProvider(url), penalty: 0 })); const wsProviders = ETICA_WS_LIST.map(url => ({ url, p: new ethers.WebSocketProvider(url), penalty: 0 })); let providers = wsProviders.length ? wsProviders : httpProviders; function pickProvider() { // choose the provider with the lowest penalty (simple LB) let best = providers[0]; for (const pr of providers) if (pr.penalty < best.penalty) best = pr; return best; } function bumpPenalty(pr, amount = 1) { pr.penalty += amount; setTimeout(() => (pr.penalty = Math.max(0, pr.penalty - amount)), 60_000); } async function withAnyProvider(fnName, fn) { // try providers in order of current penalties (best first) const sorted = [...providers].sort((a,b) => a.penalty - b.penalty); let lastErr; for (const pr of sorted) { try { const out = await fn(pr.p); if (lastErr) console.warn(`[watcher] ${fnName} recovered via ${pr.url}`); return { out, via: pr }; } catch (e) { lastErr = e; bumpPenalty(pr, 1); console.warn(`[watcher] ${fnName} failed on ${pr.url}: ${e.message}`); } } throw lastErr; } async function getLatestBlock() { const { out, via } = await withAnyProvider('getBlockNumber', p => p.getBlockNumber()); return { latest: Number(out), via }; } async function getReceipt(txHash) { const { out, via } = await withAnyProvider('getTransactionReceipt', p => p.getTransactionReceipt(txHash)); return { rcpt: out, via }; } // ---------- Core loop ---------- async function fetchPending() { const sql = ` SELECT id, tx_hash, min_confirmations FROM payments WHERE status = 'pending' AND tx_hash IS NOT NULL ORDER BY id DESC LIMIT $1 `; const { rows } = await pool.query(sql, [MAX_ROWS]); return rows; } async function confirmPayment(txHash, confs) { try { const r = await axios.post(`${BACKEND_BASE}/api/payments/confirm`, { txHash, confirmations: confs }, { timeout: 10000 }); return r.data; } catch (e) { const msg = e.response?.data?.error || e.message; console.warn(`[watcher] confirm POST failed for ${txHash}: ${msg}`); return null; } } async function runOnce() { const { latest, via: viaBlock } = await getLatestBlock(); const pend = await fetchPending(); if (pend.length === 0) { console.log(`[watcher] no pending payments; latest block ${latest} via ${viaBlock.url}`); return; } console.log(`[watcher] checking ${pend.length} pending; latest block ${latest} via ${viaBlock.url}`); for (const p of pend) { const tx = (p.tx_hash || '').toLowerCase(); if (!/^0x[0-9a-f]{64}$/.test(tx)) continue; try { const { rcpt, via } = await getReceipt(tx); if (!rcpt || rcpt.blockNumber == null) { // not mined yet continue; } const confs = Math.max(0, Number(latest) - Number(rcpt.blockNumber) + 1); const need = Math.max(REQUIRED_CONFIRMATIONS, Number(p.min_confirmations || 0)); // If reverted, you could mark failed here. We only raise confirmations: await confirmPayment(tx, confs); if (confs >= need) { console.log(`[watcher] tx ${tx.slice(0,10)}… confirmed (${confs}/${need}) via ${via.url}`); } else { console.log(`[watcher] tx ${tx.slice(0,10)}… has ${confs}/${need} confirmations via ${via.url}`); } } catch (e) { console.warn(`[watcher] error checking ${tx.slice(0,10)}…: ${e.message}`); } } } async function main() { console.log('[watcher] starting…', JSON.stringify({ backend: BACKEND_BASE, http: ETICA_HTTP_LIST, ws: ETICA_WS_LIST, pollMs: POLL_INTERVAL_MS, minConf: REQUIRED_CONFIRMATIONS })); // Periodic poll (always on; covers WS hiccups too) setInterval(async () => { try { await runOnce(); } catch (e) { console.error('[watcher] runOnce error:', e); } }, POLL_INTERVAL_MS); // If we have WS providers, also respond to new blocks for fast updates if (wsProviders.length) { for (const pr of wsProviders) { pr.p.on('block', async () => { try { await runOnce(); } catch (e) { console.error('[watcher] ws runOnce error:', e); } }); pr.p._websocket?.on('close', () => { bumpPenalty(pr, 2); }); pr.p._websocket?.on('error', () => { bumpPenalty(pr, 2); }); } } } main().catch(e => { console.error('[watcher] fatal:', e); process.exit(1); });