#!/usr/bin/env python3
"""Probe every monitored host over HTTP and HTTPS and store the results.

For each row of the ``hosts`` table with ``monitor_enabled=1`` the script
requests ``http://<fqdn>/`` and ``https://<fqdn>/``, reads the TLS
certificate expiry on port 443, derives an up/down/unknown status and
writes everything back to that row.

Database settings are read from ``config.json`` next to this file.
"""
import datetime
import json
import os
import socket
import ssl

import pymysql
import requests

APP_ROOT = os.path.dirname(os.path.abspath(__file__))

# Loaded once at import time; the context manager closes the file handle.
with open(os.path.join(APP_ROOT, "config.json"), encoding="utf-8") as _cfg_file:
    CFG = json.load(_cfg_file)


def db_conn():
    """Open a new autocommit MySQL connection described by ``CFG["db"]``.

    Rows are returned as dictionaries (``DictCursor``).
    """
    db = CFG["db"]
    return pymysql.connect(
        host=db["host"],
        port=int(db["port"]),
        user=db["user"],
        password=db["password"],
        database=db["name"],
        charset="utf8mb4",
        autocommit=True,
        cursorclass=pymysql.cursors.DictCursor,
    )


def _parse_cert_time(not_after: str) -> datetime.datetime:
    """Convert a certificate time such as 'Jun  5 12:00:00 2026 GMT' to naive UTC.

    ``ssl.cert_time_to_seconds`` parses in the C locale, so the result does
    not depend on the process locale the way ``strptime("%b ...")`` does.
    Raises ``ValueError`` when the string is not in the certificate format.
    """
    seconds = ssl.cert_time_to_seconds(not_after)
    aware = datetime.datetime.fromtimestamp(seconds, datetime.timezone.utc)
    return aware.replace(tzinfo=None)


def get_cert_not_after(hostname: str, port: int = 443, timeout: int = 8):
    """Return the ``notAfter`` time of the host's TLS certificate.

    Args:
        hostname: Host to connect to; also sent as the SNI server name.
        port: TCP port of the TLS service.
        timeout: Connect timeout in seconds.

    Returns:
        A naive ``datetime`` in UTC, or ``None`` when the certificate
        carries no ``notAfter`` field.

    Raises:
        OSError: On connection failure. ``ssl.SSLError`` (including
            ``ssl.SSLCertVerificationError`` for an untrusted or expired
            chain) is a subclass of ``OSError``.
        ValueError: If the ``notAfter`` string cannot be parsed.
    """
    ctx = ssl.create_default_context()
    # A hostname mismatch should not prevent reading the expiry date.
    ctx.check_hostname = False
    # verify_mode deliberately stays at the default CERT_REQUIRED:
    # getpeercert() returns an empty dict for a certificate that was not
    # validated, so with CERT_NONE the expiry could never be read.
    with socket.create_connection((hostname, port), timeout=timeout) as sock:
        with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
            cert = ssock.getpeercert()

    not_after = cert.get("notAfter") if cert else None
    if not not_after:
        return None
    return _parse_cert_time(not_after)


def http_status(url: str, timeout: int = 8):
    """Return the final HTTP status code for ``url`` after redirects.

    Certificate verification is disabled on purpose: this measures
    reachability, not certificate validity. Any ``requests`` exception
    propagates to the caller.
    """
    # HEAD is sometimes blocked, so use GET with stream=True and close the
    # response without reading the body.
    resp = requests.get(
        url,
        timeout=timeout,
        allow_redirects=True,
        verify=False,
        stream=True,
        headers={"User-Agent": "outsidethebox-host-registry/1.0"},
    )
    status = int(resp.status_code)
    try:
        resp.close()
    except Exception:
        # Best effort: a failed close must not discard a status we already have.
        pass
    return status


def compute_status(code_http, code_https, err_http, err_https):
    """Map probe results to ``"up"``, ``"down"`` or ``"unknown"``.

    ``"up"`` when either status code is 2xx; ``"down"`` when there was any
    response or any error text; ``"unknown"`` when nothing was observed.
    """

    def is_green(code):
        return code is not None and 200 <= code <= 299

    if is_green(code_http) or is_green(code_https):
        return "up"

    got_response = code_http is not None or code_https is not None
    if got_response or err_http or err_https:
        return "down"

    return "unknown"


def _probe(label: str, url: str):
    """Return ``(status_code, "")`` on success or ``(None, error_text)`` on failure."""
    try:
        return http_status(url), ""
    except Exception as e:
        # Any failure (DNS, refused, timeout, TLS, bad URL) counts as an error.
        return None, f"{label}: {type(e).__name__}: {e}"


def _join_errors(*parts: str) -> str:
    """Join the non-empty error fragments with `` ; `` in the given order."""
    return " ; ".join(p for p in parts if p)


def main():
    """Check every enabled host once and persist the results."""
    # Naive UTC timestamp, same value utcnow() produced, without the
    # deprecated call.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    with db_conn() as con:
        with con.cursor() as cur:
            cur.execute(
                "SELECT id, fqdn, monitor_enabled FROM hosts "
                "WHERE monitor_enabled=1 ORDER BY zone ASC, fqdn ASC"
            )
            rows = cur.fetchall()

    for row in rows:
        hid = row["id"]
        fqdn = (row["fqdn"] or "").strip().lower().strip(".")

        # HTTP on port 80, then HTTPS on port 443.
        code_http, err_http = _probe("http", f"http://{fqdn}/")
        code_https, err_https = _probe("https", f"https://{fqdn}/")

        # Certificate expiry. A failure here is recorded in last_error for
        # debugging but does not affect the computed status.
        ssl_exp = None
        err_cert = ""
        try:
            ssl_exp = get_cert_not_after(fqdn, 443)
        except Exception as e:
            err_cert = f"cert: {type(e).__name__}: {e}"

        last_error = _join_errors(err_cert, err_http, err_https)
        status = compute_status(code_http, code_https, err_http, err_https)

        # A fresh connection per update, as before; presumably so a long
        # probing run never writes through a stale connection.
        with db_conn() as con:
            with con.cursor() as cur:
                cur.execute(
                    """
                    UPDATE hosts
                    SET status_code_http=%s,
                        status_code_https=%s,
                        status=%s,
                        last_check_at=%s,
                        last_error=%s,
                        ssl_expires_at=%s
                    WHERE id=%s
                    """,
                    (
                        code_http,
                        code_https,
                        status,
                        now,
                        last_error[:1000] if last_error else None,
                        ssl_exp,
                        hid,
                    ),
                )

    print(f"Checked {len(rows)} host(s) at {now.isoformat()}Z")


if __name__ == "__main__":
    main()