You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
481 lines
14 KiB
481 lines
14 KiB
// /home/def/monitor/backend/server.js
//
// Express backend for monitor.outsidethebox.top
// - Reads /home/def/monitor/config/lines.json
// - Rotating refresh (one line per interval)
// - Supports sources:
//     * CoinGecko: line.source omitted or "coingecko" + coingecko_id (slug)
//     * KlingEx:   line.source === "klingex" + klingex_pair ("ETHO-USDT", "EGAZ-USDT")
//
// Fiat sources (USD base):
//     * USD: fixed 1.0
//     * CAD: Bank of Canada FXUSDCAD (1 USD = X CAD) -> invert to get CAD in USD
//     * EUR: Frankfurter API EUR->USD (timeseries + latest)
//
// Endpoints:
//     GET /health
//     GET /api/lines
|
|
|
const fs = require("fs"); |
|
const path = require("path"); |
|
const express = require("express"); |
|
const { fetchPair } = require("./providers/klingex"); |
|
|
|
const app = express();

/**
 * Read a numeric setting from the environment, falling back when the variable
 * is unset, blank, not a finite number, or rejected by `isValid`.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unusable.
 * @param {(n: number) => boolean} isValid - Extra range check for the parsed value.
 * @returns {number} The parsed value or `fallback`.
 */
function envNumber(name, fallback, isValid) {
  const raw = process.env[name];
  if (raw == null || raw.trim() === "") return fallback;

  const parsed = Number(raw);
  if (!Number.isFinite(parsed) || !isValid(parsed)) {
    console.warn(`[monitor] ignoring invalid ${name}=${JSON.stringify(raw)}; using ${fallback}`);
    return fallback;
  }
  return parsed;
}

// TCP port to listen on (0 lets the OS pick one).
const PORT = envNumber("PORT", 4010, (n) => Number.isInteger(n) && n >= 0 && n <= 65535);

// Seconds between rotating refreshes. Node's setInterval silently turns a delay
// that is NaN, < 1 ms or > 2147483647 ms into 1 ms, which would hammer the
// upstream APIs, so anything outside that range falls back to the default.
const ROTATE_SECONDS = envNumber(
  "ROTATE_SECONDS",
  60,
  (n) => n * 1000 >= 1 && n * 1000 <= 2147483647
);
|
|
|
// Deployment root; the line configuration lives at <ROOT>/config/lines.json.
const ROOT = "/home/def/monitor";
const CONFIG_PATH = path.join(ROOT, ...["config", "lines.json"]);

// Bank of Canada Valet series FXUSDCAD: how many CAD one USD buys.
// Callers invert it to express CAD in the USD base.
const BOC_USDCAD = "https://www.bankofcanada.ca/valet/observations/FXUSDCAD/json";

// Frankfurter FX API (ECB-based) base URL, used for the EUR -> USD rate.
const FRANKFURTER = "https://api.frankfurter.app";
|
|
|
/**
 * Current wall-clock time.
 * @returns {number} Milliseconds since the Unix epoch.
 */
function nowMs() {
  const epochMs = Date.now();
  return epochMs;
}
|
|
|
/**
 * GET a URL and parse the response body as JSON, aborting after a deadline.
 *
 * @param {string} url - URL to request.
 * @param {{ timeoutMs?: number }} [options] - `timeoutMs` is the abort deadline
 *   in milliseconds (default 15000).
 * @returns {Promise<any>} The parsed JSON body.
 * @throws {Error} On a non-2xx response: message `HTTP <status> for <url> :: <body
 *   excerpt>` with `status` set to the HTTP status code.
 * @throws {Error} On timeout: message names the deadline and URL, `code` is
 *   "ETIMEDOUT" and the underlying abort error is attached as `cause`.
 */
async function fetchJson(url, { timeoutMs = 15000 } = {}) {
  const ctrl = new AbortController();
  let timedOut = false;
  const timer = setTimeout(() => {
    timedOut = true;
    ctrl.abort();
  }, timeoutMs);

  try {
    const res = await fetch(url, {
      headers: {
        "accept": "application/json",
        "user-agent": "monitor.outsidethebox.top/1.0"
      },
      signal: ctrl.signal
    });

    if (!res.ok) {
      // Best effort: include a short excerpt of the body to aid debugging.
      const txt = await res.text().catch(() => "");
      const err = new Error(`HTTP ${res.status} for ${url} :: ${txt.slice(0, 250)}`);
      err.status = res.status;
      throw err;
    }

    return await res.json();
  } catch (err) {
    // A bare abort error ("This operation was aborted") says nothing about
    // which request stalled; replace it with something actionable. HTTP errors
    // that already carry a status are passed through untouched.
    if (timedOut && err?.status == null) {
      const timeoutErr = new Error(`Timeout after ${timeoutMs}ms for ${url}`, { cause: err });
      timeoutErr.code = "ETIMEDOUT";
      throw timeoutErr;
    }
    throw err;
  } finally {
    clearTimeout(timer);
  }
}
|
|
|
/**
 * Reduce a numeric series to at most seven evenly spaced samples.
 *
 * Entries that are not finite numbers are discarded first. A series of seven
 * or fewer values is returned as-is; longer series are sampled at rounded,
 * evenly spaced indices that always include the first and last value.
 *
 * @param {Array<*>} points - Raw series (may contain non-numeric entries).
 * @returns {number[]} The cleaned, down-sampled series.
 */
function normalizeTo7(points) {
  const isUsable = (candidate) => typeof candidate === "number" && isFinite(candidate);
  const usable = points.filter(isUsable);
  const lastIndex = usable.length - 1;

  if (usable.length > 7) {
    return Array.from({ length: 7 }, (_unused, slot) => {
      const pick = Math.round((slot * lastIndex) / 6);
      return usable[pick];
    });
  }

  return usable;
}
|
|
|
/**
 * Format a price/rate for display with precision chosen by magnitude:
 * no decimals at |v| >= 1000, four decimals at |v| >= 1, six below that.
 *
 * @param {*} v - Value to format.
 * @returns {string} The formatted number, or "—" when `v` is not a finite
 *   number (null, undefined, NaN, ±Infinity, strings, booleans, ...).
 */
function formatDisplay(v) {
  // The global isFinite() coerces its argument, so a numeric string or boolean
  // used to pass the guard and then crash on `.toFixed`; check the type first.
  if (typeof v !== "number" || !Number.isFinite(v)) return "—";

  // Pick precision from the magnitude so negative values are bucketed the same
  // way as their positive counterparts.
  const magnitude = Math.abs(v);
  if (magnitude >= 1000) return v.toFixed(0);
  if (magnitude >= 1) return v.toFixed(4);
  return v.toFixed(6);
}
|
|
|
/**
 * Format a Date as its UTC calendar day.
 * @param {Date} d - Date to format.
 * @returns {string} The first ten characters of `d.toISOString()` ("YYYY-MM-DD").
 */
function isoDate(d) {
  const stamp = d.toISOString();
  return stamp.slice(0, 10);
}
|
|
|
/**
 * Build an inclusive date window ending today and reaching `days + 3` days
 * back (the extra three days are a small buffer).
 *
 * Both ends are derived from a single instant using UTC millisecond
 * arithmetic. The previous version took two separate `new Date()` readings and
 * subtracted with local-time `setDate(end.getDate() - …)`; if the two readings
 * straddled a month rollover the start could land after the end, and the local
 * arithmetic did not match the UTC formatting done by isoDate().
 *
 * @param {number} days - Nominal window length in days.
 * @returns {{ start: string, end: string }} "YYYY-MM-DD" strings (UTC).
 */
function dateRange(days) {
  const DAY_MS = 86400000;
  const endMs = Date.now();
  const startMs = endMs - (days + 3) * DAY_MS; // small buffer
  return { start: isoDate(new Date(startMs)), end: isoDate(new Date(endMs)) };
}
|
|
|
/**
 * Build the CoinGecko `market_chart` URL for a coin at daily resolution.
 *
 * @param {string} coinId - CoinGecko coin id (URL-encoded here).
 * @param {string} vs - Quote currency (URL-encoded here).
 * @param {number} [days=7] - Number of days of history, inserted as-is.
 * @returns {string} The request URL.
 */
function cgMarketChartUrl(coinId, vs, days = 7) {
  const coin = encodeURIComponent(coinId);
  const currency = encodeURIComponent(vs);
  const endpoint = `https://api.coingecko.com/api/v3/coins/${coin}/market_chart`;
  const query = `vs_currency=${currency}&days=${days}&interval=daily`;
  return `${endpoint}?${query}`;
}
|
|
|
/* --------------------- state --------------------- */ |
|
// Last parsed config and the mtime (ms) of the file it was read from.
let cfg = null;
let cfgMtime = 0;

// Lines to refresh in rotation order (fiat lines first, then crypto lines).
let queue = [];

// Snapshot served by the API: one entry per configured line, the most recent
// error messages, and the position of the rotating updater.
let state = {
  fiat: [],
  crypto: [],
  errors: [],
  progress: {
    total: 0,
    done: 0,
    current: null,
    cycle: 0
  }
};

// Rate-limit bookkeeping: no updates run before `backoffUntilMs`;
// `consecutive429` counts HTTP 429 responses since the last success.
let backoffUntilMs = 0;
let consecutive429 = 0;
|
|
|
/* --------------------- config load --------------------- */ |
|
/**
 * Load (or reload) the line configuration from CONFIG_PATH.
 *
 * The file is re-read only when its mtime is newer than the cached copy. On a
 * (re)load the rotating queue is rebuilt (fiat lines first, then crypto), the
 * visible state arrays are reset to placeholder entries in config order, the
 * error list is cleared and the progress counters restart (the cycle number is
 * kept).
 *
 * Everything is validated and derived BEFORE any module state is touched, so a
 * malformed file leaves the previous config, queue and state fully intact and
 * is retried on the next call instead of being cached as "fresh".
 *
 * @returns {object} The parsed configuration.
 * @throws {Error} If the file cannot be stat'ed or read.
 * @throws {SyntaxError} If the file is not valid JSON.
 * @throws {TypeError} If `fiat.lines` or `crypto.lines` is present but not an array.
 */
function loadConfig() {
  const st = fs.statSync(CONFIG_PATH);
  if (cfg && st.mtimeMs <= cfgMtime) return cfg;

  const parsed = JSON.parse(fs.readFileSync(CONFIG_PATH, "utf8"));

  // Tag every configured line with its group; a missing/empty list means "no lines".
  const readLines = (group) => {
    const lines = parsed?.[group]?.lines;
    if (!lines) return [];
    if (!Array.isArray(lines)) {
      throw new TypeError(`${group}.lines must be an array in ${CONFIG_PATH}`);
    }
    return lines.map((x) => ({ ...x, group }));
  };

  const fiatLines = readLines("fiat");
  const cryptoLines = readLines("crypto");
  const fiatVs = parsed?.fiat?.base || "USD";
  const cryptoVs = String(parsed?.crypto?.vs_currency || "usd").toUpperCase();

  // Placeholder entry shown until the rotating updater reaches the line.
  const placeholder = (vs) => (x) => ({
    ...x,
    value: null,
    display: "—",
    vs,
    spark: [],
    spark_points: [],
    source: "init"
  });

  const nextFiat = fiatLines.map(placeholder(fiatVs));
  const nextCrypto = cryptoLines.map(placeholder(cryptoVs));

  // ---- commit: nothing below can throw ----
  cfg = parsed;
  cfgMtime = st.mtimeMs;

  // Flatten into one rotating queue in the order we want to update
  queue = [...fiatLines, ...cryptoLines];

  // init visible state arrays (keep ordering)
  state.fiat = nextFiat;
  state.crypto = nextCrypto;
  state.errors = [];
  state.progress.total = queue.length;
  state.progress.done = 0;
  state.progress.current = null;

  return cfg;
}
|
|
|
/**
 * Look up a visible state entry by group and key.
 *
 * @param {string} group - "fiat" selects the fiat list; anything else selects crypto.
 * @param {string} key - Line key to match (strict equality).
 * @returns {object|undefined} The first matching entry, or undefined.
 */
function findLineInState(group, key) {
  const bucket = group === "fiat" ? state.fiat : state.crypto;
  for (const entry of bucket) {
    if (entry.key === key) return entry;
  }
  return undefined;
}
|
|
|
/**
 * Record an error message, retaining only the 20 most recent entries.
 * @param {string} msg - Message to append to `state.errors`.
 */
function setError(msg) {
  const KEEP = 20;
  state.errors.push(msg);

  // keep last 20 errors max
  const surplus = state.errors.length - KEEP;
  if (surplus > 0) {
    state.errors = state.errors.slice(surplus);
  }
}
|
|
|
/**
 * Record which line the updater is currently working on.
 * @param {string|null} [currentKey=null] - Value stored in
 *   `state.progress.current`; defaults to null.
 */
function setProgress(currentKey = null) {
  const { progress } = state;
  progress.current = currentKey;
}
|
|
|
/**
 * Start a new rotation: bump the cycle counter, reset the per-cycle `done`
 * count and clear the current-line marker.
 */
function nextCycle() {
  const { progress } = state;
  progress.cycle += 1;
  progress.done = 0;
  progress.current = null;
}
|
|
|
/* --------------------- fiat fetchers --------------------- */ |
|
|
|
// CAD in USD base using BoC FXUSDCAD (1 USD = X CAD) -> invert for CADUSD
/**
 * Fetch the most recent Bank of Canada USD/CAD observations and return them
 * inverted (the USD value of 1 CAD).
 *
 * Only the latest seven observations are requested and kept. Previously the
 * request carried no window, so the sparkline was sampled across whatever
 * series the endpoint returned — the complete published history, as far as I
 * recall the Valet API (TODO confirm) — and once more than seven points
 * existed the `spark_points` timestamps were taken from the first seven
 * observations rather than the sampled ones.
 *
 * @returns {Promise<{ latest: number|null, spark: number[],
 *   spark_points: Array<{ t: number, v: number }>, source: string }>}
 *   `spark` and `spark_points` are index-aligned and ordered oldest to newest;
 *   `latest` is the newest value or null when nothing usable was returned.
 * @throws {Error} Whatever fetchJson() rejects with.
 */
async function fetchCadUsdFromBoC() {
  const SPARK_LEN = 7;

  // `recent=N` asks the Valet API for only the N most recent observations.
  const url = new URL(BOC_USDCAD);
  url.searchParams.set("recent", String(SPARK_LEN));
  const j = await fetchJson(url.toString(), { timeoutMs: 15000 });

  // obs entries: { d: "YYYY-MM-DD", FXUSDCAD: { v: "1.34" } }
  const obs = Array.isArray(j?.observations) ? j.observations : [];
  const series = [];

  for (const o of obs) {
    const raw = o?.FXUSDCAD?.v;
    const usdcad = raw != null ? Number(raw) : NaN;
    const t = typeof o?.d === "string" ? Date.parse(`${o.d}T00:00:00Z`) : NaN;
    if (!Number.isFinite(usdcad) || usdcad <= 0 || !Number.isFinite(t)) continue;

    // invert: 1 CAD = 1 / (USD->CAD)
    series.push({ t, v: 1 / usdcad });
  }

  // Do not rely on the response ordering; also trims the series if the server
  // ignored the `recent` parameter.
  series.sort((a, b) => a.t - b.t);
  const spark_points = series.slice(-SPARK_LEN);
  const spark = spark_points.map((p) => p.v);
  const latest = spark.length ? spark[spark.length - 1] : null;

  return { latest, spark, spark_points, source: "boc" };
}
|
|
|
// EUR in USD base using Frankfurter timeseries EUR->USD
/**
 * Fetch the recent EUR->USD series from Frankfurter and reduce it to at most
 * seven evenly spaced points.
 *
 * Values and timestamps are sampled together. Previously only the values were
 * down-sampled and the timestamps were taken from the first seven days of the
 * window, so whenever the window held more than seven business days the
 * `spark_points` dates did not match their values.
 *
 * @returns {Promise<{ latest: number|null, spark: number[],
 *   spark_points: Array<{ t: number, v: number }>, source: string }>}
 *   `spark` and `spark_points` are index-aligned and ordered oldest to newest;
 *   `latest` is the newest value or null when nothing usable was returned.
 * @throws {Error} Whatever fetchJson() rejects with.
 */
async function fetchEurUsdFromFrankfurter() {
  const SPARK_LEN = 7;
  const { start, end } = dateRange(7);
  const url = `${FRANKFURTER}/${start}..${end}?from=EUR&to=USD`;

  const j = await fetchJson(url, { timeoutMs: 15000 });
  // response: { start_date, end_date, base, rates: { "YYYY-MM-DD": { "USD": 1.09 } } }
  const rates = j?.rates && typeof j.rates === "object" ? j.rates : {};

  // ISO dates sort chronologically as plain strings.
  const series = [];
  for (const d of Object.keys(rates).sort()) {
    const usd = rates[d]?.USD;
    const v = usd != null ? Number(usd) : NaN;
    const t = Date.parse(`${d}T00:00:00Z`);
    if (!Number.isFinite(v) || v <= 0 || !Number.isFinite(t)) continue;
    series.push({ t, v });
  }

  // Same index selection as normalizeTo7(), applied to whole {t, v} pairs so
  // every sampled value keeps its own date.
  const lastIndex = series.length - 1;
  const spark_points = series.length <= SPARK_LEN
    ? series
    : Array.from(
        { length: SPARK_LEN },
        (_, i) => series[Math.round((i * lastIndex) / (SPARK_LEN - 1))]
      );

  const spark = spark_points.map((p) => p.v);
  const latest = spark.length ? spark[spark.length - 1] : null;

  return { latest, spark, spark_points, source: "frankfurter" };
}
|
|
|
/**
 * Produce fresh data for one fiat line, dispatching on `line.key`.
 *
 * "USD" is a fixed 1.0 with a flat seven-day sparkline, "CAD" comes from
 * fetchCadUsdFromBoC() and "EUR" from fetchEurUsdFromFrankfurter().
 *
 * @param {{ key: string }} line - Configured fiat line.
 * @returns {Promise<{ value: number|null, spark: number[],
 *   spark_points: Array<{ t: number, v: number }>, source: string, vs: string }>}
 *   `vs` is the upper-cased `cfg.fiat.base` (default "USD").
 * @throws {Error} `Unsupported fiat key: <key>` for any other key, or whatever
 *   the underlying fetcher rejects with.
 */
async function updateFiatLine(line) {
  const base = (cfg?.fiat?.base || "USD").toUpperCase();

  // Shape a fetcher result into the common line-update record.
  const fromFetched = (r) => ({
    value: r.latest,
    spark: r.spark,
    spark_points: r.spark_points,
    source: r.source,
    vs: base
  });

  switch (line.key) {
    case "USD": {
      // fixed USD base: one point per day, oldest first
      const flatPoints = [];
      for (let i = 0; i < 7; i++) {
        flatPoints.push({ t: nowMs() - (6 - i) * 86400000, v: 1 });
      }
      return {
        value: 1,
        spark: [1, 1, 1, 1, 1, 1, 1],
        spark_points: flatPoints,
        source: "fixed",
        vs: base
      };
    }

    case "CAD":
      return fromFetched(await fetchCadUsdFromBoC());

    case "EUR":
      return fromFetched(await fetchEurUsdFromFrankfurter());

    default:
      // Unknown fiat (not implemented)
      throw new Error(`Unsupported fiat key: ${line.key}`);
  }
}
|
|
|
/* --------------------- crypto fetchers --------------------- */ |
|
|
|
// Cache of shaped CoinGecko results, keyed by the full request
// (coin id + quote currency + days) -> { atMs, data }.
let cgCache = new Map();

const CG_CACHE_MS = 5 * 60 * 1000; // 5 min cache to reduce calls

/**
 * Fetch a coin's daily price history from CoinGecko and reduce it to at most
 * seven evenly spaced points, with a five-minute in-memory cache.
 *
 * Fixes over the previous version:
 *  - the cache key covers `vs` and `days` as well as the coin id, so changing
 *    the quote currency no longer serves prices cached for another currency;
 *  - values and timestamps are sampled together, so `spark_points[i].t` is the
 *    timestamp of `spark[i]` even when more than seven points are returned;
 *  - a `null` price is skipped instead of being coerced to 0.
 *
 * @param {string} coinId - CoinGecko coin id.
 * @param {string} vs - Quote currency passed to CoinGecko.
 * @param {number} [days=7] - Days of history to request.
 * @returns {Promise<{ value: number|null, spark: number[],
 *   spark_points: Array<{ t: number, v: number }>, source: string }>}
 *   `source` is "coingecko" for a fresh fetch and "coingecko:cache" for a hit.
 * @throws {Error} Whatever fetchJson() rejects with (e.g. HTTP 429).
 */
async function fetchCoinGeckoMarketChart(coinId, vs, days = 7) {
  const SPARK_LEN = 7;
  const now = nowMs();

  const cacheKey = JSON.stringify([coinId, vs, days]);
  const cached = cgCache.get(cacheKey);
  if (cached && (now - cached.atMs) < CG_CACHE_MS) {
    return { ...cached.data, source: "coingecko:cache" };
  }

  const url = cgMarketChartUrl(coinId, vs, days);
  const j = await fetchJson(url, { timeoutMs: 15000 });

  // j.prices = [[ts, price], ...]
  const prices = Array.isArray(j?.prices) ? j.prices : [];
  const series = [];

  for (const p of prices) {
    if (!Array.isArray(p) || p[0] == null || p[1] == null) continue;
    const t = Number(p[0]);
    const v = Number(p[1]);
    if (!Number.isFinite(t) || !Number.isFinite(v)) continue;
    series.push({ t, v });
  }

  // Same index selection as normalizeTo7(), applied to whole {t, v} pairs.
  const lastIndex = series.length - 1;
  const spark_points = series.length <= SPARK_LEN
    ? series
    : Array.from(
        { length: SPARK_LEN },
        (_, i) => series[Math.round((i * lastIndex) / (SPARK_LEN - 1))]
      );
  const spark = spark_points.map((p) => p.v);

  const out = {
    value: spark.length ? spark[spark.length - 1] : null,
    spark,
    spark_points,
    source: "coingecko"
  };

  cgCache.set(cacheKey, { atMs: now, data: out });
  return out;
}
|
|
|
/**
 * Normalize a KlingEx pair string.
 *
 * Accepts "ETHO-USDT" or "ETHO_USDT" in any letter case.
 *
 * @param {*} pair - Raw pair value from configuration.
 * @returns {{ base: string, quote: string, pair: string }|null} Upper-cased
 *   parts plus the dash-joined pair, or null when the input is empty or is not
 *   two alphanumeric symbols joined by a single separator.
 */
function parseKlingexPair(pair) {
  if (!pair) return null;

  // Only the first underscore is converted; anything left over fails the match.
  const normalized = String(pair).replace("_", "-").toUpperCase();
  const match = /^([A-Z0-9]+)-([A-Z0-9]+)$/.exec(normalized);
  if (match === null) return null;

  const [, base, quote] = match;
  return { base, quote, pair: `${base}-${quote}` };
}
|
|
|
/**
 * Fetch the provider result for a KlingEx pair.
 *
 * The pair string is validated and normalized with parseKlingexPair(), then
 * handed to the provider's fetchPair() in "BASE-QUOTE" form.
 *
 * @param {string} pairStr - Pair as configured, e.g. "ETHO-USDT" or "etho_usdt".
 * @returns {Promise<*>} Whatever fetchPair() resolves with, unmodified
 *   (callers read `price` and `source` from it).
 * @throws {Error} `Bad klingex_pair: <pairStr>` when the pair cannot be parsed,
 *   or whatever fetchPair() rejects with.
 */
async function fetchKlingexLastPrice(pairStr) {
  const normalized = parseKlingexPair(pairStr);
  if (!normalized) {
    throw new Error(`Bad klingex_pair: ${pairStr}`);
  }

  // The provider module wraps the exchange SDK; it expects "ETHO-USDT".
  const providerResult = await fetchPair(normalized.pair);
  return providerResult;
}
|
|
|
/**
 * Produce fresh data for one crypto line.
 *
 * Lines with `source: "klingex"` (case-insensitive) use the KlingEx spot price
 * for `line.klingex_pair`; every other line uses the CoinGecko market chart
 * for `line.coingecko_id`, quoted in `cfg.crypto.vs_currency` (default "usd").
 *
 * @param {{ key: string, source?: string, klingex_pair?: string,
 *   coingecko_id?: string }} line - Configured crypto line.
 * @returns {Promise<{ value: number|null, spark: number[],
 *   spark_points: Array<{ t: number, v: number }>, source: string, vs: string }>}
 * @throws {Error} `KlingEx returned non-numeric price for <key>` when the
 *   provider result has no usable price; `missing coingecko_id` when a
 *   CoinGecko line has no id; or whatever the underlying fetcher rejects with.
 */
async function updateCryptoLine(line) {
  const vs = String(cfg?.crypto?.vs_currency || "usd").toLowerCase();

  // KlingEx override for specific coins
  if (String(line.source || "").toLowerCase() === "klingex") {
    const r = await fetchKlingexLastPrice(line.klingex_pair);

    // Number(null) and Number("") are both 0, which would be published as a
    // genuine price of zero; treat a missing price (or result) as an error.
    const raw = r?.price;
    const price = raw == null || raw === "" ? NaN : Number(raw);
    if (!Number.isFinite(price)) {
      throw new Error(`KlingEx returned non-numeric price for ${line.key}`);
    }

    // Only a spot price is available here, so the sparkline is a flat
    // seven-day series at the current price, all anchored to one instant.
    const now = nowMs();
    const spark = Array(7).fill(price);
    const spark_points = Array.from({ length: 7 }, (_, i) => ({
      t: now - (6 - i) * 86400000,
      v: price
    }));

    // NOTE(review): `vs` is reported as "USD" regardless of the pair's quote
    // symbol (e.g. USDT) — confirm that is the intended display.
    return { value: price, spark, spark_points, source: r.source || "klingex", vs: "USD" };
  }

  // Default: CoinGecko
  const coinId = line.coingecko_id;
  if (!coinId) throw new Error(`missing coingecko_id`);

  const r = await fetchCoinGeckoMarketChart(coinId, vs, 7);
  return { ...r, vs: vs.toUpperCase() };
}
|
|
|
/* --------------------- rotating updater --------------------- */ |
|
// True while a tick is being processed. Ticks come from setInterval, which
// does not wait for the previous async run to finish.
let updateInFlight = false;

/**
 * Copy a fetch result onto the matching visible state entry.
 * Does nothing when the line is no longer present in state.
 */
function applyLineResult(item, r) {
  const isFiat = item.group === "fiat";
  const stLine = findLineInState(isFiat ? "fiat" : "crypto", item.key);
  if (!stLine) return;

  stLine.value = r.value;
  stLine.display = formatDisplay(r.value);
  stLine.vs = r.vs || (isFiat ? (cfg?.fiat?.base || "USD") : "USD");
  stLine.spark = r.spark || [];
  stLine.spark_points = r.spark_points || [];
  stLine.source = r.source || (isFiat ? "fiat" : "crypto");
}

/**
 * Refresh the next line in the rotation: reload config if it changed, honour
 * the 429 backoff window, fetch the line, publish the result (or record the
 * error) and advance the progress counters.
 */
async function refreshNextLine() {
  loadConfig();

  if (!queue.length) return;
  if (nowMs() < backoffUntilMs) return;

  // position within the current cycle selects the line to refresh
  const idx = state.progress.done % queue.length;
  const item = queue[idx];

  setProgress(`${item.group}:${item.key}`);

  const started = nowMs();
  try {
    const r = item.group === "fiat"
      ? await updateFiatLine(item)
      : await updateCryptoLine(item);

    // Look the state entry up AFTER the await: a config reload while the
    // request was in flight replaces the state arrays, and writing to an entry
    // fetched earlier would silently discard the result.
    applyLineResult(item, r);

    const took = nowMs() - started;
    state.progress.done += 1;
    setProgress(null);

    // reset 429 tracking on success
    consecutive429 = 0;

    console.log(`[monitor] updated ${item.group}:${item.key} ok took=${took}ms (${state.progress.done}/${queue.length} cycle=${state.progress.cycle})`);
  } catch (e) {
    const took = nowMs() - started;
    const msg = String(e?.message || e);

    // If we hit 429 (CoinGecko), backoff for 60 seconds
    if (String(e?.status) === "429" || msg.includes("HTTP 429")) {
      consecutive429 += 1;
      backoffUntilMs = nowMs() + 60_000;
      console.log(`[monitor] 429 hit. backoff 60s (consecutive=${consecutive429})`);
    }

    setError(`${item.key}: ${msg}`);
    state.progress.done += 1;
    setProgress(null);

    console.log(`[monitor] updated ${item.group}:${item.key} WARN took=${took}ms err=${msg}`);
  }

  if (state.progress.done >= queue.length) nextCycle();
}

/**
 * One tick of the rotating updater.
 *
 * Never rejects: it is driven by setInterval, where a rejected promise would
 * be an unhandled rejection. Failures outside the per-line handling (typically
 * a missing or malformed config file) are recorded via setError() and logged.
 * A tick that arrives while the previous one is still running is skipped, so
 * two runs can never refresh the same line and double-count progress.
 *
 * @returns {Promise<void>}
 */
async function doOneUpdate() {
  if (updateInFlight) return;
  updateInFlight = true;
  try {
    await refreshNextLine();
  } catch (e) {
    const msg = String(e?.message || e);
    setError(`update: ${msg}`);
    console.log(`[monitor] update tick failed err=${msg}`);
  } finally {
    updateInFlight = false;
  }
}
|
|
|
// Drive the rotating updater: one line is refreshed every ROTATE_SECONDS.
// doOneUpdate is async, so its promise must not be left floating — a rejection
// (e.g. an unreadable config file) would otherwise surface as an unhandled
// rejection instead of a log line.
setInterval(() => {
  doOneUpdate().catch((e) => {
    console.log(`[monitor] update tick failed err=${String(e?.message || e)}`);
  });
}, ROTATE_SECONDS * 1000);
|
|
|
/* --------------------- HTTP endpoints --------------------- */ |
|
/**
 * GET /health — liveness probe.
 * Responds with the current time, the rotation interval and the end of the
 * active rate-limit backoff window (0 when none has been set).
 */
function handleHealth(_req, res) {
  const payload = {
    ok: true,
    ts: new Date().toISOString(),
    rotate_seconds: ROTATE_SECONDS,
    backoff_until_ms: backoffUntilMs || 0
  };
  res.json(payload);
}

app.get("/health", handleHealth);
|
|
|
/**
 * GET /api/lines — current snapshot of all lines.
 * Calls loadConfig() first, then returns the fiat and crypto state arrays,
 * the recent error list and the updater progress. Any exception is reported
 * as HTTP 500 with `{ ok: false, error }`.
 */
function handleLines(_req, res) {
  try {
    loadConfig();

    const { fiat, crypto, errors, progress } = state;
    res.json({
      ok: true,
      ts: new Date().toISOString(),
      took_ms: 0,
      fiat,
      crypto,
      errors,
      progress
    });
  } catch (e) {
    const error = String(e?.message || e);
    res.status(500).json({ ok: false, error });
  }
}

app.get("/api/lines", handleLines);
|
|
|
/**
 * Runs once the server is accepting connections: loads the configuration so
 * the state arrays are populated, then logs the listen address and interval.
 */
function onListening() {
  loadConfig();
  console.log(`[monitor] backend listening on 127.0.0.1:${PORT} rotate=${ROTATE_SECONDS}s`);
}

// Loopback only.
app.listen(PORT, "127.0.0.1", onListening);
|
|
|