You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

483 lines
14 KiB

// /home/def/monitor/backend/server.js
//
// Express backend for monitor.outsidethebox.top
// - Reads /home/def/monitor/config/lines.json
// - Rotating refresh (one line per interval)
// - Supports sources:
// * CoinGecko: line.source omitted or "coingecko" + coingecko_id (slug)
// * KlingEx: line.source === "klingex" + klingex_pair ("ETHO-USDT", "EGAZ-USDT")
//
// Fiat sources (USD base):
// * USD: fixed 1.0
// * CAD: Bank of Canada FXUSDCAD (1 USD = X CAD) -> invert to get CAD in USD
// * EUR: Frankfurter API EUR->USD (timeseries + latest)
//
// Endpoints:
// GET /health
// GET /api/lines
const fs = require("fs");
const path = require("path");
const express = require("express");
const { fetchPair } = require("./providers/klingex");
const oracleRoutes = require("../oracle/routes");
const app = express();
app.use("/api/oracle", oracleRoutes);
const PORT = process.env.PORT ? Number(process.env.PORT) : 4010;
const ROTATE_SECONDS = process.env.ROTATE_SECONDS ? Number(process.env.ROTATE_SECONDS) : 60;
const ROOT = "/home/def/monitor";
const CONFIG_PATH = path.join(ROOT, "config", "lines.json");
// Bank of Canada: USD/CAD (1 USD = X CAD). We invert to show CAD in USD base.
const BOC_USDCAD = "https://www.bankofcanada.ca/valet/observations/FXUSDCAD/json";
// Frankfurter FX API (ECB-based). We'll use EUR->USD for Euro.
const FRANKFURTER = "https://api.frankfurter.app";
function nowMs() { return Date.now(); }
async function fetchJson(url, { timeoutMs = 15000 } = {}) {
const ctrl = new AbortController();
const t = setTimeout(() => ctrl.abort(), timeoutMs);
try {
const res = await fetch(url, {
headers: {
"accept": "application/json",
"user-agent": "monitor.outsidethebox.top/1.0"
},
signal: ctrl.signal
});
if (!res.ok) {
const txt = await res.text().catch(() => "");
const err = new Error(`HTTP ${res.status} for ${url} :: ${txt.slice(0, 250)}`);
err.status = res.status;
throw err;
}
return await res.json();
} finally {
clearTimeout(t);
}
}
function normalizeTo7(points) {
const cleaned = points.filter(v => typeof v === "number" && isFinite(v));
if (cleaned.length <= 7) return cleaned;
const out = [];
for (let i = 0; i < 7; i++) {
const idx = Math.round((i * (cleaned.length - 1)) / 6);
out.push(cleaned[idx]);
}
return out;
}
function formatDisplay(v) {
if (v == null || !isFinite(v)) return "—";
if (v >= 1000) return v.toFixed(0);
if (v >= 1) return v.toFixed(4);
return v.toFixed(6);
}
function isoDate(d) { return d.toISOString().slice(0, 10); }
function dateRange(days) {
const end = new Date();
const start = new Date();
start.setDate(end.getDate() - (days + 3)); // small buffer
return { start: isoDate(start), end: isoDate(end) };
}
function cgMarketChartUrl(coinId, vs, days = 7) {
return `https://api.coingecko.com/api/v3/coins/${encodeURIComponent(coinId)}/market_chart?vs_currency=${encodeURIComponent(vs)}&days=${days}&interval=daily`;
}
/* --------------------- state --------------------- */
let cfg = null;
let cfgMtime = 0;
let queue = []; // rotating items (fiat lines + crypto lines)
let state = {
fiat: [],
crypto: [],
errors: [],
progress: { total: 0, done: 0, current: null, cycle: 0 }
};
let backoffUntilMs = 0;
let consecutive429 = 0;
/* --------------------- config load --------------------- */
function loadConfig() {
const st = fs.statSync(CONFIG_PATH);
if (st.mtimeMs <= cfgMtime && cfg) return cfg;
const raw = fs.readFileSync(CONFIG_PATH, "utf8");
const parsed = JSON.parse(raw);
cfg = parsed;
cfgMtime = st.mtimeMs;
// Flatten into one rotating queue in the order we want to update
const fiatLines = (cfg?.fiat?.lines || []).map(x => ({ ...x, group: "fiat" }));
const cryptoLines = (cfg?.crypto?.lines || []).map(x => ({ ...x, group: "crypto" }));
queue = [...fiatLines, ...cryptoLines];
// init visible state arrays (keep ordering)
state.fiat = fiatLines.map(x => ({
...x,
value: null,
display: "—",
vs: cfg?.fiat?.base || "USD",
spark: [],
spark_points: [],
source: "init"
}));
state.crypto = cryptoLines.map(x => ({
...x,
value: null,
display: "—",
vs: (cfg?.crypto?.vs_currency || "usd").toUpperCase(),
spark: [],
spark_points: [],
source: "init"
}));
state.errors = [];
state.progress.total = queue.length;
state.progress.done = 0;
state.progress.current = null;
return cfg;
}
function findLineInState(group, key) {
const arr = group === "fiat" ? state.fiat : state.crypto;
return arr.find(x => x.key === key);
}
function setError(msg) {
state.errors.push(msg);
// keep last 20 errors max
if (state.errors.length > 20) state.errors = state.errors.slice(-20);
}
function setProgress(currentKey = null) {
state.progress.current = currentKey;
}
function nextCycle() {
state.progress.cycle += 1;
state.progress.done = 0;
state.progress.current = null;
}
/* --------------------- fiat fetchers --------------------- */
// CAD in USD base using BoC FXUSDCAD (1 USD = X CAD) -> invert for CADUSD
async function fetchCadUsdFromBoC() {
const j = await fetchJson(BOC_USDCAD, { timeoutMs: 15000 });
const obs = Array.isArray(j?.observations) ? j.observations : [];
// obs entries: { d: "YYYY-MM-DD", FXUSDCAD: { v: "1.34" } }
const points = [];
const sparkPoints = [];
for (const o of obs) {
const d = o?.d;
const v = o?.FXUSDCAD?.v;
const num = v != null ? Number(v) : NaN;
if (!d || !isFinite(num) || num <= 0) continue;
// invert: 1 CAD = 1 / (USD->CAD)
const cadusd = 1 / num;
const ts = Date.parse(d + "T00:00:00Z");
points.push(cadusd);
sparkPoints.push({ t: ts, v: cadusd });
}
const p7 = normalizeTo7(points);
const sp7 = normalizeTo7(sparkPoints.map(x => x.v)).map((v, i) => {
const t = sparkPoints[Math.min(i, sparkPoints.length - 1)]?.t ?? nowMs();
return { t, v };
});
const latest = p7.length ? p7[p7.length - 1] : (points.length ? points[points.length - 1] : null);
return { latest, spark: p7, spark_points: sp7, source: "boc" };
}
// EUR in USD base using Frankfurter timeseries EUR->USD
async function fetchEurUsdFromFrankfurter() {
const { start, end } = dateRange(7);
const url = `${FRANKFURTER}/${start}..${end}?from=EUR&to=USD`;
const j = await fetchJson(url, { timeoutMs: 15000 });
// response: { start_date, end_date, base, rates: { "YYYY-MM-DD": { "USD": 1.09 } } }
const rates = j?.rates && typeof j.rates === "object" ? j.rates : {};
// Sort dates ascending
const dates = Object.keys(rates).sort();
const points = [];
const sparkPoints = [];
for (const d of dates) {
const usd = rates?.[d]?.USD;
const num = usd != null ? Number(usd) : NaN;
if (!isFinite(num) || num <= 0) continue;
const ts = Date.parse(d + "T00:00:00Z");
points.push(num);
sparkPoints.push({ t: ts, v: num });
}
const p7 = normalizeTo7(points);
const sp7 = normalizeTo7(sparkPoints.map(x => x.v)).map((v, i) => {
const t = sparkPoints[Math.min(i, sparkPoints.length - 1)]?.t ?? nowMs();
return { t, v };
});
const latest = p7.length ? p7[p7.length - 1] : (points.length ? points[points.length - 1] : null);
return { latest, spark: p7, spark_points: sp7, source: "frankfurter" };
}
async function updateFiatLine(line) {
const base = (cfg?.fiat?.base || "USD").toUpperCase();
if (line.key === "USD") {
// fixed USD base
return {
value: 1,
spark: [1, 1, 1, 1, 1, 1, 1],
spark_points: Array.from({ length: 7 }, (_, i) => ({ t: nowMs() - (6 - i) * 86400000, v: 1 })),
source: "fixed",
vs: base
};
}
if (line.key === "CAD") {
const r = await fetchCadUsdFromBoC();
return {
value: r.latest,
spark: r.spark,
spark_points: r.spark_points,
source: r.source,
vs: base
};
}
if (line.key === "EUR") {
const r = await fetchEurUsdFromFrankfurter();
return {
value: r.latest,
spark: r.spark,
spark_points: r.spark_points,
source: r.source,
vs: base
};
}
// Unknown fiat (not implemented)
throw new Error(`Unsupported fiat key: ${line.key}`);
}
/* --------------------- crypto fetchers --------------------- */
let cgCache = new Map(); // coinId -> { atMs, data }
const CG_CACHE_MS = 5 * 60 * 1000; // 5 min cache to reduce calls
async function fetchCoinGeckoMarketChart(coinId, vs, days = 7) {
const now = nowMs();
const cached = cgCache.get(coinId);
if (cached && (now - cached.atMs) < CG_CACHE_MS) {
return { ...cached.data, source: "coingecko:cache" };
}
const url = cgMarketChartUrl(coinId, vs, days);
const j = await fetchJson(url, { timeoutMs: 15000 });
// j.prices = [[ts, price], ...]
const prices = Array.isArray(j?.prices) ? j.prices : [];
const pts = [];
const sp = [];
for (const p of prices) {
const t = Array.isArray(p) ? Number(p[0]) : NaN;
const v = Array.isArray(p) ? Number(p[1]) : NaN;
if (!isFinite(t) || !isFinite(v)) continue;
pts.push(v);
sp.push({ t, v });
}
const p7 = normalizeTo7(pts);
const sp7 = normalizeTo7(sp.map(x => x.v)).map((v, i) => {
const t = sp[Math.min(i, sp.length - 1)]?.t ?? nowMs();
return { t, v };
});
const out = {
value: p7.length ? p7[p7.length - 1] : (pts.length ? pts[pts.length - 1] : null),
spark: p7,
spark_points: sp7,
source: "coingecko"
};
cgCache.set(coinId, { atMs: now, data: out });
return out;
}
function parseKlingexPair(pair) {
// accept "ETHO-USDT" or "ETHO_USDT"
if (!pair) return null;
const p = String(pair).replace("_", "-").toUpperCase();
const m = p.match(/^([A-Z0-9]+)-([A-Z0-9]+)$/);
if (!m) return null;
return { base: m[1], quote: m[2], pair: `${m[1]}-${m[2]}` };
}
async function fetchKlingexLastPrice(pairStr) {
const parsed = parseKlingexPair(pairStr);
if (!parsed) throw new Error(`Bad klingex_pair: ${pairStr}`);
// Your smoketest showed these work:
// client.markets.tickers() returns array of { ticker_id: 'ETHO_USDT', last_price: '...' }
// We'll use the provider fetchPair() which already encapsulates the SDK usage.
const r = await fetchPair(parsed.pair); // expects "ETHO-USDT"
// r should be: { price, source, ... }
return r;
}
async function updateCryptoLine(line) {
const vs = (cfg?.crypto?.vs_currency || "usd").toLowerCase();
// KlingEx override for specific coins
if ((line.source || "").toLowerCase() === "klingex") {
const kp = line.klingex_pair;
const r = await fetchKlingexLastPrice(kp);
const price = Number(r.price);
if (!isFinite(price)) throw new Error(`KlingEx returned non-numeric price for ${line.key}`);
// KlingEx currently gives spot only (no OHLCV from your tests),
// so we keep a flat 7-point spark for now using the current price.
const spark = Array(7).fill(price);
const spark_points = Array.from({ length: 7 }, (_, i) => ({ t: nowMs() - (6 - i) * 86400000, v: price }));
return { value: price, spark, spark_points, source: r.source || "klingex", vs: "USD" };
}
// Default: CoinGecko
const coinId = line.coingecko_id;
if (!coinId) throw new Error(`missing coingecko_id`);
const r = await fetchCoinGeckoMarketChart(coinId, vs, 7);
return { ...r, vs: vs.toUpperCase() };
}
/* --------------------- rotating updater --------------------- */
async function doOneUpdate() {
loadConfig();
if (!queue.length) return;
const now = nowMs();
if (now < backoffUntilMs) return;
// rotate pointer using cycle+done
const idx = state.progress.done % queue.length;
const item = queue[idx];
setProgress(`${item.group}:${item.key}`);
const started = nowMs();
try {
if (item.group === "fiat") {
const stLine = findLineInState("fiat", item.key);
const r = await updateFiatLine(item);
if (stLine) {
stLine.value = r.value;
stLine.display = formatDisplay(r.value);
stLine.vs = r.vs || (cfg?.fiat?.base || "USD");
stLine.spark = r.spark || [];
stLine.spark_points = r.spark_points || [];
stLine.source = r.source || "fiat";
}
} else {
const stLine = findLineInState("crypto", item.key);
const r = await updateCryptoLine(item);
if (stLine) {
stLine.value = r.value;
stLine.display = formatDisplay(r.value);
stLine.vs = r.vs || "USD";
stLine.spark = r.spark || [];
stLine.spark_points = r.spark_points || [];
stLine.source = r.source || "crypto";
}
}
const took = nowMs() - started;
state.progress.done += 1;
setProgress(null);
// reset 429 tracking on success
consecutive429 = 0;
console.log(`[monitor] updated ${item.group}:${item.key} ok took=${took}ms (${state.progress.done}/${queue.length} cycle=${state.progress.cycle})`);
if (state.progress.done >= queue.length) nextCycle();
} catch (e) {
const took = nowMs() - started;
const msg = String(e?.message || e);
// If we hit 429 (CoinGecko), backoff for 60 seconds
if (String(e?.status) === "429" || msg.includes("HTTP 429")) {
consecutive429 += 1;
backoffUntilMs = nowMs() + 60_000;
console.log(`[monitor] 429 hit. backoff 60s (consecutive=${consecutive429})`);
}
setError(`${item.key}: ${msg}`);
state.progress.done += 1;
setProgress(null);
console.log(`[monitor] updated ${item.group}:${item.key} WARN took=${took}ms err=${msg}`);
if (state.progress.done >= queue.length) nextCycle();
}
}
setInterval(doOneUpdate, ROTATE_SECONDS * 1000);
/* --------------------- HTTP endpoints --------------------- */
app.get("/health", (_req, res) => {
res.json({
ok: true,
ts: new Date().toISOString(),
rotate_seconds: ROTATE_SECONDS,
backoff_until_ms: backoffUntilMs || 0
});
});
app.get("/api/lines", (_req, res) => {
try {
loadConfig();
res.json({
ok: true,
ts: new Date().toISOString(),
took_ms: 0,
fiat: state.fiat,
crypto: state.crypto,
errors: state.errors,
progress: state.progress
});
} catch (e) {
res.status(500).json({ ok: false, error: String(e?.message || e) });
}
});
app.listen(PORT, "127.0.0.1", () => {
loadConfig();
console.log(`[monitor] backend listening on 127.0.0.1:${PORT} rotate=${ROTATE_SECONDS}s`);
});