"""Helpers for portal sign-in verification and per-tenant storage layout."""

import hashlib
import hmac
import re
import shutil
import time
from pathlib import Path

from flask import current_app

from app.db import get_db

# Matches every run of characters that is not a lowercase ASCII letter or digit.
SLUG_RE = re.compile(r"[^a-z0-9]+")

# Directories created directly under a tenant root, in creation order.
_TENANT_SUBDIRS = ("logs", "support", "devices", "zip_staging", "exports")

# Directories created under each device directory, in creation order.
_DEVICE_SUBDIRS = ("originals", "derived", "exports", "deleted", "tmp")


def make_signature(email: str, ts: str, portal_user_id: str, secret: str) -> str:
    """Return the hex HMAC-SHA256 of ``"{portal_user_id}|{email}|{ts}"``.

    Both the payload and ``secret`` are UTF-8 encoded before hashing.
    """
    payload = f"{portal_user_id}|{email}|{ts}".encode("utf-8")
    return hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()


def is_valid_signature(email: str, ts: str, portal_user_id: str, sig: str) -> bool:
    """Check ``sig`` against the signature expected for the given fields.

    The shared secret is read from the ``OTB_PORTAL_SHARED_SECRET`` app
    config key. The comparison uses ``hmac.compare_digest``.

    Returns ``False`` (rather than raising) when ``sig`` is not something
    ``compare_digest`` can compare with a hex digest, e.g. ``None`` or a
    string containing non-ASCII characters.
    """
    secret = current_app.config["OTB_PORTAL_SHARED_SECRET"]
    expected = make_signature(
        email=email, ts=ts, portal_user_id=portal_user_id, secret=secret
    )
    try:
        return hmac.compare_digest(expected, sig)
    except TypeError:
        # compare_digest rejects non-ASCII str and mismatched/None operands;
        # such a value can never equal a hex digest, so it is simply invalid.
        return False


def is_valid_timestamp(ts: str) -> bool:
    """Return True when ``ts`` is an integer epoch within the allowed skew.

    The tolerance, in seconds, is read from the
    ``OTB_PORTAL_ALLOWED_SKEW_SECONDS`` app config key and applies in both
    directions (past and future). A missing or non-integer ``ts`` is invalid.
    """
    try:
        ts_int = int(ts)
    except (TypeError, ValueError):
        # TypeError covers ts=None (e.g. an absent request parameter).
        return False
    skew = current_app.config["OTB_PORTAL_ALLOWED_SKEW_SECONDS"]
    return abs(int(time.time()) - ts_int) <= skew


def slugify_email(email: str) -> str:
    """Build a slug from the local part of ``email`` (text before the first ``@``).

    The local part is lowercased and stripped, runs of characters outside
    ``[a-z0-9]`` become a single ``-``, and leading/trailing ``-`` are
    removed. Falls back to ``"tenant"`` when nothing is left.
    """
    local = email.split("@", 1)[0].lower().strip()
    slug = SLUG_RE.sub("-", local).strip("-")
    return slug or "tenant"


def slugify_device_name(name: str) -> str:
    """Build a slug from ``name`` using the same rules as ``slugify_email``.

    Falls back to ``"device"`` when nothing is left after slugification.
    """
    return SLUG_RE.sub("-", name.lower().strip()).strip("-") or "device"


def _upsert_user(cur, email: str, portal_user_id: int) -> int:
    """Insert the user for ``email`` or refresh the existing row; return its id."""
    cur.execute(
        """
        SELECT id, portal_user_id, email, display_name
        FROM users
        WHERE email = %s
        """,
        (email,),
    )
    user = cur.fetchone()

    if user is None:
        # New users get their email as the initial display name.
        cur.execute(
            """
            INSERT INTO users (portal_user_id, email, display_name, is_active)
            VALUES (%s, %s, %s, 1)
            """,
            (portal_user_id, email, email),
        )
        return cur.lastrowid

    user_id = user["id"]
    cur.execute(
        """
        UPDATE users
        SET portal_user_id = %s, last_login_at = NOW()
        WHERE id = %s
        """,
        (portal_user_id, user_id),
    )
    return user_id


def _allocate_tenant_slug(cur, base_slug: str) -> str:
    """Return ``base_slug``, or ``base_slug-N`` (N starting at 2) if it is taken."""
    slug = base_slug
    suffix = 1
    while True:
        cur.execute("SELECT id FROM tenants WHERE slug = %s", (slug,))
        if cur.fetchone() is None:
            return slug
        suffix += 1
        slug = f"{base_slug}-{suffix}"


def ensure_user_tenant_and_devices(email: str, portal_user_id: int):
    """Make sure a user row and an owned tenant exist for ``email``.

    * The user is looked up by email; it is inserted if missing, otherwise
      its ``portal_user_id`` and ``last_login_at`` are updated.
    * If the user owns no tenant, one is inserted with a unique slug derived
      from the email and its directory skeleton is created on disk.
    * The transaction is committed before returning.

    Despite the name, this function does not touch any device records or
    device directories.

    Returns:
        A dict with keys ``user_id``, ``tenant_id``, ``tenant_slug``,
        ``storage_root`` and ``email``.
    """
    db = get_db()
    with db.cursor() as cur:
        user_id = _upsert_user(cur, email, portal_user_id)

        cur.execute(
            """
            SELECT id, slug, storage_root
            FROM tenants
            WHERE owner_user_id = %s
            """,
            (user_id,),
        )
        tenant = cur.fetchone()

        if tenant is None:
            # NOTE(review): the slug availability check and the INSERT are
            # separate statements; presumably a unique index on tenants.slug
            # guards against concurrent sign-ins — confirm.
            slug = _allocate_tenant_slug(cur, slugify_email(email))
            storage_root = f"{current_app.config['STORAGE_ROOT']}/tenants/{slug}"
            cur.execute(
                """
                INSERT INTO tenants
                    (owner_user_id, slug, storage_root, service_status, retention_mode)
                VALUES (%s, %s, %s, 'active', 'standard')
                """,
                (user_id, slug, storage_root),
            )
            tenant_id = cur.lastrowid
            # Directories are created before the commit so a filesystem error
            # aborts the sign-in before the tenant row becomes permanent.
            create_tenant_root(slug)
        else:
            tenant_id = tenant["id"]
            slug = tenant["slug"]
            storage_root = tenant["storage_root"]

    db.commit()

    return {
        "user_id": user_id,
        "tenant_id": tenant_id,
        "tenant_slug": slug,
        "storage_root": storage_root,
        "email": email,
    }


def create_tenant_root(tenant_slug: str):
    """Create ``<STORAGE_ROOT>/tenants/<tenant_slug>`` and its standard sub-directories.

    Existing directories are left untouched, so the call is safe to repeat.
    """
    tenant_root = Path(current_app.config["STORAGE_ROOT"]) / "tenants" / tenant_slug
    for subdir in _TENANT_SUBDIRS:
        (tenant_root / subdir).mkdir(parents=True, exist_ok=True)


def _device_base(tenant_slug: str, device_path: str) -> Path:
    """Return the device directory for ``device_path`` inside the tenant root.

    Raises:
        ValueError: if ``device_path`` is empty (or only ``"."``), absolute,
            or contains a ``..`` component. Any of these would resolve to the
            tenant root itself or to a location outside it.
    """
    relative = Path(device_path)
    if relative.anchor or not relative.parts or ".." in relative.parts:
        raise ValueError(f"unsafe device path: {device_path!r}")
    tenant_root = Path(current_app.config["STORAGE_ROOT"]) / "tenants" / tenant_slug
    return tenant_root / relative


def create_device_directories(tenant_slug: str, device_path: str):
    """Create the standard sub-directories for a device under the tenant root.

    ``device_path`` is interpreted relative to
    ``<STORAGE_ROOT>/tenants/<tenant_slug>``. Existing directories are kept.

    Raises:
        ValueError: if ``device_path`` does not name a location strictly
            inside the tenant root.
    """
    base = _device_base(tenant_slug, device_path)
    for subdir in _DEVICE_SUBDIRS:
        (base / subdir).mkdir(parents=True, exist_ok=True)


def remove_device_directories(tenant_slug: str, device_path: str):
    """Recursively delete a device directory under the tenant root, if present.

    ``device_path`` is interpreted relative to
    ``<STORAGE_ROOT>/tenants/<tenant_slug>``. Nothing happens when the
    directory does not exist.

    Raises:
        ValueError: if ``device_path`` does not name a location strictly
            inside the tenant root. This prevents an empty or ``..`` path
            from deleting the whole tenant or unrelated directories.
    """
    base = _device_base(tenant_slug, device_path)
    # is_dir() is already False for a missing path, so no separate exists().
    if base.is_dir():
        shutil.rmtree(base)


def compute_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB chunks so large files are not loaded whole.
    """
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()