ipfs storage for images and other nontext items. for use with etica - runs on etica network and currencys
https://collect.etica-stats.org
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
5.4 KiB
155 lines
5.4 KiB
import 'dotenv/config'; |
|
import axios from 'axios'; |
|
import pg from 'pg'; |
|
import { ethers } from 'ethers'; |
|
|
|
// ---------- Config ---------- |
|
const BACKEND_BASE = process.env.BACKEND_BASE || 'http://127.0.0.1:4310'; |
|
const DATABASE_URL = process.env.DATABASE_URL; |
|
const ETICA_HTTP_LIST = (process.env.ETICA_HTTP || '').split(',').map(s => s.trim()).filter(Boolean); |
|
const ETICA_WS_LIST = (process.env.ETICA_WS || '').split(',').map(s => s.trim()).filter(Boolean); |
|
const REQUIRED_CONFIRMATIONS = Number(process.env.REQUIRED_CONFIRMATIONS || 5); |
|
const POLL_INTERVAL_MS = Number(process.env.POLL_INTERVAL_MS || 15000); |
|
const MAX_ROWS = Number(process.env.MAX_ROWS || 200); |
|
|
|
if (!DATABASE_URL) { |
|
console.error('[watcher] DATABASE_URL is required'); |
|
process.exit(1); |
|
} |
|
if (ETICA_HTTP_LIST.length === 0 && ETICA_WS_LIST.length === 0) { |
|
console.error('[watcher] Provide at least one ETICA_HTTP or ETICA_WS endpoint (comma-separated)'); |
|
process.exit(1); |
|
} |
|
|
|
// ---------- DB ---------- |
|
const pool = new pg.Pool({ connectionString: DATABASE_URL }); |
|
|
|
// ---------- Providers (array with rotation & basic penalties) ---------- |
|
const httpProviders = ETICA_HTTP_LIST.map(url => ({ url, p: new ethers.JsonRpcProvider(url), penalty: 0 })); |
|
const wsProviders = ETICA_WS_LIST.map(url => ({ url, p: new ethers.WebSocketProvider(url), penalty: 0 })); |
|
let providers = wsProviders.length ? wsProviders : httpProviders; |
|
|
|
function pickProvider() { |
|
// choose the provider with the lowest penalty (simple LB) |
|
let best = providers[0]; |
|
for (const pr of providers) if (pr.penalty < best.penalty) best = pr; |
|
return best; |
|
} |
|
function bumpPenalty(pr, amount = 1) { pr.penalty += amount; setTimeout(() => (pr.penalty = Math.max(0, pr.penalty - amount)), 60_000); } |
|
|
|
async function withAnyProvider(fnName, fn) { |
|
// try providers in order of current penalties (best first) |
|
const sorted = [...providers].sort((a,b) => a.penalty - b.penalty); |
|
let lastErr; |
|
for (const pr of sorted) { |
|
try { |
|
const out = await fn(pr.p); |
|
if (lastErr) console.warn(`[watcher] ${fnName} recovered via ${pr.url}`); |
|
return { out, via: pr }; |
|
} catch (e) { |
|
lastErr = e; |
|
bumpPenalty(pr, 1); |
|
console.warn(`[watcher] ${fnName} failed on ${pr.url}: ${e.message}`); |
|
} |
|
} |
|
throw lastErr; |
|
} |
|
|
|
async function getLatestBlock() { |
|
const { out, via } = await withAnyProvider('getBlockNumber', p => p.getBlockNumber()); |
|
return { latest: Number(out), via }; |
|
} |
|
async function getReceipt(txHash) { |
|
const { out, via } = await withAnyProvider('getTransactionReceipt', p => p.getTransactionReceipt(txHash)); |
|
return { rcpt: out, via }; |
|
} |
|
|
|
// ---------- Core loop ---------- |
|
async function fetchPending() { |
|
const sql = ` |
|
SELECT id, tx_hash, min_confirmations |
|
FROM payments |
|
WHERE status = 'pending' AND tx_hash IS NOT NULL |
|
ORDER BY id DESC |
|
LIMIT $1 |
|
`; |
|
const { rows } = await pool.query(sql, [MAX_ROWS]); |
|
return rows; |
|
} |
|
|
|
async function confirmPayment(txHash, confs) { |
|
try { |
|
const r = await axios.post(`${BACKEND_BASE}/api/payments/confirm`, { |
|
txHash, confirmations: confs |
|
}, { timeout: 10000 }); |
|
return r.data; |
|
} catch (e) { |
|
const msg = e.response?.data?.error || e.message; |
|
console.warn(`[watcher] confirm POST failed for ${txHash}: ${msg}`); |
|
return null; |
|
} |
|
} |
|
|
|
async function runOnce() { |
|
const { latest, via: viaBlock } = await getLatestBlock(); |
|
const pend = await fetchPending(); |
|
if (pend.length === 0) { |
|
console.log(`[watcher] no pending payments; latest block ${latest} via ${viaBlock.url}`); |
|
return; |
|
} |
|
console.log(`[watcher] checking ${pend.length} pending; latest block ${latest} via ${viaBlock.url}`); |
|
|
|
for (const p of pend) { |
|
const tx = (p.tx_hash || '').toLowerCase(); |
|
if (!/^0x[0-9a-f]{64}$/.test(tx)) continue; |
|
|
|
try { |
|
const { rcpt, via } = await getReceipt(tx); |
|
if (!rcpt || rcpt.blockNumber == null) { |
|
// not mined yet |
|
continue; |
|
} |
|
const confs = Math.max(0, Number(latest) - Number(rcpt.blockNumber) + 1); |
|
const need = Math.max(REQUIRED_CONFIRMATIONS, Number(p.min_confirmations || 0)); |
|
|
|
// If reverted, you could mark failed here. We only raise confirmations: |
|
await confirmPayment(tx, confs); |
|
|
|
if (confs >= need) { |
|
console.log(`[watcher] tx ${tx.slice(0,10)}… confirmed (${confs}/${need}) via ${via.url}`); |
|
} else { |
|
console.log(`[watcher] tx ${tx.slice(0,10)}… has ${confs}/${need} confirmations via ${via.url}`); |
|
} |
|
} catch (e) { |
|
console.warn(`[watcher] error checking ${tx.slice(0,10)}…: ${e.message}`); |
|
} |
|
} |
|
} |
|
|
|
async function main() { |
|
console.log('[watcher] starting…', JSON.stringify({ |
|
backend: BACKEND_BASE, |
|
http: ETICA_HTTP_LIST, |
|
ws: ETICA_WS_LIST, |
|
pollMs: POLL_INTERVAL_MS, |
|
minConf: REQUIRED_CONFIRMATIONS |
|
})); |
|
|
|
// Periodic poll (always on; covers WS hiccups too) |
|
setInterval(async () => { |
|
try { await runOnce(); } catch (e) { console.error('[watcher] runOnce error:', e); } |
|
}, POLL_INTERVAL_MS); |
|
|
|
// If we have WS providers, also respond to new blocks for fast updates |
|
if (wsProviders.length) { |
|
for (const pr of wsProviders) { |
|
pr.p.on('block', async () => { |
|
try { await runOnce(); } catch (e) { console.error('[watcher] ws runOnce error:', e); } |
|
}); |
|
pr.p._websocket?.on('close', () => { bumpPenalty(pr, 2); }); |
|
pr.p._websocket?.on('error', () => { bumpPenalty(pr, 2); }); |
|
} |
|
} |
|
} |
|
|
|
main().catch(e => { console.error('[watcher] fatal:', e); process.exit(1); });
|
|
|