IPFS storage for images and other non-text items, for use with Etica. Runs on the Etica network and its currencies: https://collect.etica-stats.org
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

155 lines
5.4 KiB

import 'dotenv/config';
import axios from 'axios';
import pg from 'pg';
import { ethers } from 'ethers';
// ---------- Config ----------
// Every setting is read from the process environment.

/**
 * Split a comma-separated environment value into trimmed, non-empty entries.
 * @param {string | undefined} raw - Raw environment value.
 * @returns {string[]} The individual entries; an empty array when unset or blank.
 */
function parseEnvList(raw) {
  return (raw || '').split(',').map((s) => s.trim()).filter(Boolean);
}

/**
 * Read a numeric environment variable, falling back to a default when the
 * variable is unset, empty, not a finite number, or below `min`.
 *
 * Without this guard a typo such as POLL_INTERVAL_MS=15s becomes NaN, which
 * Node's timers treat as a 1 ms delay, and a NaN MAX_ROWS is rejected by the
 * database as a LIMIT parameter.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset or invalid.
 * @param {number} min - Smallest accepted value (inclusive).
 * @returns {number} The parsed value or `fallback`.
 */
function parseEnvNumber(name, fallback, min) {
  const raw = process.env[name];
  if (!raw) return fallback; // unset or empty string: same as the `|| default` idiom
  const value = Number(raw);
  if (!Number.isFinite(value) || value < min) {
    console.warn(`[watcher] ignoring invalid ${name}=${JSON.stringify(raw)}; using ${fallback}`);
    return fallback;
  }
  return value;
}

const BACKEND_BASE = process.env.BACKEND_BASE || 'http://127.0.0.1:4310';
const DATABASE_URL = process.env.DATABASE_URL;
const ETICA_HTTP_LIST = parseEnvList(process.env.ETICA_HTTP);
const ETICA_WS_LIST = parseEnvList(process.env.ETICA_WS);
// Zero is allowed here; a per-payment min_confirmations can still raise the bar.
const REQUIRED_CONFIRMATIONS = parseEnvNumber('REQUIRED_CONFIRMATIONS', 5, 0);
const POLL_INTERVAL_MS = parseEnvNumber('POLL_INTERVAL_MS', 15000, 1);
// Used as a SQL LIMIT parameter, so it must be a whole number.
const MAX_ROWS = Math.floor(parseEnvNumber('MAX_ROWS', 200, 1));

if (!DATABASE_URL) {
  console.error('[watcher] DATABASE_URL is required');
  process.exit(1);
}
if (ETICA_HTTP_LIST.length === 0 && ETICA_WS_LIST.length === 0) {
  console.error('[watcher] Provide at least one ETICA_HTTP or ETICA_WS endpoint (comma-separated)');
  process.exit(1);
}
// ---------- DB ----------
// Shared PostgreSQL connection pool for all queries in this file.
const pool = new pg.Pool({ connectionString: DATABASE_URL });
// ---------- Providers (array with rotation & basic penalties) ----------
// Each entry pairs an endpoint URL with its ethers provider (`p`) and a
// `penalty` score; lower penalty means the endpoint is tried earlier.
const httpProviders = ETICA_HTTP_LIST.map((url) => ({ url, p: new ethers.JsonRpcProvider(url), penalty: 0 }));
const wsProviders = ETICA_WS_LIST.map((url) => ({ url, p: new ethers.WebSocketProvider(url), penalty: 0 }));
// WebSocket endpoints are listed first so they win penalty ties, but HTTP
// endpoints stay in the rotation as fallbacks. Previously any configured
// HTTP endpoint was silently ignored as soon as one WS endpoint existed.
let providers = [...wsProviders, ...httpProviders];
/**
 * Select the provider entry that currently carries the lowest penalty.
 * When several entries share the lowest penalty, the one that appears
 * first in `providers` is returned.
 *
 * @returns {object | undefined} The chosen entry, or `undefined` when
 *   `providers` is empty.
 */
function pickProvider() {
  return providers.reduce(
    (lowest, candidate) => (candidate.penalty < lowest.penalty ? candidate : lowest),
    providers[0],
  );
}
/**
 * Increase a provider entry's penalty and schedule the same amount to be
 * taken off again after 60 seconds (never dropping below zero).
 *
 * @param {{ penalty: number }} pr - Provider entry to penalise (mutated in place).
 * @param {number} [amount=1] - How much to add now and subtract later.
 */
function bumpPenalty(pr, amount = 1) {
  pr.penalty += amount;
  const decay = () => {
    pr.penalty = Math.max(0, pr.penalty - amount);
  };
  setTimeout(decay, 60_000);
}
/**
 * Run an operation against the configured providers, trying the
 * least-penalised one first and moving on to the next on failure.
 *
 * Every provider that fails has its penalty bumped by 1 and a warning logged.
 *
 * @param {string} fnName - Label used in log messages.
 * @param {(provider: object) => any} fn - Operation to run; receives the
 *   entry's provider object (`entry.p`). May return a value or a promise.
 * @returns {Promise<{ out: any, via: object }>} The operation's result and
 *   the provider entry that produced it.
 * @throws {Error} When no providers are configured, or the last provider's
 *   error when every provider failed.
 */
async function withAnyProvider(fnName, fn) {
  // Fail with a real Error instead of `throw undefined` when there is nothing to try.
  if (providers.length === 0) {
    throw new Error(`[watcher] ${fnName}: no providers configured`);
  }
  // Sort a copy so the shared `providers` order is left untouched.
  const candidates = [...providers].sort((a, b) => a.penalty - b.penalty);
  let lastErr;
  let hadFailure = false;
  for (const pr of candidates) {
    try {
      const out = await fn(pr.p);
      if (hadFailure) console.warn(`[watcher] ${fnName} recovered via ${pr.url}`);
      return { out, via: pr };
    } catch (e) {
      hadFailure = true;
      lastErr = e;
      bumpPenalty(pr, 1);
      // `e` may be a non-Error (or nullish) rejection value; don't crash while logging it.
      console.warn(`[watcher] ${fnName} failed on ${pr.url}: ${e?.message ?? e}`);
    }
  }
  throw lastErr;
}
/**
 * Ask the providers for the current block number.
 *
 * @returns {Promise<{ latest: number, via: object }>} The block number
 *   coerced with `Number()` and the provider entry that answered.
 */
async function getLatestBlock() {
  const readBlockNumber = (provider) => provider.getBlockNumber();
  const result = await withAnyProvider('getBlockNumber', readBlockNumber);
  return { latest: Number(result.out), via: result.via };
}
/**
 * Look up the receipt for a transaction hash through the providers.
 *
 * @param {string} txHash - Transaction hash to look up.
 * @returns {Promise<{ rcpt: any, via: object }>} Whatever the provider
 *   returned for the receipt, plus the provider entry that answered.
 */
async function getReceipt(txHash) {
  const readReceipt = (provider) => provider.getTransactionReceipt(txHash);
  const result = await withAnyProvider('getTransactionReceipt', readReceipt);
  return { rcpt: result.out, via: result.via };
}
// ---------- Core loop ----------
/**
 * Load pending payments that already have a transaction hash, newest id
 * first, capped at MAX_ROWS rows.
 *
 * @returns {Promise<Array<{ id: any, tx_hash: any, min_confirmations: any }>>}
 *   The rows returned by the query.
 */
async function fetchPending() {
  // The query text is kept flush-left so the string sent to the database is unchanged.
  const result = await pool.query(
    `
SELECT id, tx_hash, min_confirmations
FROM payments
WHERE status = 'pending' AND tx_hash IS NOT NULL
ORDER BY id DESC
LIMIT $1
`,
    [MAX_ROWS],
  );
  return result.rows;
}
/**
 * Report a transaction's confirmation count to the backend.
 *
 * POSTs `{ txHash, confirmations }` to `/api/payments/confirm` on
 * BACKEND_BASE with a 10 second timeout. Failures are logged as warnings
 * and never thrown.
 *
 * @param {string} txHash - Transaction hash being reported.
 * @param {number} confs - Confirmation count to report.
 * @returns {Promise<any | null>} The response body, or `null` when the request failed.
 */
async function confirmPayment(txHash, confs) {
  try {
    const endpoint = `${BACKEND_BASE}/api/payments/confirm`;
    const payload = { txHash, confirmations: confs };
    const response = await axios.post(endpoint, payload, { timeout: 10000 });
    return response.data;
  } catch (err) {
    // Prefer the backend's own error text when the response carried one.
    const reason = err.response?.data?.error || err.message;
    console.warn(`[watcher] confirm POST failed for ${txHash}: ${reason}`);
    return null;
  }
}
/**
 * Perform one pass over the pending payments.
 *
 * Fetches the latest block number and the pending rows, then for every row
 * with a well-formed transaction hash looks up the receipt and, if the
 * transaction is mined and did not revert, reports its confirmation count
 * to the backend. Errors for a single row are logged and do not stop the
 * pass; errors from fetching the block number or the rows propagate.
 *
 * @returns {Promise<void>}
 */
async function runOnce() {
  const { latest, via: viaBlock } = await getLatestBlock();
  const pend = await fetchPending();
  if (pend.length === 0) {
    console.log(`[watcher] no pending payments; latest block ${latest} via ${viaBlock.url}`);
    return;
  }
  console.log(`[watcher] checking ${pend.length} pending; latest block ${latest} via ${viaBlock.url}`);
  for (const p of pend) {
    // String() keeps a non-string tx_hash from throwing outside the per-row try.
    const tx = String(p.tx_hash ?? '').toLowerCase();
    // Rows whose hash is not 0x + 64 hex digits are skipped.
    if (!/^0x[0-9a-f]{64}$/.test(tx)) continue;
    try {
      const { rcpt, via } = await getReceipt(tx);
      if (!rcpt || rcpt.blockNumber == null) {
        // not mined yet
        continue;
      }
      if (rcpt.status === 0) {
        // A reverted transaction did not execute successfully, so it must
        // never be reported to the backend as a confirmed payment.
        console.warn(`[watcher] tx ${tx.slice(0,10)}… reverted in block ${rcpt.blockNumber}; not confirming (via ${via.url})`);
        continue;
      }
      // The block containing the tx counts as the first confirmation; clamp
      // at 0 in case the receipt is from a block newer than `latest`.
      const confs = Math.max(0, Number(latest) - Number(rcpt.blockNumber) + 1);
      // A row may ask for more confirmations than the global minimum, never fewer.
      const need = Math.max(REQUIRED_CONFIRMATIONS, Number(p.min_confirmations || 0));
      // The count is reported on every pass, even while still below `need`.
      await confirmPayment(tx, confs);
      if (confs >= need) {
        console.log(`[watcher] tx ${tx.slice(0,10)}… confirmed (${confs}/${need}) via ${via.url}`);
      } else {
        console.log(`[watcher] tx ${tx.slice(0,10)}… has ${confs}/${need} confirmations via ${via.url}`);
      }
    } catch (e) {
      console.warn(`[watcher] error checking ${tx.slice(0,10)}…: ${e.message}`);
    }
  }
}
/**
 * Start the watcher: log the effective configuration, begin the periodic
 * poll, and for every WebSocket provider also trigger a pass on each new
 * block and penalise the endpoint when its socket closes or errors.
 *
 * @returns {Promise<void>} Resolves once the timers and listeners are installed.
 */
async function main() {
  console.log('[watcher] starting…', JSON.stringify({
    backend: BACKEND_BASE,
    http: ETICA_HTTP_LIST,
    ws: ETICA_WS_LIST,
    pollMs: POLL_INTERVAL_MS,
    minConf: REQUIRED_CONFIRMATIONS
  }));

  // Single-flight guard: the interval and every WS 'block' listener share
  // this flag so that passes never overlap. Overlapping passes would POST
  // the same confirmations to the backend more than once.
  let passInFlight = false;
  const tick = async (label) => {
    if (passInFlight) return;
    passInFlight = true;
    try {
      await runOnce();
    } catch (e) {
      console.error(`[watcher] ${label} error:`, e);
    } finally {
      passInFlight = false;
    }
  };

  // Periodic poll (always on; covers WS hiccups too)
  setInterval(() => { void tick('runOnce'); }, POLL_INTERVAL_MS);

  // If we have WS providers, also respond to new blocks for fast updates
  for (const pr of wsProviders) {
    pr.p.on('block', () => { void tick('ws runOnce'); });

    // ethers v6 exposes the underlying socket through the `websocket` getter,
    // which throws once the socket is gone; `_websocket` was the v5 name and
    // is kept only as a fallback. NOTE(review): confirm against the installed
    // ethers version.
    let socket;
    try {
      socket = pr.p.websocket ?? pr.p._websocket;
    } catch {
      socket = pr.p._websocket;
    }
    const penalize = () => { bumpPenalty(pr, 2); };
    if (typeof socket?.on === 'function') {
      // EventEmitter-style socket.
      socket.on('close', penalize);
      socket.on('error', penalize);
    } else if (typeof socket?.addEventListener === 'function') {
      // EventTarget-style socket.
      socket.addEventListener('close', penalize);
      socket.addEventListener('error', penalize);
    }
  }
}
// Entry point: any error escaping main() is logged and ends the process with exit code 1.
main().catch((err) => {
  console.error('[watcher] fatal:', err);
  process.exit(1);
});