// /home/def/monitor/backend/server.js
//
// Express backend for monitor.outsidethebox.top
// - Reads /home/def/monitor/config/lines.json
// - Rotating refresh (one line per interval)
// - Supports sources:
//   * CoinGecko: line.source omitted or "coingecko" + coingecko_id (slug)
//   * KlingEx:   line.source === "klingex" + klingex_pair ("ETHO-USDT", "EGAZ-USDT")
//
// Fiat sources (USD base):
//   * USD: fixed 1.0
//   * CAD: Bank of Canada FXUSDCAD (1 USD = X CAD) -> invert to get CAD in USD
//   * EUR: Frankfurter API EUR->USD (timeseries; latest = last point of the series)
//
// Endpoints:
//   GET /health
//   GET /api/lines

const fs = require("fs");
const path = require("path");
const express = require("express");
const { fetchPair } = require("./providers/klingex");
const oracleRoutes = require("../oracle/routes");

const app = express();
app.use("/api/oracle", oracleRoutes);

/**
 * Read a numeric setting from the environment.
 *
 * Falls back when the variable is unset/empty or fails validation. This matters
 * for ROTATE_SECONDS in particular: a NaN delay handed to setInterval is coerced
 * to 1 ms, which would hit the upstream APIs in a tight loop.
 *
 * @param {string} name - environment variable name
 * @param {number} fallback - value used when the variable is unset or invalid
 * @param {(n: number) => boolean} isValid - extra validation for the parsed number
 * @returns {number}
 */
function envNumber(name, fallback, isValid) {
  const raw = process.env[name];
  if (!raw) return fallback;
  const n = Number(raw);
  if (!Number.isFinite(n) || !isValid(n)) {
    console.log(`[monitor] ignoring invalid ${name}=${raw}, using ${fallback}`);
    return fallback;
  }
  return n;
}

const PORT = envNumber("PORT", 4010, (n) => Number.isInteger(n) && n >= 0 && n <= 65535);
const ROTATE_SECONDS = envNumber("ROTATE_SECONDS", 60, (n) => n > 0);

const ROOT = "/home/def/monitor";
const CONFIG_PATH = path.join(ROOT, "config", "lines.json");

// Bank of Canada: USD/CAD (1 USD = X CAD). We invert to show CAD in USD base.
// NOTE(review): no `recent`/date filter is passed, so the CAD spark is sampled
// across every observation the endpoint returns, while EUR and CoinGecko use a
// ~7 day window — confirm whether a trailing window is wanted here as well.
const BOC_USDCAD = "https://www.bankofcanada.ca/valet/observations/FXUSDCAD/json";

// Frankfurter FX API (ECB-based). We'll use EUR->USD for Euro.
const FRANKFURTER = "https://api.frankfurter.app";

// Number of points in every sparkline.
const SPARK_LEN = 7;
const DAY_MS = 86400000;

// How long the updater pauses after an HTTP 429.
const BACKOFF_MS = 60_000;

/** @returns {number} current epoch time in milliseconds */
function nowMs() {
  return Date.now();
}

/**
 * GET a URL and parse the body as JSON, aborting after `timeoutMs`.
 *
 * @param {string} url
 * @param {{ timeoutMs?: number }} [options]
 * @returns {Promise<any>} parsed JSON body
 * @throws {Error} on a non-2xx response; the error carries `status` and the
 *   message is "HTTP <status> for <url> :: <first 250 chars of body>"
 */
async function fetchJson(url, { timeoutMs = 15000 } = {}) {
  const ctrl = new AbortController();
  const t = setTimeout(() => ctrl.abort(), timeoutMs);
  try {
    const res = await fetch(url, {
      headers: {
        "accept": "application/json",
        "user-agent": "monitor.outsidethebox.top/1.0"
      },
      signal: ctrl.signal
    });
    if (!res.ok) {
      const txt = await res.text().catch(() => "");
      const err = new Error(`HTTP ${res.status} for ${url} :: ${txt.slice(0, 250)}`);
      err.status = res.status;
      throw err;
    }
    return await res.json();
  } finally {
    // Always release the timer, whether the request succeeded, failed or aborted.
    clearTimeout(t);
  }
}

/**
 * Indexes of the elements kept when reducing `length` items to SPARK_LEN
 * evenly spaced samples. The first and last items are always kept.
 *
 * @param {number} length
 * @returns {number[]} ascending indexes; every index when length <= SPARK_LEN
 */
function sampleIndexes(length) {
  if (length <= SPARK_LEN) return Array.from({ length }, (_, i) => i);
  return Array.from({ length: SPARK_LEN }, (_, i) =>
    Math.round((i * (length - 1)) / (SPARK_LEN - 1))
  );
}

/**
 * Drop non-numeric / non-finite entries, then down-sample to at most 7 values.
 *
 * @param {Array<*>} points
 * @returns {number[]}
 */
function normalizeTo7(points) {
  const cleaned = points.filter((v) => typeof v === "number" && Number.isFinite(v));
  return sampleIndexes(cleaned.length).map((i) => cleaned[i]);
}

/**
 * Build the spark arrays for a chronological series.
 *
 * Values and timestamps are picked with the same indexes, so each entry of
 * `spark_points` carries the timestamp of the value it shows. (Pairing the
 * sampled values with the timestamps of the first seven raw points is wrong
 * as soon as the series is longer than seven.)
 *
 * @param {Array<{t: number, v: number}>} series - ascending, finite t and v
 * @returns {{ latest: number|null, spark: number[], spark_points: Array<{t: number, v: number}> }}
 */
function summarizeSeries(series) {
  const spark_points = sampleIndexes(series.length).map((i) => ({
    t: series[i].t,
    v: series[i].v
  }));
  const spark = normalizeTo7(series.map((p) => p.v));
  const latest = spark.length ? spark[spark.length - 1] : null;
  return { latest, spark, spark_points };
}

/**
 * Format a value for display: no decimals from 1000 up, 4 decimals from 1 up,
 * 6 decimals below 1, and an em dash for anything that is not a finite number.
 *
 * @param {*} v
 * @returns {string}
 */
function formatDisplay(v) {
  if (typeof v !== "number" || !Number.isFinite(v)) return "—";
  if (v >= 1000) return v.toFixed(0);
  if (v >= 1) return v.toFixed(4);
  return v.toFixed(6);
}

/** @returns {string} the UTC calendar date of `d` as YYYY-MM-DD */
function isoDate(d) {
  return d.toISOString().slice(0, 10);
}

/**
 * Date window ending today and starting `days + 3` days earlier.
 * The arithmetic is done in UTC because isoDate() formats in UTC.
 *
 * @param {number} days
 * @returns {{ start: string, end: string }} YYYY-MM-DD bounds
 */
function dateRange(days) {
  const end = new Date();
  const start = new Date(end.getTime());
  start.setUTCDate(start.getUTCDate() - (days + 3)); // small buffer
  return { start: isoDate(start), end: isoDate(end) };
}

/** Build the CoinGecko market_chart URL (daily interval) for a coin. */
function cgMarketChartUrl(coinId, vs, days = 7) {
  return `https://api.coingecko.com/api/v3/coins/${encodeURIComponent(coinId)}/market_chart?vs_currency=${encodeURIComponent(vs)}&days=${days}&interval=daily`;
}

/* ---------------------
   state
--------------------- */

let cfg = null;
let cfgMtime = 0;

let queue = []; // rotating items (fiat lines + crypto lines)
let state = {
  fiat: [],
  crypto: [],
  errors: [],
  progress: { total: 0, done: 0, current: null, cycle: 0 }
};

let backoffUntilMs = 0;
let consecutive429 = 0;

// True while an update tick is awaiting a provider; prevents overlapping ticks
// from refreshing the same queue slot twice when ROTATE_SECONDS is short.
let updateInFlight = false;

/* ---------------------
   config load
--------------------- */

/**
 * Load lines.json when it is new or its mtime advanced; otherwise return the
 * cached config. A (re)load rebuilds the rotating queue and resets the visible
 * state arrays, the error list and the progress counters.
 *
 * @returns {object} parsed config
 * @throws if the file cannot be stat'ed/read or is not valid JSON
 */
function loadConfig() {
  const st = fs.statSync(CONFIG_PATH);
  if (st.mtimeMs <= cfgMtime && cfg) return cfg;

  const raw = fs.readFileSync(CONFIG_PATH, "utf8");
  const parsed = JSON.parse(raw);

  cfg = parsed;
  cfgMtime = st.mtimeMs;

  // Flatten into one rotating queue in the order we want to update
  const fiatLines = (cfg?.fiat?.lines || []).map((x) => ({ ...x, group: "fiat" }));
  const cryptoLines = (cfg?.crypto?.lines || []).map((x) => ({ ...x, group: "crypto" }));
  queue = [...fiatLines, ...cryptoLines];

  // init visible state arrays (keep ordering)
  state.fiat = fiatLines.map((x) => ({
    ...x,
    value: null,
    display: "—",
    vs: cfg?.fiat?.base || "USD",
    spark: [],
    spark_points: [],
    source: "init"
  }));
  state.crypto = cryptoLines.map((x) => ({
    ...x,
    value: null,
    display: "—",
    vs: (cfg?.crypto?.vs_currency || "usd").toUpperCase(),
    spark: [],
    spark_points: [],
    source: "init"
  }));

  state.errors = [];
  state.progress.total = queue.length;
  state.progress.done = 0;
  state.progress.current = null;

  return cfg;
}

/** Find the visible state entry for `key` in the given group ("fiat" or anything else = crypto). */
function findLineInState(group, key) {
  const arr = group === "fiat" ? state.fiat : state.crypto;
  return arr.find((x) => x.key === key);
}

/** Record an error message, keeping only the 20 most recent. */
function setError(msg) {
  state.errors.push(msg);
  if (state.errors.length > 20) state.errors = state.errors.slice(-20);
}

/** Publish which "<group>:<key>" is being refreshed (null when idle). */
function setProgress(currentKey = null) {
  state.progress.current = currentKey;
}

/** Start a new pass over the queue. */
function nextCycle() {
  state.progress.cycle += 1;
  state.progress.done = 0;
  state.progress.current = null;
}

/* ---------------------
   fiat fetchers
--------------------- */

/**
 * CAD in USD base using BoC FXUSDCAD (1 USD = X CAD) -> inverted to CADUSD.
 *
 * @returns {Promise<{latest: number, spark: number[], spark_points: Array<{t:number,v:number}>, source: string}>}
 * @throws when the response holds no usable observation, so the caller keeps
 *   the previous value instead of blanking the line
 */
async function fetchCadUsdFromBoC() {
  const j = await fetchJson(BOC_USDCAD, { timeoutMs: 15000 });
  const obs = Array.isArray(j?.observations) ? j.observations : [];

  // obs entries: { d: "YYYY-MM-DD", FXUSDCAD: { v: "1.34" } }
  const series = [];
  for (const o of obs) {
    const d = o?.d;
    const v = o?.FXUSDCAD?.v;
    const num = v != null ? Number(v) : NaN;
    if (!d || !Number.isFinite(num) || num <= 0) continue;
    const ts = Date.parse(`${d}T00:00:00Z`);
    if (!Number.isFinite(ts)) continue;
    // invert: 1 CAD = 1 / (USD->CAD)
    series.push({ t: ts, v: 1 / num });
  }

  if (!series.length) throw new Error("BoC returned no usable FXUSDCAD observations");
  return { ...summarizeSeries(series), source: "boc" };
}

/**
 * EUR in USD base using the Frankfurter EUR->USD timeseries.
 *
 * @returns {Promise<{latest: number, spark: number[], spark_points: Array<{t:number,v:number}>, source: string}>}
 * @throws when the response holds no usable rate
 */
async function fetchEurUsdFromFrankfurter() {
  const { start, end } = dateRange(7);
  const url = `${FRANKFURTER}/${start}..${end}?from=EUR&to=USD`;
  const j = await fetchJson(url, { timeoutMs: 15000 });

  // response: { start_date, end_date, base, rates: { "YYYY-MM-DD": { "USD": 1.09 } } }
  const rates = j?.rates && typeof j.rates === "object" ? j.rates : {};

  const series = [];
  // ISO dates sort chronologically as plain strings.
  for (const d of Object.keys(rates).sort()) {
    const usd = rates[d]?.USD;
    const num = usd != null ? Number(usd) : NaN;
    if (!Number.isFinite(num) || num <= 0) continue;
    const ts = Date.parse(`${d}T00:00:00Z`);
    if (!Number.isFinite(ts)) continue;
    series.push({ t: ts, v: num });
  }

  if (!series.length) throw new Error("Frankfurter returned no usable EUR->USD rates");
  return { ...summarizeSeries(series), source: "frankfurter" };
}

/**
 * Refresh one fiat line.
 *
 * @param {{ key: string }} line - queue item; only USD, CAD and EUR are supported
 * @returns {Promise<{value: number, spark: number[], spark_points: Array<{t:number,v:number}>, source: string, vs: string}>}
 * @throws {Error} for any other key, or when the provider call fails
 */
async function updateFiatLine(line) {
  const base = (cfg?.fiat?.base || "USD").toUpperCase();

  if (line.key === "USD") {
    // fixed USD base: flat line at 1 over the last seven days
    const now = nowMs();
    return {
      value: 1,
      spark: Array(SPARK_LEN).fill(1),
      spark_points: Array.from({ length: SPARK_LEN }, (_, i) => ({
        t: now - (SPARK_LEN - 1 - i) * DAY_MS,
        v: 1
      })),
      source: "fixed",
      vs: base
    };
  }

  if (line.key === "CAD") {
    const r = await fetchCadUsdFromBoC();
    return { value: r.latest, spark: r.spark, spark_points: r.spark_points, source: r.source, vs: base };
  }

  if (line.key === "EUR") {
    const r = await fetchEurUsdFromFrankfurter();
    return { value: r.latest, spark: r.spark, spark_points: r.spark_points, source: r.source, vs: base };
  }

  // Unknown fiat (not implemented)
  throw new Error(`Unsupported fiat key: ${line.key}`);
}

/* ---------------------
   crypto fetchers
--------------------- */

const cgCache = new Map(); // "coinId:vs:days" -> { atMs, data }
const CG_CACHE_MS = 5 * 60 * 1000; // 5 min cache to reduce calls

/**
 * Fetch the daily CoinGecko price series for a coin, cached for CG_CACHE_MS.
 *
 * The cache key includes `vs` and `days` so a vs_currency change in the config
 * is not answered with prices quoted in the previous currency.
 *
 * @param {string} coinId - CoinGecko slug
 * @param {string} vs - quote currency
 * @param {number} [days=7]
 * @returns {Promise<{value: number, spark: number[], spark_points: Array<{t:number,v:number}>, source: string}>}
 *   `source` is "coingecko", or "coingecko:cache" when served from the cache
 * @throws when the response holds no usable price (nothing is cached then)
 */
async function fetchCoinGeckoMarketChart(coinId, vs, days = 7) {
  const now = nowMs();
  const cacheKey = `${coinId}:${vs}:${days}`;
  const cached = cgCache.get(cacheKey);
  if (cached && (now - cached.atMs) < CG_CACHE_MS) {
    return { ...cached.data, source: "coingecko:cache" };
  }

  const url = cgMarketChartUrl(coinId, vs, days);
  const j = await fetchJson(url, { timeoutMs: 15000 });

  // j.prices = [[ts, price], ...]
  const prices = Array.isArray(j?.prices) ? j.prices : [];
  const series = [];
  for (const p of prices) {
    // Number(null) is 0, so reject missing cells before converting.
    if (!Array.isArray(p) || p[0] == null || p[1] == null) continue;
    const t = Number(p[0]);
    const v = Number(p[1]);
    if (!Number.isFinite(t) || !Number.isFinite(v)) continue;
    series.push({ t, v });
  }

  if (!series.length) throw new Error(`CoinGecko returned no usable prices for ${coinId}`);

  const { latest, spark, spark_points } = summarizeSeries(series);
  const out = { value: latest, spark, spark_points, source: "coingecko" };

  cgCache.set(cacheKey, { atMs: now, data: out });
  return out;
}

/**
 * Parse a KlingEx pair such as "ETHO-USDT" or "ETHO_USDT" (case-insensitive).
 *
 * @param {*} pair
 * @returns {{ base: string, quote: string, pair: string } | null} null when the
 *   input is empty or not of the form BASE-QUOTE
 */
function parseKlingexPair(pair) {
  if (!pair) return null;
  const p = String(pair).replace("_", "-").toUpperCase();
  const m = p.match(/^([A-Z0-9]+)-([A-Z0-9]+)$/);
  if (!m) return null;
  return { base: m[1], quote: m[2], pair: `${m[1]}-${m[2]}` };
}

/**
 * Fetch the last traded price of a KlingEx pair through the provider module.
 *
 * @param {string} pairStr
 * @returns {Promise<object>} whatever fetchPair() resolves to; callers read
 *   `price` and `source` from it
 * @throws {Error} when `pairStr` cannot be parsed
 */
async function fetchKlingexLastPrice(pairStr) {
  const parsed = parseKlingexPair(pairStr);
  if (!parsed) throw new Error(`Bad klingex_pair: ${pairStr}`);

  // Your smoketest showed these work:
  // client.markets.tickers() returns array of { ticker_id: 'ETHO_USDT', last_price: '...' }
  // We'll use the provider fetchPair() which already encapsulates the SDK usage.
  return fetchPair(parsed.pair); // expects "ETHO-USDT"
}

/**
 * Refresh one crypto line from KlingEx (line.source === "klingex") or, by
 * default, from CoinGecko.
 *
 * @param {{ key: string, source?: string, klingex_pair?: string, coingecko_id?: string }} line
 * @returns {Promise<{value: number, spark: number[], spark_points: Array<{t:number,v:number}>, source: string, vs: string}>}
 * @throws {Error} on a missing coingecko_id, a non-numeric KlingEx price, or a provider failure
 */
async function updateCryptoLine(line) {
  const vs = (cfg?.crypto?.vs_currency || "usd").toLowerCase();

  // KlingEx override for specific coins
  if ((line.source || "").toLowerCase() === "klingex") {
    const r = await fetchKlingexLastPrice(line.klingex_pair);
    const price = Number(r.price);
    if (!Number.isFinite(price)) throw new Error(`KlingEx returned non-numeric price for ${line.key}`);

    // KlingEx currently gives spot only (no OHLCV from your tests),
    // so we keep a flat 7-point spark for now using the current price.
    const now = nowMs();
    const spark = Array(SPARK_LEN).fill(price);
    const spark_points = Array.from({ length: SPARK_LEN }, (_, i) => ({
      t: now - (SPARK_LEN - 1 - i) * DAY_MS,
      v: price
    }));

    return {
      value: price,
      spark,
      spark_points,
      source: r.source || "klingex",
      vs: "USD"
    };
  }

  // Default: CoinGecko
  const coinId = line.coingecko_id;
  if (!coinId) throw new Error(`missing coingecko_id`);

  const r = await fetchCoinGeckoMarketChart(coinId, vs, 7);
  return { ...r, vs: vs.toUpperCase() };
}

/* ---------------------
   rotating updater
--------------------- */

/**
 * Copy a provider result onto the visible state line, if that line still exists.
 * Looked up at apply time (after the fetch) so a config reload that happened
 * while the request was in flight is not written into a discarded object.
 */
function applyResult(group, key, r) {
  const stLine = findLineInState(group, key);
  if (!stLine) return;

  const defaultVs = group === "fiat" ? (cfg?.fiat?.base || "USD") : "USD";
  stLine.value = r.value;
  stLine.display = formatDisplay(r.value);
  stLine.vs = r.vs || defaultVs;
  stLine.spark = r.spark || [];
  stLine.spark_points = r.spark_points || [];
  stLine.source = r.source || group;
}

/** Count the current queue slot as handled and roll over to the next cycle when the pass is complete. */
function finishItem() {
  state.progress.done += 1;
  setProgress(null);
  if (state.progress.done >= queue.length) nextCycle();
}

/**
 * Refresh exactly one queue item (the one at progress.done).
 * Provider failures are recorded in state.errors and the rotation moves on;
 * an HTTP 429 additionally pauses the updater for BACKOFF_MS.
 */
async function runOneUpdate() {
  try {
    loadConfig();
  } catch (e) {
    // Keep rotating on the last good config rather than letting a bad edit of
    // lines.json reject inside setInterval.
    const msg = String(e?.message || e);
    setError(`config: ${msg}`);
    console.log(`[monitor] config load failed err=${msg}`);
  }
  if (!queue.length) return;

  if (nowMs() < backoffUntilMs) return;

  // rotate pointer: progress.done is the position inside the current cycle
  const idx = state.progress.done % queue.length;
  const item = queue[idx];

  setProgress(`${item.group}:${item.key}`);

  const started = nowMs();
  try {
    const r = item.group === "fiat"
      ? await updateFiatLine(item)
      : await updateCryptoLine(item);
    applyResult(item.group === "fiat" ? "fiat" : "crypto", item.key, r);

    const took = nowMs() - started;
    // reset 429 tracking on success
    consecutive429 = 0;

    const position = `${state.progress.done + 1}/${queue.length}`;
    console.log(`[monitor] updated ${item.group}:${item.key} ok took=${took}ms (${position} cycle=${state.progress.cycle})`);
    finishItem();
  } catch (e) {
    const took = nowMs() - started;
    const msg = String(e?.message || e);

    // If we hit 429 (CoinGecko), backoff for 60 seconds
    if (String(e?.status) === "429" || msg.includes("HTTP 429")) {
      consecutive429 += 1;
      backoffUntilMs = nowMs() + BACKOFF_MS;
      console.log(`[monitor] 429 hit. backoff 60s (consecutive=${consecutive429})`);
    }

    setError(`${item.key}: ${msg}`);
    console.log(`[monitor] updated ${item.group}:${item.key} WARN took=${took}ms err=${msg}`);
    finishItem();
  }
}

/**
 * Interval callback: runs one update unless the previous one is still pending.
 * Never rejects, so a failure cannot surface as an unhandled rejection.
 */
async function doOneUpdate() {
  if (updateInFlight) return;
  updateInFlight = true;
  try {
    await runOneUpdate();
  } catch (e) {
    const msg = String(e?.message || e);
    setError(`updater: ${msg}`);
    console.log(`[monitor] update tick failed err=${msg}`);
  } finally {
    updateInFlight = false;
  }
}

setInterval(doOneUpdate, ROTATE_SECONDS * 1000);

/* ---------------------
   HTTP endpoints
--------------------- */

app.get("/health", (_req, res) => {
  res.json({
    ok: true,
    ts: new Date().toISOString(),
    rotate_seconds: ROTATE_SECONDS,
    backoff_until_ms: backoffUntilMs || 0
  });
});

app.get("/api/lines", (_req, res) => {
  const started = nowMs();
  try {
    loadConfig();
    res.json({
      ok: true,
      ts: new Date().toISOString(),
      took_ms: nowMs() - started,
      fiat: state.fiat,
      crypto: state.crypto,
      errors: state.errors,
      progress: state.progress
    });
  } catch (e) {
    res.status(500).json({ ok: false, error: String(e?.message || e) });
  }
});

app.listen(PORT, "127.0.0.1", () => {
  try {
    loadConfig();
  } catch (e) {
    // Stay up: /api/lines reports the problem and the updater retries the load.
    console.log(`[monitor] initial config load failed err=${String(e?.message || e)}`);
  }
  console.log(`[monitor] backend listening on 127.0.0.1:${PORT} rotate=${ROTATE_SECONDS}s`);
});