import os, json, datetime, csv, io, subprocess, time from flask import Flask, request, redirect, url_for, render_template, flash, Response import pymysql APP_ROOT = os.path.dirname(os.path.abspath(__file__)) CFG = json.load(open(os.path.join(APP_ROOT, "config.json"))) # App start (for app-uptime metric) APP_START_UTC = datetime.datetime.utcnow() from functools import wraps from flask import Response def _basic_auth_required(): # If auth_user/auth_pass are not set, do not block access (but warn in logs). u = CFG.get("site", {}).get("auth_user") p = CFG.get("site", {}).get("auth_pass") if not u or not p or p == "change-me": return None auth = request.authorization if not auth or auth.username != u or auth.password != p: return Response("Authentication required", 401, {"WWW-Authenticate": 'Basic realm="Host Registry"'}) return None def require_auth(fn): @wraps(fn) def wrapper(*args, **kwargs): r = _basic_auth_required() if r is not None: return r return fn(*args, **kwargs) return wrapper def db_conn(): return pymysql.connect( host=CFG["db"]["host"], port=int(CFG["db"]["port"]), user=CFG["db"]["user"], password=CFG["db"]["password"], database=CFG["db"]["name"], charset="utf8mb4", autocommit=True, cursorclass=pymysql.cursors.DictCursor ) def _fmt_duration(seconds: float) -> str: try: seconds = int(seconds) except Exception: return "unknown" days, rem = divmod(seconds, 86400) hours, rem = divmod(rem, 3600) mins, _ = divmod(rem, 60) if days > 0: return f"{days}d {hours}h {mins}m" if hours > 0: return f"{hours}h {mins}m" return f"{mins}m" def system_uptime_seconds() -> float: # Linux: /proc/uptime => " " try: with open("/proc/uptime","r") as f: return float(f.read().split()[0]) except Exception: return -1 def system_uptime_str() -> str: sec = system_uptime_seconds() if sec < 0: return "unknown" return _fmt_duration(sec) def app_uptime_str() -> str: try: sec = (datetime.datetime.utcnow() - APP_START_UTC).total_seconds() return _fmt_duration(sec) except Exception: return "unknown" def load_avg_str() -> str: try: a,b,c = os.getloadavg() return f"{a:.2f} / {b:.2f} / {c:.2f}" except Exception: return "unknown" def mem_str() -> str: # MemTotal/MemAvailable from /proc/meminfo (in kB) try: total = avail = None with open("/proc/meminfo","r") as f: for line in f: if line.startswith("MemTotal:"): total = int(line.split()[1]) elif line.startswith("MemAvailable:"): avail = int(line.split()[1]) if total is not None and avail is not None: break if total is None or avail is None: return "unknown" used = max(total - avail, 0) # convert kB -> MB return f"{used/1024:.1f} / {total/1024:.1f} MB" except Exception: return "unknown" def disk_str(path: str = "/") -> str: try: st = os.statvfs(path) total = st.f_frsize * st.f_blocks free = st.f_frsize * st.f_bavail used = max(total - free, 0) gb = 1024**3 return f"{used/gb:.2f} / {total/gb:.2f} GB" except Exception: return "unknown" def db_health() -> tuple[bool, int, str]: """Return (ok, host_count, error_message).""" try: with db_conn() as con: with con.cursor() as cur: cur.execute("SELECT COUNT(*) AS n FROM hosts") n = int((cur.fetchone() or {}).get("n",0)) return True, n, "" except Exception as e: return False, 0, str(e) def _format_uptime(seconds: float) -> str: try: seconds = int(seconds) except Exception: return "unknown" days, rem = divmod(seconds, 86400) hours, rem = divmod(rem, 3600) mins, _ = divmod(rem, 60) if days > 0: return f"{days}d {hours}h {mins}m" if hours > 0: return f"{hours}h {mins}m" return f"{mins}m" def _get_uptime_str() -> str: # Prefer system uptime (best for VM/host); fall back to process uptime. try: with open("/proc/uptime","r") as f: s = float(f.read().split()[0]) return _format_uptime(s) except Exception: return "unknown" def _db_status_str() -> str: try: with db_conn() as con: with con.cursor() as cur: cur.execute("SELECT 1 AS ok") cur.fetchone() return "OK" except Exception: return "DOWN" def _backup_dir() -> str: return (CFG.get("backup", {}) or {}).get("dir") or "/opt/outsidethedb/backups" def _backup_prefix() -> str: return (CFG.get("backup", {}) or {}).get("prefix") or "outsidethedb" def run_db_backup() -> str: """Create a gzipped mysqldump and return the output path.""" bdir = _backup_dir() os.makedirs(bdir, exist_ok=True) ts = datetime.datetime.utcnow().strftime("%Y-%m-%d_%H%M%S") out_path = os.path.join(bdir, f"db_{CFG['db']['name']}_{_backup_prefix()}_{ts}.sql.gz") env = os.environ.copy() # Avoid password in process args; MySQL honors MYSQL_PWD. env["MYSQL_PWD"] = str(CFG["db"]["password"]) cmd = [ "mysqldump", "--host", str(CFG["db"]["host"]), "--port", str(CFG["db"]["port"]), "--user", str(CFG["db"]["user"]), "--single-transaction", "--routines", "--events", "--triggers", str(CFG["db"]["name"]), ] # Pipe into gzip ourselves (portable across mysqldump versions) p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) p2 = subprocess.Popen(["gzip", "-c"], stdin=p1.stdout, stdout=open(out_path, "wb")) p1.stdout.close() _, err = p1.communicate() rc = p1.returncode p2.wait() if rc != 0: try: os.remove(out_path) except Exception: pass raise RuntimeError(err.decode("utf-8", errors="replace").strip() or f"mysqldump failed rc={rc}") return out_path def parse_fqdn(fqdn: str): fqdn = (fqdn or "").strip().lower().strip(".") if not fqdn or "." not in fqdn: return "", None, "" parts = fqdn.split(".") zone = ".".join(parts[-2:]) sub = ".".join(parts[:-2]) or None return zone, sub, fqdn def dt_from_html(val: str): if not val: return None val = val.strip() # accept yyyy-mm-dd or yyyy-mm-ddTHH:MM try: if "T" in val: return datetime.datetime.fromisoformat(val) return datetime.datetime.fromisoformat(val + "T00:00:00") except Exception: return None def dot_class(row): # blank if not configured if not row.get("monitor_enabled"): return "dot-blank" now = datetime.datetime.utcnow() # red if down if row.get("status") == "down": return "dot-red" # orange if host expires within 30 days he = row.get("host_expires_at") if he and isinstance(he, datetime.datetime) and he <= now + datetime.timedelta(days=30): return "dot-orange" # yellow if SSL expires within 30 days se = row.get("ssl_expires_at") if se and isinstance(se, datetime.datetime) and se <= now + datetime.timedelta(days=30): return "dot-yellow" # green if up if row.get("status") == "up": return "dot-green" return "dot-yellow" # enabled but unknown -> treat as warning app = Flask(__name__) app.secret_key = CFG["site"]["secret_key"] @app.context_processor def inject_meta(): db_ok, host_count, _ = db_health() return { "version": CFG.get("version","v3.2.1"), "year": datetime.datetime.now().year, "sys_uptime": system_uptime_str(), "app_uptime": app_uptime_str(), "db_ok": db_ok, "host_count": host_count, } @app.route("/") @require_auth def index(): return redirect(url_for("hosts")) @app.route("/hosts") @require_auth def hosts(): q = request.args.get("q","").strip() rows=[] sql = """ SELECT * FROM hosts """ args=[] if q: like=f"%{q}%" sql += " WHERE (fqdn LIKE %s OR zone LIKE %s OR sub LIKE %s OR client_name LIKE %s OR email LIKE %s OR public_ip LIKE %s OR private_ip LIKE %s OR pve_host LIKE %s OR notes LIKE %s)" args=[like,like,like,like,like,like,like,like,like] sql += " ORDER BY zone ASC, (sub IS NULL OR sub='') DESC, sub ASC, fqdn ASC LIMIT 2000" try: with db_conn() as con: with con.cursor() as cur: cur.execute(sql, args) rows = cur.fetchall() except Exception as e: flash(f"DB error: {e}", "err") rows=[] # compute dot class & pretty fields for r in rows: r["dot"] = dot_class(r) r["host_display"] = r["fqdn"] r["ssl_expires_str"] = r["ssl_expires_at"].strftime("%Y-%m-%d") if r.get("ssl_expires_at") else "" r["host_expires_str"] = r["host_expires_at"].strftime("%Y-%m-%d") if r.get("host_expires_at") else "" r["last_check_str"] = r["last_check_at"].strftime("%Y-%m-%d %H:%M") if r.get("last_check_at") else "" return render_template("hosts.html", rows=rows, q=q) @app.route("/hosts/new", methods=["GET","POST"]) @require_auth def host_new(): if request.method == "POST": fqdn = request.form.get("fqdn","").strip().lower().strip(".") zone = request.form.get("zone","").strip().lower().strip(".") sub = request.form.get("sub","").strip().lower().strip(".") or None if not zone: zone, sub2, fqdn2 = parse_fqdn(fqdn) if not fqdn: fqdn = fqdn2 if sub is None: sub = sub2 if fqdn and not zone: zone, sub2, _ = parse_fqdn(fqdn) if sub is None: sub = sub2 if not fqdn: # build from zone/sub if not zone: flash("Zone or FQDN is required", "err") return render_template("edit_host.html", host={}, is_new=True) fqdn = f"{sub+'.' if sub else ''}{zone}" # other fields data = { "zone": zone, "sub": sub, "fqdn": fqdn, "monitor_enabled": 1 if request.form.get("monitor_enabled")=="on" else 0, "public_ip": request.form.get("public_ip") or None, "private_ip": request.form.get("private_ip") or None, "pve_host": request.form.get("pve_host") or None, "client_name": request.form.get("client_name") or None, "email": request.form.get("email") or None, "country": request.form.get("country") or None, "package_type": request.form.get("package_type") or None, "dns_provider": request.form.get("dns_provider") or None, "notes": request.form.get("notes") or None, "host_expires_at": dt_from_html(request.form.get("host_expires_at","")), } try: with db_conn() as con: with con.cursor() as cur: cur.execute(""" INSERT INTO hosts (zone, sub, fqdn, monitor_enabled, public_ip, private_ip, pve_host, client_name, email, country, package_type, dns_provider, notes, host_expires_at) VALUES (%(zone)s, %(sub)s, %(fqdn)s, %(monitor_enabled)s, %(public_ip)s, %(private_ip)s, %(pve_host)s, %(client_name)s, %(email)s, %(country)s, %(package_type)s, %(dns_provider)s, %(notes)s, %(host_expires_at)s) """, data) flash("Host added", "ok") return redirect(url_for("hosts")) except Exception as e: flash(f"DB error: {e}", "err") return render_template("edit_host.html", host=data, is_new=True) return render_template("edit_host.html", host={}, is_new=True) @app.route("/hosts//edit", methods=["GET","POST"]) @require_auth def host_edit(hid): host={} try: with db_conn() as con: with con.cursor() as cur: cur.execute("SELECT * FROM hosts WHERE id=%s", (hid,)) host = cur.fetchone() or {} except Exception as e: flash(f"DB error: {e}", "err") host={} if request.method=="POST": host.update({ "zone": request.form.get("zone","").strip().lower().strip("."), "sub": (request.form.get("sub","").strip().lower().strip(".") or None), "fqdn": request.form.get("fqdn","").strip().lower().strip("."), "monitor_enabled": 1 if request.form.get("monitor_enabled")=="on" else 0, "public_ip": request.form.get("public_ip") or None, "private_ip": request.form.get("private_ip") or None, "pve_host": request.form.get("pve_host") or None, "client_name": request.form.get("client_name") or None, "email": request.form.get("email") or None, "country": request.form.get("country") or None, "package_type": request.form.get("package_type") or None, "dns_provider": request.form.get("dns_provider") or None, "notes": request.form.get("notes") or None, "host_expires_at": dt_from_html(request.form.get("host_expires_at","")), }) # rebuild fqdn if missing if not host.get("fqdn") and host.get("zone"): host["fqdn"] = f"{host.get('sub')+'.' if host.get('sub') else ''}{host.get('zone')}" if not host.get("zone") and host.get("fqdn"): z,s,_ = parse_fqdn(host["fqdn"]) host["zone"]=z host["sub"]=s # final validation if not host.get("fqdn") or not host.get("zone"): flash("FQDN and zone are required (zone can be derived from FQDN).", "err") return render_template("edit_host.html", host=host, is_new=False) try: with db_conn() as con: with con.cursor() as cur: cur.execute(""" UPDATE hosts SET zone=%s, sub=%s, fqdn=%s, monitor_enabled=%s, public_ip=%s, private_ip=%s, pve_host=%s, client_name=%s, email=%s, country=%s, package_type=%s, dns_provider=%s, notes=%s, host_expires_at=%s WHERE id=%s """, (host["zone"], host["sub"], host["fqdn"], host["monitor_enabled"], host["public_ip"], host["private_ip"], host["pve_host"], host["client_name"], host["email"], host["country"], host["package_type"], host["dns_provider"], host["notes"], host["host_expires_at"], hid)) flash("Saved", "ok") return redirect(url_for("hosts")) except Exception as e: flash(f"DB error: {e}", "err") # html datetime-local value if host.get("host_expires_at"): try: host["host_expires_at_html"] = host["host_expires_at"].strftime("%Y-%m-%dT%H:%M") except Exception: host["host_expires_at_html"] = "" else: host["host_expires_at_html"] = "" return render_template("edit_host.html", host=host, is_new=False) @app.route("/hosts//delete", methods=["POST"]) @require_auth def host_delete(hid): try: with db_conn() as con: with con.cursor() as cur: cur.execute("DELETE FROM hosts WHERE id=%s", (hid,)) flash("Deleted", "ok") except Exception as e: flash(f"DB error: {e}", "err") return redirect(url_for("hosts")) @app.route("/hosts/export.csv") @require_auth def hosts_export(): # export all rows try: with db_conn() as con: with con.cursor() as cur: cur.execute("SELECT zone, sub, fqdn, monitor_enabled, public_ip, private_ip, pve_host, client_name, email, country, package_type, dns_provider, host_expires_at, ssl_expires_at, notes FROM hosts ORDER BY zone ASC, (sub IS NULL OR sub='') DESC, sub ASC, fqdn ASC") rows = cur.fetchall() except Exception as e: return Response(f"DB error: {e}\n", status=500, mimetype="text/plain") output = io.StringIO() w = csv.writer(output) w.writerow(["zone","sub","fqdn","monitor_enabled","public_ip","private_ip","pve_host","client_name","email","country","package_type","dns_provider","host_expires_at","ssl_expires_at","notes"]) for r in rows: w.writerow([ r.get("zone",""), r.get("sub") or "", r.get("fqdn",""), int(r.get("monitor_enabled") or 0), r.get("public_ip") or "", r.get("private_ip") or "", r.get("pve_host") or "", r.get("client_name") or "", r.get("email") or "", r.get("country") or "", r.get("package_type") or "", r.get("dns_provider") or "", r.get("host_expires_at").strftime("%Y-%m-%d") if r.get("host_expires_at") else "", r.get("ssl_expires_at").strftime("%Y-%m-%d") if r.get("ssl_expires_at") else "", (r.get("notes") or "").replace("\r"," ").replace("\n"," ").strip(), ]) data = output.getvalue().encode("utf-8") return Response( data, mimetype="text/csv", headers={"Content-Disposition":"attachment; filename=hosts-export.csv"} ) @app.route("/hosts/import", methods=["GET","POST"]) @require_auth def hosts_import(): if request.method == "POST": f = request.files.get("csvfile") if not f: flash("No file uploaded", "err") return redirect(url_for("hosts_import")) content = f.read().decode("utf-8", errors="replace") reader = csv.DictReader(io.StringIO(content)) count=0 errors=0 with db_conn() as con: with con.cursor() as cur: for row in reader: try: fqdn = (row.get("fqdn") or "").strip().lower().strip(".") zone = (row.get("zone") or "").strip().lower().strip(".") sub = (row.get("sub") or "").strip().lower().strip(".") or None if not fqdn: if not zone: continue fqdn = f"{sub+'.' if sub else ''}{zone}" if not zone: zone, sub2, _ = parse_fqdn(fqdn) if not sub: sub = sub2 monitor_enabled = int(row.get("monitor_enabled") or 0) host_expires_at = dt_from_html((row.get("host_expires_at") or "").strip()) # upsert by fqdn cur.execute(""" INSERT INTO hosts (zone, sub, fqdn, monitor_enabled, public_ip, private_ip, pve_host, client_name, email, country, package_type, dns_provider, notes, host_expires_at) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE zone=VALUES(zone), sub=VALUES(sub), monitor_enabled=VALUES(monitor_enabled), public_ip=VALUES(public_ip), private_ip=VALUES(private_ip), pve_host=VALUES(pve_host), client_name=VALUES(client_name), email=VALUES(email), country=VALUES(country), package_type=VALUES(package_type), dns_provider=VALUES(dns_provider), notes=VALUES(notes), host_expires_at=VALUES(host_expires_at) """, ( zone, sub, fqdn, monitor_enabled, (row.get("public_ip") or "").strip() or None, (row.get("private_ip") or "").strip() or None, (row.get("pve_host") or "").strip() or None, (row.get("client_name") or "").strip() or None, (row.get("email") or "").strip() or None, (row.get("country") or "").strip() or None, (row.get("package_type") or "").strip() or None, (row.get("dns_provider") or "").strip() or None, (row.get("notes") or "").strip() or None, host_expires_at )) count += 1 except Exception: errors += 1 continue flash(f"Imported {count} rows ({errors} skipped)", "ok" if errors==0 else "warn") return redirect(url_for("hosts")) return render_template("import_hosts.html") @app.route("/hosts/bulk", methods=["GET","POST"]) @require_auth def hosts_bulk(): zone = "" subs = "" include_apex = True defaults = { "client_name": "", "email": "", "country": "", "package_type": "", "dns_provider": "", "public_ip": "", "private_ip": "", "pve_host": "", "monitor_enabled": False, } if request.method == "POST": zone = request.form.get("zone","").strip().lower().strip(".") subs = request.form.get("subs","") include_apex = (request.form.get("include_apex") == "on") defaults.update({ "client_name": (request.form.get("client_name") or "").strip(), "email": (request.form.get("email") or "").strip(), "country": (request.form.get("country") or "").strip(), "package_type": (request.form.get("package_type") or "").strip(), "dns_provider": (request.form.get("dns_provider") or "").strip(), "public_ip": (request.form.get("public_ip") or "").strip(), "private_ip": (request.form.get("private_ip") or "").strip(), "pve_host": (request.form.get("pve_host") or "").strip(), "monitor_enabled": True if request.form.get("monitor_enabled") == "on" else False, }) if not zone: flash("Base zone is required.", "err") return render_template("bulk_hosts.html", zone=zone, subs=subs, include_apex=include_apex, defaults=defaults) # parse subdomains raw = subs.replace(",", "\n") parts = [] for line in raw.splitlines(): s = line.strip().lower().strip(".") if not s: continue # allow full hostnames pasted if "." in s and s.endswith(zone): # take sub part only s = s[:-(len(zone)+1)] parts.append(s) # de-dupe while preserving order seen=set() sub_list=[] for s in parts: if s in seen: continue seen.add(s) sub_list.append(s) to_create=[] if include_apex: to_create.append((zone, None, zone)) for s in sub_list: fqdn = f"{s}.{zone}" if s else zone to_create.append((zone, s, fqdn)) created=0 updated=0 try: with db_conn() as con: with con.cursor() as cur: for z, sub, fqdn in to_create: cur.execute("SELECT id FROM hosts WHERE fqdn=%s", (fqdn,)) row = cur.fetchone() if row: cur.execute("""UPDATE hosts SET zone=%s, sub=%s, monitor_enabled=%s, public_ip=%s, private_ip=%s, pve_host=%s, client_name=%s, email=%s, country=%s, package_type=%s, dns_provider=%s WHERE fqdn=%s""", ( z, sub, 1 if defaults["monitor_enabled"] else 0, defaults["public_ip"] or None, defaults["private_ip"] or None, defaults["pve_host"] or None, defaults["client_name"] or None, defaults["email"] or None, defaults["country"] or None, defaults["package_type"] or None, defaults["dns_provider"] or None, fqdn )) updated += 1 else: cur.execute("""INSERT INTO hosts (zone, sub, fqdn, monitor_enabled, public_ip, private_ip, pve_host, client_name, email, country, package_type, dns_provider, status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'unknown')""", ( z, sub, fqdn, 1 if defaults["monitor_enabled"] else 0, defaults["public_ip"] or None, defaults["private_ip"] or None, defaults["pve_host"] or None, defaults["client_name"] or None, defaults["email"] or None, defaults["country"] or None, defaults["package_type"] or None, defaults["dns_provider"] or None )) created += 1 flash(f"Bulk add complete: {created} created, {updated} updated.", "ok") return redirect(url_for("hosts")) except Exception as e: flash(f"DB error: {e}", "err") return render_template("bulk_hosts.html", zone=zone, subs=subs, include_apex=include_apex, defaults=defaults) @app.route("/backup-db", methods=["POST"]) @require_auth def backup_db(): try: out_path = run_db_backup() flash(f"DB backup created: {out_path}", "ok") except Exception as e: flash(f"Backup failed: {e}", "err") return redirect(url_for("hosts")) @app.route("/health") def health(): ok=False host_count=0 try: with db_conn() as con: with con.cursor() as cur: cur.execute("SELECT COUNT(*) AS n FROM hosts") host_count = int((cur.fetchone() or {}).get("n",0)) ok=True except Exception as e: flash(f"DB error: {e}", "err") return render_template("health.html", ok=ok, host_count=host_count)