from functools import wraps from pathlib import Path from datetime import datetime, timezone import shutil import zipfile import tarfile import tempfile import os from xml.sax.saxutils import escape as xml_escape from reportlab.lib.pagesizes import letter from reportlab.lib import colors from reportlab.platypus import SimpleDocTemplate, Image as ReportLabImage, Paragraph, Spacer, PageBreak from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from PIL import Image as PILImage, ImageOps from PIL import Image import re import hashlib from werkzeug.utils import secure_filename from flask import after_this_request, send_file, Blueprint, flash, redirect, render_template, request, session, url_for, current_app, send_file, jsonify from app.db import get_db from app.auth.utils import create_device_directories, remove_device_directories, slugify_device_name, compute_sha256 bp = Blueprint("main", __name__) def portal_session_required(view_func): @wraps(view_func) def wrapped(*args, **kwargs): if "otb_user_id" not in session or "otb_tenant_id" not in session: return redirect(url_for("auth.login_required_notice")) return view_func(*args, **kwargs) return wrapped def _client_ip(): return request.headers.get("X-Forwarded-For", request.remote_addr) def _tenant_root() -> Path: return Path(current_app.config["STORAGE_ROOT"]) / "tenants" / session["otb_tenant_slug"] def _stored_name(original_name: str) -> str: safe = secure_filename(original_name or "upload.bin") if not safe: safe = "upload.bin" ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ") return f"{ts}__{safe}" def _recovered_filename(original_name: str) -> tuple[str, str, str]: base_name = original_name.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] if "." 
in base_name: basename, extension = base_name.rsplit(".", 1) recovered_name = f"{basename}-recovered.{extension}" return recovered_name, f"{basename}-recovered", extension recovered_name = f"{base_name}-recovered" return recovered_name, f"{base_name}-recovered", "" def _display_filename(file_row) -> str: if not file_row: return "download.bin" return (file_row.get("display_filename") or file_row.get("original_filename") or "download.bin").strip() def _sanitize_display_basename(raw_name: str) -> str: cleaned = (raw_name or "").replace("\x00", "").strip() cleaned = re.sub(r'[\\/:*?"<>|]+', "", cleaned) cleaned = re.sub(r"\s+", " ", cleaned) cleaned = cleaned.strip().strip(".") if "." in cleaned: cleaned = cleaned.rsplit(".", 1)[0].strip() return cleaned[:200] def _generate_thumbnail(original_path: Path, thumb_path: Path): thumb_path.parent.mkdir(parents=True, exist_ok=True) try: with Image.open(original_path) as img: img.thumbnail((400, 400)) img.save(thumb_path) except Exception: pass def _normalize_browser_path(raw_path: str) -> str: raw = (raw_path or "").replace("\\", "/").strip().strip("/") if not raw: return "" parts = [] for part in raw.split("/"): part = part.strip() if not part or part in (".", ".."): continue parts.append(part) return "/".join(parts) def _safe_path_from_relative(relative_path: str) -> Path: return _tenant_root() / relative_path def _get_device_for_tenant(db, device_id: int): with db.cursor() as cur: cur.execute( """ SELECT id, device_name, device_type, relative_path FROM devices WHERE id = %s AND tenant_id = %s """, (device_id, session["otb_tenant_id"]), ) return cur.fetchone() def _purge_expired_deleted_files(db): expired = [] with db.cursor() as cur: cur.execute( """ SELECT id, relative_path, original_filename FROM files WHERE tenant_id = %s AND is_deleted = 1 AND deleted_at IS NOT NULL AND deleted_at <= (UTC_TIMESTAMP() - INTERVAL 24 HOUR) """, (session["otb_tenant_id"],), ) expired = cur.fetchall() for row in expired: file_path = 
_safe_path_from_relative(row["relative_path"]) if file_path.exists(): try: file_path.unlink() except FileNotFoundError: pass cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'system', 'deleted_file_purged', %s, %s, %s, %s) """, ( session["otb_tenant_id"], session["otb_user_id"], row["id"], _client_ip(), request.headers.get("User-Agent", ""), f"Purged deleted file '{row['original_filename']}' after retention window", ), ) cur.execute("DELETE FROM files WHERE id = %s AND tenant_id = %s", (row["id"], session["otb_tenant_id"])) db.commit() @bp.route("/") def index(): if "otb_user_id" in session: return redirect(url_for("main.dashboard")) return redirect(url_for("auth.login_required_notice")) @bp.route("/dashboard") @portal_session_required def dashboard(): db = get_db() with db.cursor() as cur: cur.execute( """ SELECT id, device_name, device_type, relative_path, is_active, created_at FROM devices WHERE tenant_id = %s ORDER BY id """, (session["otb_tenant_id"],), ) devices = cur.fetchall() return render_template( "cloud/dashboard.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), devices=devices, ) @bp.route("/devices/android/new", methods=["GET", "POST"]) @portal_session_required def create_android_device(): db = get_db() if request.method == "GET": return render_template( "cloud/android_device_new.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), ) device_name = (request.form.get("device_name") or "").strip() if not device_name: flash("Device name is required.", "warning") return redirect(url_for("main.create_android_device")) slug = slugify_device_name(device_name) relative_path = f"devices/{slug}" import hashlib, secrets, datetime raw_token = secrets.token_hex(16) token_hash = hashlib.sha256(raw_token.encode()).hexdigest() expires_at = datetime.datetime.utcnow() + datetime.timedelta(hours=48) 
with db.cursor() as cur: # create device bucket cur.execute( """ INSERT INTO devices (tenant_id, device_name, device_type, relative_path) VALUES (%s, %s, %s, %s) """, (session["otb_tenant_id"], device_name, "android", relative_path), ) device_id = cur.lastrowid # create token cur.execute( """ INSERT INTO android_device_tokens (tenant_id, device_id, token_hash, device_label, expires_at) VALUES (%s, %s, %s, %s, %s) """, (session["otb_tenant_id"], device_id, token_hash, device_name, expires_at), ) db.commit() flash(f"Activation token (valid 48h): {raw_token}", "success") flash("⚠️ This token must be used within 48 hours or it will expire.", "warning") return redirect(url_for("main.dashboard")) @bp.route("/devices/new", methods=["GET", "POST"]) @portal_session_required def add_device(): if request.method == "GET": return render_template( "cloud/device_new.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), ) device_name = (request.form.get("device_name") or "").strip() device_type = (request.form.get("device_type") or "").strip() if not device_name: flash("Device name is required.", "warning") return render_template( "cloud/device_new.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), device_name=device_name, device_type=device_type, ) if not device_type: flash("Device type is required.", "warning") return render_template( "cloud/device_new.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), device_name=device_name, device_type=device_type, ) slug = slugify_device_name(device_name) relative_path = f"devices/{slug}" db = get_db() with db.cursor() as cur: cur.execute( """ SELECT id FROM devices WHERE tenant_id = %s AND device_name = %s """, (session["otb_tenant_id"], device_name), ) existing = cur.fetchone() if existing: flash("A device with that name already exists.", "warning") return render_template( "cloud/device_new.html", user_email=session.get("otb_email"), 
tenant_slug=session.get("otb_tenant_slug"), device_name=device_name, device_type=device_type, ) cur.execute( """ INSERT INTO devices (tenant_id, device_name, device_type, relative_path, is_active) VALUES (%s, %s, %s, %s, 1) """, (session["otb_tenant_id"], device_name, device_type, relative_path), ) cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'device_created', %s, %s, %s) """, ( session["otb_tenant_id"], session["otb_user_id"], _client_ip(), request.headers.get("User-Agent", ""), f"Created device '{device_name}' ({device_type}) at {relative_path}", ), ) db.commit() create_device_directories(session["otb_tenant_slug"], relative_path) flash("Device added successfully.", "success") return redirect(url_for("main.dashboard")) @bp.route("/devices/delete/", methods=["POST"]) @portal_session_required def delete_device(device_id: int): db = get_db() device = _get_device_for_tenant(db, device_id) if not device: flash("Device not found.", "warning") return redirect(url_for("main.dashboard")) with db.cursor() as cur: cur.execute( """ SELECT COUNT(*) AS total_file_count, SUM(CASE WHEN is_deleted = 0 THEN 1 ELSE 0 END) AS active_file_count, SUM(CASE WHEN is_deleted = 1 THEN 1 ELSE 0 END) AS deleted_file_count FROM files WHERE tenant_id = %s AND device_id = %s """, (session["otb_tenant_id"], device_id), ) counts = cur.fetchone() total_file_count = int(counts["total_file_count"] or 0) active_file_count = int(counts["active_file_count"] or 0) deleted_file_count = int(counts["deleted_file_count"] or 0) if total_file_count > 0: if deleted_file_count > 0: flash( f"This device is not empty. It still has {active_file_count} active file(s) and {deleted_file_count} file(s) in Deleted Files. Please clear those before deleting the device.", "warning", ) else: flash( f"This device is not empty. It still has {active_file_count} active file(s). 
Please remove them before deleting the device.", "warning", ) return redirect(url_for("main.dashboard")) cur.execute( """ DELETE FROM android_device_tokens WHERE tenant_id = %s AND device_id = %s """, (session["otb_tenant_id"], device_id), ) cur.execute( """ DELETE FROM devices WHERE id = %s AND tenant_id = %s """, (device_id, session["otb_tenant_id"]), ) cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'device_deleted', %s, %s, %s) """, ( session["otb_tenant_id"], session["otb_user_id"], _client_ip(), request.headers.get("User-Agent", ""), f"Deleted device '{device['device_name']}' ({device['device_type']}) at {device['relative_path']}", ), ) db.commit() remove_device_directories(session["otb_tenant_slug"], device["relative_path"]) flash("Device removed successfully.", "success") return redirect(url_for("main.dashboard")) @bp.route("/devices//upload", methods=["GET", "POST"]) @portal_session_required def upload_files(device_id: int): db = get_db() device = _get_device_for_tenant(db, device_id) if not device: flash("Device not found.", "warning") return redirect(url_for("main.dashboard")) if device.get("device_type") == "android": flash("Android backup devices only accept uploads from the OTB Cloud mobile app.", "warning") return redirect(url_for("main.dashboard")) view_mode = request.args.get("view", "list").strip().lower() if view_mode not in ("list", "gallery"): view_mode = "list" return redirect(url_for("main.dashboard")) if request.method == "GET": return render_template( "cloud/upload.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), device=device, ) files = request.files.getlist("files") files = [f for f in files if f and f.filename] if not files: flash("Please choose at least one file to upload.", "warning") return render_template( "cloud/upload.html", user_email=session.get("otb_email"), 
tenant_slug=session.get("otb_tenant_slug"), device=device, ) upload_base = _tenant_root() / device["relative_path"] / "originals" upload_base.mkdir(parents=True, exist_ok=True) uploaded_count = 0 with db.cursor() as cur: for incoming in files: original_filename = incoming.filename or "upload.bin" # preserve folder structure from browser relative_upload_path = original_filename.replace("\\", "/") path_parts = relative_upload_path.split("/") if len(path_parts) > 1: subdirs = "/".join(path_parts[:-1]) filename_only = path_parts[-1] else: subdirs = "" filename_only = path_parts[0] stored_name = _stored_name(filename_only) target_dir = upload_base if subdirs: target_dir = upload_base / subdirs target_dir.mkdir(parents=True, exist_ok=True) target_path = target_dir / stored_name incoming.save(target_path) size_bytes = target_path.stat().st_size sha256 = compute_sha256(target_path) base_name = original_filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] if "." in base_name: basename, extension = base_name.rsplit(".", 1) else: basename, extension = base_name, "" if subdirs: relative_path = f"{device['relative_path']}/originals/{subdirs}/{stored_name}" directory_path = f"{device['relative_path']}/originals/{subdirs}" else: relative_path = f"{device['relative_path']}/originals/{stored_name}" directory_path = f"{device['relative_path']}/originals" cur.execute( """ INSERT INTO files ( tenant_id, device_id, parent_file_id, file_kind, relative_path, directory_path, original_filename, basename, extension, mime_type, size_bytes, sha256, capture_date, uploaded_at, is_immutable, is_deleted, deleted_at ) VALUES ( %s, %s, NULL, 'original', %s, %s, %s, %s, %s, %s, %s, %s, NULL, UTC_TIMESTAMP(), 1, 0, NULL ) """, ( session["otb_tenant_id"], device["id"], relative_path, directory_path, original_filename, basename, extension, incoming.mimetype or None, size_bytes, sha256, ), ) file_id = cur.lastrowid cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, file_id, 
ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'file_uploaded', %s, %s, %s, %s) """, ( session["otb_tenant_id"], session["otb_user_id"], file_id, _client_ip(), request.headers.get("User-Agent", ""), f"Uploaded '{original_filename}' to {relative_path}", ), ) uploaded_count += 1 db.commit() flash(f"Uploaded {uploaded_count} file(s) to {device['device_name']}.", "success") return redirect(url_for("main.dashboard")) @bp.route("/files//thumb", methods=["GET"]) @portal_session_required def thumbnail_file(file_id: int): db = get_db() with db.cursor() as cur: cur.execute( """ SELECT id, original_filename, display_filename, relative_path, mime_type FROM files WHERE id = %s AND tenant_id = %s AND is_deleted = 0 """, (file_id, session["otb_tenant_id"]), ) file_row = cur.fetchone() if not file_row: return "", 404 original_path = _safe_path_from_relative(file_row["relative_path"]) thumb_rel = file_row["relative_path"].replace("originals/", "derived/thumbs/") thumb_path = _safe_path_from_relative(thumb_rel) if not thumb_path.exists(): _generate_thumbnail(original_path, thumb_path) if thumb_path.exists(): return send_file(thumb_path, mimetype=file_row.get("mime_type"), as_attachment=False) return send_file(original_path, mimetype=file_row.get("mime_type"), as_attachment=False) @bp.route("/files//inline", methods=["GET"]) @portal_session_required def inline_file(file_id: int): db = get_db() with db.cursor() as cur: cur.execute( """ SELECT id, original_filename, display_filename, relative_path, mime_type FROM files WHERE id = %s AND tenant_id = %s AND is_deleted = 0 """, (file_id, session["otb_tenant_id"]), ) file_row = cur.fetchone() if not file_row: flash("File not found.", "warning") return redirect(url_for("main.dashboard")) file_path = _safe_path_from_relative(file_row["relative_path"]) if not file_path.exists(): flash("File is missing from storage.", "warning") return redirect(url_for("main.dashboard")) return send_file( file_path, 
mimetype=file_row.get("mime_type") or None, as_attachment=False, download_name=_display_filename(file_row), conditional=True, ) @bp.route("/files//download", methods=["GET"]) @portal_session_required def download_file(file_id: int): db = get_db() with db.cursor() as cur: cur.execute( """ SELECT id, original_filename, display_filename, relative_path FROM files WHERE id = %s AND tenant_id = %s """, (file_id, session["otb_tenant_id"]), ) file_row = cur.fetchone() if not file_row: flash("File not found.", "warning") return redirect(url_for("main.dashboard")) file_path = _safe_path_from_relative(file_row["relative_path"]) if not file_path.exists(): flash("File is missing from storage.", "warning") return redirect(url_for("main.dashboard")) return send_file(file_path, as_attachment=True, download_name=_display_filename(file_row)) @bp.route("/devices//files/download-selected", methods=["POST"]) @portal_session_required def download_selected_files(device_id: int): db = get_db() device = _get_device_for_tenant(db, device_id) if not device: flash("Device not found.", "warning") view_mode = request.args.get("view", "list").strip().lower() if view_mode not in ("list", "gallery"): view_mode = "list" return redirect(url_for("main.dashboard")) selected_ids = [int(x) for x in request.form.getlist("selected_files") if x.strip().isdigit()] if not selected_ids: flash("Select at least one file first.", "warning") return redirect(url_for("main.browse_device_files", device_id=device_id)) if len(selected_ids) != 1: flash("Download Selected currently supports one file at a time. 
Use Archive Workspace for multiple files.", "warning") return redirect(url_for("main.browse_device_files", device_id=device_id)) return redirect(url_for("main.download_file", file_id=selected_ids[0])) @bp.route("/devices//files/delete-selected", methods=["POST"]) @portal_session_required def delete_selected_files(device_id: int): db = get_db() device = _get_device_for_tenant(db, device_id) if not device: flash("Device not found.", "warning") view_mode = request.args.get("view", "list").strip().lower() if view_mode not in ("list", "gallery"): view_mode = "list" return redirect(url_for("main.dashboard")) selected_ids = [int(x) for x in request.form.getlist("selected_files") if x.strip().isdigit()] if not selected_ids: flash("Select at least one file first.", "warning") return redirect(url_for("main.browse_device_files", device_id=device_id)) deleted_count = 0 with db.cursor() as cur: for file_id in selected_ids: cur.execute( """ SELECT id, original_filename, relative_path, directory_path, is_deleted FROM files WHERE id = %s AND tenant_id = %s AND device_id = %s """, (file_id, session["otb_tenant_id"], device_id), ) file_row = cur.fetchone() if not file_row or file_row["is_deleted"]: continue source_path = _safe_path_from_relative(file_row["relative_path"]) target_rel = f"{device['relative_path']}/deleted/{_stored_name(file_row['original_filename'])}" target_path = _safe_path_from_relative(target_rel) target_path.parent.mkdir(parents=True, exist_ok=True) if source_path.exists(): shutil.move(str(source_path), str(target_path)) cur.execute( """ UPDATE files SET relative_path = %s, directory_path = %s, is_deleted = 1, deleted_at = UTC_TIMESTAMP() WHERE id = %s AND tenant_id = %s """, ( target_rel, f"{device['relative_path']}/deleted", file_id, session["otb_tenant_id"], ), ) cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'file_soft_deleted', %s, %s, %s, %s) """, 
( session["otb_tenant_id"], session["otb_user_id"], file_id, _client_ip(), request.headers.get("User-Agent", ""), f"Soft-deleted '{file_row['original_filename']}' into deleted area", ), ) deleted_count += 1 db.commit() flash(f"Deleted {deleted_count} file(s). Deleted files are retained for up to 24 hours unless hard-deleted.", "success") return redirect(url_for("main.browse_device_files", device_id=device_id)) @bp.route("/devices//files/send-to-zip", methods=["POST"]) @portal_session_required def send_selected_to_zip_workspace(device_id: int): db = get_db() device = _get_device_for_tenant(db, device_id) if not device: flash("Device not found.", "warning") view_mode = request.args.get("view", "list").strip().lower() if view_mode not in ("list", "gallery"): view_mode = "list" return redirect(url_for("main.dashboard")) selected_ids = [int(x) for x in request.form.getlist("selected_files") if x.strip().isdigit()] if not selected_ids: flash("Select at least one file first.", "warning") return redirect(url_for("main.browse_device_files", device_id=device_id)) staging_dir = _tenant_root() / "zip_staging" staging_dir.mkdir(parents=True, exist_ok=True) copied_count = 0 with db.cursor() as cur: for file_id in selected_ids: cur.execute( """ SELECT id, original_filename, display_filename, relative_path, is_deleted FROM files WHERE id = %s AND tenant_id = %s AND device_id = %s """, (file_id, session["otb_tenant_id"], device_id), ) file_row = cur.fetchone() if not file_row or file_row["is_deleted"]: continue source_path = _safe_path_from_relative(file_row["relative_path"]) if not source_path.exists(): continue target_name = _stored_name(_display_filename(file_row)) target_path = staging_dir / target_name shutil.copy2(source_path, target_path) cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'file_staged_for_zip', %s, %s, %s, %s) """, ( session["otb_tenant_id"], 
session["otb_user_id"], file_id, _client_ip(), request.headers.get("User-Agent", ""), f"Copied '{_display_filename(file_row)}' into zip workspace", ), ) copied_count += 1 db.commit() flash(f"Sent {copied_count} file(s) to Archive Workspace.", "success") return redirect(url_for("main.browse_device_files", device_id=device_id)) @bp.route("/workspace/zip", methods=["GET"]) @portal_session_required def zip_workspace(): tenant_root = _tenant_root() staging_dir = tenant_root / "zip_staging" exports_dir = tenant_root / "exports" staging_dir.mkdir(parents=True, exist_ok=True) exports_dir.mkdir(parents=True, exist_ok=True) staged_files = [] for p in sorted(staging_dir.iterdir(), key=lambda x: x.name.lower()): if p.is_file(): staged_files.append({ "name": p.name, "size_bytes": p.stat().st_size, "path": str(p.relative_to(tenant_root)), }) export_files = [] for p in sorted(exports_dir.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if p.is_file(): export_files.append({ "name": p.name, "size_bytes": p.stat().st_size, "path": str(p.relative_to(tenant_root)), }) return render_template( "cloud/zip_workspace.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), staged_files=staged_files, export_files=export_files, ) def _prepare_report_image_for_pdf(source_path, quality_mode="standard"): """ Compress a staged image into a temporary JPEG for PDF embedding. Original uploaded/staged image is not modified. 
""" modes = { "high": {"max_px": 2400, "quality": 82}, "standard": {"max_px": 1600, "quality": 70}, "compressed": {"max_px": 1200, "quality": 55}, } cfg = modes.get(quality_mode, modes["standard"]) with PILImage.open(source_path) as im: im = ImageOps.exif_transpose(im) if im.mode in ("RGBA", "LA", "P"): bg = PILImage.new("RGB", im.size, (255, 255, 255)) if im.mode == "P": im = im.convert("RGBA") bg.paste(im, mask=im.split()[-1] if im.mode in ("RGBA", "LA") else None) im = bg else: im = im.convert("RGB") max_px = cfg["max_px"] if max(im.size) > max_px: ratio = max_px / float(max(im.size)) new_size = (max(1, int(im.width * ratio)), max(1, int(im.height * ratio))) im = im.resize(new_size, PILImage.LANCZOS) tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") tmp.close() im.save(tmp.name, "JPEG", quality=cfg["quality"], optimize=True, progressive=True) return tmp.name, im.width, im.height def _pdf_scaled_dimensions(width, height, max_w=500, max_h=620): scale = min(max_w / float(width), max_h / float(height), 1.0) return width * scale, height * scale @bp.route("/workspace/zip/flush", methods=["POST"]) @portal_session_required def flush_zip_workspace(): tenant_root = _tenant_root() staging_dir = tenant_root / "zip_staging" staging_dir.mkdir(parents=True, exist_ok=True) removed = 0 for p in staging_dir.iterdir(): if p.is_file(): p.unlink(missing_ok=True) removed += 1 db = get_db() with db.cursor() as cur: cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'zip_workspace_flushed', %s, %s, %s) """, ( session["otb_tenant_id"], session["otb_user_id"], _client_ip(), request.headers.get("User-Agent", ""), f"Flushed Archive Workspace; removed {removed} staged file(s)", ), ) db.commit() flash(f"Archive Workspace flushed. 
Removed {removed} staged file(s).", "success") return redirect(url_for("main.zip_workspace")) @bp.route("/workspace/zip/create", methods=["POST"]) @portal_session_required def create_zip_from_workspace(): tenant_root = _tenant_root() staging_dir = tenant_root / "zip_staging" exports_dir = tenant_root / "exports" staging_dir.mkdir(parents=True, exist_ok=True) exports_dir.mkdir(parents=True, exist_ok=True) staged = [p for p in staging_dir.iterdir() if p.is_file()] if not staged: flash("Archive Workspace is empty.", "warning") return redirect(url_for("main.zip_workspace")) archive_name = (request.form.get("archive_name") or "").strip() archive_format = (request.form.get("format") or "zip").strip().lower() if not archive_name: archive_name = f"otb-cloud-export-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}" archive_name = re.sub(r"[^A-Za-z0-9._-]+", "_", archive_name).strip("._-") if not archive_name: archive_name = f"otb-cloud-export-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}" if archive_format == "tar": archive_filename = f"{archive_name}.tar" archive_path = exports_dir / archive_filename with tarfile.open(archive_path, "w") as tf: for p in staged: arcname = p.name.split("__", 1)[1] if "__" in p.name else p.name tf.add(p, arcname=arcname) elif archive_format == "targz": archive_filename = f"{archive_name}.tar.gz" archive_path = exports_dir / archive_filename with tarfile.open(archive_path, "w:gz") as tf: for p in staged: arcname = p.name.split("__", 1)[1] if "__" in p.name else p.name tf.add(p, arcname=arcname) elif archive_format == "pdf": archive_filename = f"{archive_name}.pdf" archive_path = exports_dir / archive_filename image_files = [] for p in staged: if p.suffix.lower() in [".jpg", ".jpeg", ".png", ".webp"]: image_files.append(p) if not image_files: flash("PDF Job Report needs image files only. 
No supported images were staged.", "warning") return redirect(url_for("main.zip_workspace")) doc = SimpleDocTemplate(str(archive_path), pagesize=letter) styles = getSampleStyleSheet() elements = [] elements.append(Paragraph("OTB Cloud Job Report", styles["Title"])) elements.append(Spacer(1, 12)) elements.append(Paragraph(f"Generated UTC: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}", styles["Normal"])) elements.append(Paragraph(f"Images included: {len(image_files)}", styles["Normal"])) elements.append(Spacer(1, 24)) max_w = 500 max_h = 620 for idx, img_path in enumerate(image_files, start=1): display_name = img_path.name.split("__", 1)[1] if "__" in img_path.name else img_path.name try: tmp_img, w, h = _prepare_report_image_for_pdf(img_path, "standard") pdf_temp_files.append(tmp_img) draw_w, draw_h = _pdf_scaled_dimensions(w, h, max_w, max_h) elements.append(Paragraph(f"{idx}. {display_name}", styles["Heading3"])) elements.append(Spacer(1, 8)) elements.append(ReportLabImage(tmp_img, width=draw_w, height=draw_h)) elements.append(Spacer(1, 18)) if idx != len(image_files): elements.append(PageBreak()) except Exception as e: elements.append(Paragraph(f"Skipped image: {display_name} ({e})", styles["Normal"])) elements.append(Spacer(1, 12)) pdf_temp_files = [] try: doc.build(elements) finally: for tmp_path in pdf_temp_files: try: os.unlink(tmp_path) except OSError: pass else: archive_format = "zip" archive_filename = f"{archive_name}.zip" archive_path = exports_dir / archive_filename with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: for p in staged: arcname = p.name.split("__", 1)[1] if "__" in p.name else p.name zf.write(p, arcname=arcname) for p in staged: p.unlink(missing_ok=True) db = get_db() with db.cursor() as cur: cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'zip_created', %s, %s, %s) """, ( session["otb_tenant_id"], 
session["otb_user_id"], _client_ip(), request.headers.get("User-Agent", ""), f"Created {archive_format} export '{archive_filename}' in exports workspace; staged copies cleared after success", ), ) db.commit() flash(f"Archive created successfully. You can find it in the Exports section below as '{archive_filename}'.", "success") return redirect(url_for("main.zip_workspace")) @bp.route("/workspace/exports//download", methods=["GET"]) @portal_session_required def download_export(filename: str): exports_dir = _tenant_root() / "exports" file_path = exports_dir / filename if not file_path.exists() or not file_path.is_file(): flash("Export file not found.", "warning") return redirect(url_for("main.zip_workspace")) return send_file(file_path, as_attachment=True, download_name=file_path.name) @bp.route("/deleted", methods=["GET"]) @portal_session_required def deleted_files(): db = get_db() _purge_expired_deleted_files(db) with db.cursor() as cur: cur.execute( """ SELECT f.id, f.original_filename, f.relative_path, f.size_bytes, f.deleted_at, d.device_name, d.device_type FROM files f LEFT JOIN devices d ON f.device_id = d.id WHERE f.tenant_id = %s AND f.is_deleted = 1 ORDER BY f.deleted_at DESC, f.id DESC """, (session["otb_tenant_id"],), ) files = cur.fetchall() return render_template( "cloud/deleted_files.html", user_email=session.get("otb_email"), tenant_slug=session.get("otb_tenant_slug"), files=files, ) @bp.route("/deleted//recover", methods=["POST"]) @portal_session_required def recover_deleted_file(file_id: int): db = get_db() with db.cursor() as cur: cur.execute( """ SELECT f.id, f.device_id, f.original_filename, f.relative_path, f.is_deleted, d.device_name, d.device_type, d.relative_path AS device_relative_path FROM files f LEFT JOIN devices d ON f.device_id = d.id WHERE f.id = %s AND f.tenant_id = %s """, (file_id, session["otb_tenant_id"]), ) file_row = cur.fetchone() if not file_row or not file_row["is_deleted"]: flash("Deleted file not found.", "warning") return 
redirect(url_for("main.deleted_files")) if not file_row["device_relative_path"]: flash("Cannot recover this file because its device record is missing.", "warning") return redirect(url_for("main.deleted_files")) source_path = _safe_path_from_relative(file_row["relative_path"]) if not source_path.exists(): flash("Deleted file is missing from storage.", "warning") return redirect(url_for("main.deleted_files")) recovered_name, recovered_basename, recovered_extension = _recovered_filename(file_row["original_filename"]) stored_name = _stored_name(recovered_name) target_rel = f"{file_row['device_relative_path']}/originals/{stored_name}" target_dir = f"{file_row['device_relative_path']}/originals" target_path = _safe_path_from_relative(target_rel) target_path.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(source_path), str(target_path)) cur.execute( """ UPDATE files SET original_filename = %s, display_filename = NULL, basename = %s, extension = %s, relative_path = %s, directory_path = %s, is_deleted = 0, deleted_at = NULL WHERE id = %s AND tenant_id = %s """, ( recovered_name, recovered_basename, recovered_extension, target_rel, target_dir, file_id, session["otb_tenant_id"], ), ) cur.execute( """ INSERT INTO audit_logs ( tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail ) VALUES (%s, %s, 'user', 'file_recovered', %s, %s, %s, %s) """, ( session["otb_tenant_id"], session["otb_user_id"], file_id, _client_ip(), request.headers.get("User-Agent", ""), f"Recovered deleted file as '{recovered_name}' back into originals", ), ) db.commit() flash(f"Recovered file as '{recovered_name}'. 
@bp.route("/deleted/<int:file_id>/hard-delete", methods=["POST"])
@portal_session_required
def hard_delete_file(file_id: int):
    """Permanently remove a soft-deleted file from disk and from ``files``.

    Args:
        file_id: Primary key of the ``files`` row, scoped to the session tenant.

    Returns:
        A redirect to the deleted-files page; the outcome is reported by flash.
    """
    db = get_db()

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, original_filename, relative_path, is_deleted
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()

        if not file_row or not file_row["is_deleted"]:
            flash("Deleted file not found.", "warning")
            return redirect(url_for("main.deleted_files"))

        # missing_ok already covers a file that is no longer on disk.
        _safe_path_from_relative(file_row["relative_path"]).unlink(missing_ok=True)

        # The audit row is written before the files row is deleted.
        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, file_id,
                ip_address, user_agent, event_detail
            )
            VALUES (%s, %s, 'user', 'file_hard_deleted', %s, %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                file_id,
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Hard-deleted '{file_row['original_filename']}' immediately from deleted area",
            ),
        )
        cur.execute(
            "DELETE FROM files WHERE id = %s AND tenant_id = %s",
            (file_id, session["otb_tenant_id"]),
        )

    db.commit()
    flash("File permanently deleted.", "success")
    return redirect(url_for("main.deleted_files"))


@bp.route("/files/<int:file_id>/rename", methods=["POST"])
@portal_session_required
def rename_file(file_id: int):
    """Set or clear the display name of an active file.

    Only ``display_filename`` is changed; the stored file and its
    ``original_filename`` are untouched. Submitting a name equal to the
    original filename clears the custom display name. The file's extension
    is always re-appended to the submitted basename.

    Form fields:
        display_basename: Requested name without extension.
        path, view, page: Browser state echoed back into the redirect.

    Returns:
        A redirect to the device file browser (or the dashboard / deleted
        list when the file is missing / deleted).
    """
    db = get_db()
    requested_basename = _sanitize_display_basename(request.form.get("display_basename") or "")

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, device_id, original_filename, display_filename, extension, is_deleted
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()

        if not file_row:
            flash("File not found.", "warning")
            return redirect(url_for("main.dashboard"))

        if file_row["is_deleted"]:
            flash("You can only rename active files from the device file browser.", "warning")
            return redirect(url_for("main.deleted_files"))

        def back_to_browser():
            # Every outcome returns the user to the folder/view/page they came from.
            return redirect(url_for(
                "main.browse_device_files",
                device_id=file_row["device_id"],
                path=request.form.get("path", ""),
                view=request.form.get("view", "list"),
                page=request.form.get("page", "1"),
            ))

        if not requested_basename:
            flash("Please enter a new file name.", "warning")
            return back_to_browser()

        extension = file_row["extension"]
        new_visible_name = f"{requested_basename}.{extension}" if extension else requested_basename

        if len(new_visible_name) > 255:
            flash("That file name is too long.", "warning")
            return back_to_browser()

        current_visible_name = _display_filename(file_row)
        if new_visible_name == current_visible_name:
            flash("That file already has that name.", "warning")
            return back_to_browser()

        # NULL means "show the original filename".
        display_filename = None if new_visible_name == file_row["original_filename"] else new_visible_name

        cur.execute(
            """
            UPDATE files
            SET display_filename = %s
            WHERE id = %s AND tenant_id = %s
            """,
            (display_filename, file_id, session["otb_tenant_id"]),
        )

        final_visible_name = display_filename or file_row["original_filename"]
        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, file_id,
                ip_address, user_agent, event_detail
            )
            VALUES (%s, %s, 'user', 'file_display_renamed', %s, %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                file_id,
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Updated display filename from '{current_visible_name}' to '{final_visible_name}'",
            ),
        )

    db.commit()

    if display_filename is None:
        flash("Custom name cleared. The original file name is now shown again.", "success")
    else:
        flash(f"File renamed to '{display_filename}'.", "success")

    return back_to_browser()
@bp.route("/devices/<int:device_id>/files", methods=["GET"])
@portal_session_required
def browse_device_files(device_id: int):
    """Render one folder of a device's ``originals`` tree.

    Query parameters:
        path: Folder relative to ``originals`` (normalised by
            ``_normalize_browser_path``).
        view: ``list`` or ``gallery``; defaults to ``gallery`` under
            ``images`` and ``list`` elsewhere.
        page: 1-based page number.
        per_page: ``100``, ``250``, ``500`` or ``all``; anything else is 100.

    Returns:
        The rendered ``cloud/device_files.html`` page, or a redirect to the
        dashboard when the device does not belong to the session tenant.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    current_path = _normalize_browser_path(request.args.get("path", ""))

    requested_view = request.args.get("view")
    if requested_view:
        view_mode = requested_view.strip().lower()
    elif current_path == "images" or current_path.startswith("images/"):
        view_mode = "gallery"
    else:
        view_mode = "list"
    if view_mode not in ("list", "gallery"):
        view_mode = "list"

    root_directory = f"{device['relative_path']}/originals"
    current_directory = f"{root_directory}/{current_path}" if current_path else root_directory

    try:
        page = max(1, int(request.args.get("page", "1")))
    except (TypeError, ValueError):
        page = 1

    per_page_param = request.args.get("per_page", "100").strip().lower()
    if per_page_param == "all":
        # "all" disables paging entirely.
        per_page = None
        offset = 0
        page = 1
    else:
        try:
            per_page = int(per_page_param)
        except ValueError:
            per_page = 100
        if per_page not in (100, 250, 500):
            per_page = 100
        offset = (page - 1) * per_page

    scope = (session["otb_tenant_id"], device_id, current_directory)

    with db.cursor() as cur:
        files_sql = """
            SELECT
                id, file_kind, relative_path, directory_path, original_filename,
                display_filename, basename, extension, mime_type, size_bytes,
                sha256, uploaded_at, is_immutable
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0 AND directory_path = %s
            ORDER BY uploaded_at DESC, id DESC
        """
        if per_page is None:
            cur.execute(files_sql, scope)
        else:
            cur.execute(files_sql + " LIMIT %s OFFSET %s", scope + (per_page, offset))
        files = cur.fetchall()

        cur.execute(
            """
            SELECT COUNT(*) AS total_files
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0 AND directory_path = %s
            """,
            scope,
        )
        count_row = cur.fetchone() or {"total_files": 0}
        total_files = count_row["total_files"] if isinstance(count_row, dict) else count_row[0]

        cur.execute(
            """
            SELECT DISTINCT directory_path
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0 AND directory_path LIKE %s
            ORDER BY directory_path
            """,
            (session["otb_tenant_id"], device_id, f"{current_directory}/%"),
        )
        all_dirs = [row["directory_path"] for row in cur.fetchall()]

    # Immediate sub-folders are the first path segment below the current
    # directory. The startswith() re-check filters anything LIKE matched more
    # loosely than an exact prefix; dict.fromkeys de-duplicates in order.
    prefix = current_directory + "/"
    first_segments = dict.fromkeys(
        d[len(prefix):].split("/", 1)[0] for d in all_dirs if d.startswith(prefix)
    )
    first_segments.pop("", None)

    folders = [
        {"name": name, "path": f"{current_path}/{name}" if current_path else name}
        for name in sorted(first_segments, key=str.lower)
    ]

    breadcrumbs = [{"label": device["device_name"], "path": ""}]
    if current_path:
        trail = []
        for segment in current_path.split("/"):
            trail.append(segment)
            breadcrumbs.append({"label": segment, "path": "/".join(trail)})

    # Everything before the last "/" ("" when already at the top level).
    parent_path = current_path.rpartition("/")[0]

    return render_template(
        "cloud/device_files.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        device=device,
        files=files,
        file_count=len(files),
        total_files=total_files,
        page=page,
        per_page=per_page,
        has_prev=page > 1,
        has_next=(per_page is not None and (offset + per_page) < total_files),
        view_mode=view_mode,
        current_path=current_path,
        parent_path=parent_path,
        folders=folders,
        breadcrumbs=breadcrumbs,
    )
@bp.route("/api/android/activate", methods=["POST"])
def android_activate():
    """Bind a one-time activation token to an Android device UUID.

    JSON body:
        token: Raw activation token; only its SHA-256 hex digest is looked up.
        device_uuid: Identifier the device will use for later uploads.

    Returns:
        JSON with ``ok`` plus device details on first activation,
        ``status: already_activated`` when the token was used before, or an
        ``error`` code with HTTP 400/404.
    """
    db = get_db()
    payload = request.get_json(silent=True) or {}

    raw_token = (payload.get("token") or "").strip()
    device_uuid = (payload.get("device_uuid") or "").strip()

    if not raw_token:
        return jsonify({"ok": False, "error": "missing_token"}), 400
    if not device_uuid:
        return jsonify({"ok": False, "error": "missing_device_uuid"}), 400

    token_hash = hashlib.sha256(raw_token.encode()).hexdigest()

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT t.id, t.tenant_id, t.device_id, t.status, t.expires_at,
                   d.device_name, d.relative_path
            FROM android_device_tokens t
            JOIN devices d ON d.id = t.device_id
            WHERE t.token_hash = %s
            LIMIT 1
            """,
            (token_hash,),
        )
        row = cur.fetchone()

        if not row:
            return jsonify({"ok": False, "error": "invalid_token"}), 404

        if row["status"] == "activated":
            return jsonify({"ok": True, "status": "already_activated"}), 200

        # NOTE(review): expires_at is selected but never compared against the
        # current time — confirm whether expired tokens should be rejected here.
        cur.execute(
            """
            UPDATE android_device_tokens
            SET status='activated', activated_at=UTC_TIMESTAMP(), device_uuid=%s
            WHERE id=%s
            """,
            (device_uuid, row["id"]),
        )

    db.commit()

    return jsonify({
        "ok": True,
        "device_id": row["device_id"],
        "device_name": row["device_name"],
        "relative_path": row["relative_path"]
    })


@bp.route("/api/android/upload", methods=["POST"])
def android_upload():
    """Store files uploaded by an activated Android device.

    Form fields:
        device_uuid: UUID registered through ``android_activate``.
        files: One or more file parts.

    Each file is saved under the device's ``originals/video`` (MIME type
    starting with ``video/``) or ``originals/images`` (everything else),
    recorded in ``files`` and logged in ``audit_logs``.

    Returns:
        JSON with the number of stored files, or an ``error`` code with
        HTTP 400/403/404.
    """
    db = get_db()

    device_uuid = (request.form.get("device_uuid") or "").strip()
    if not device_uuid:
        return jsonify({"ok": False, "error": "missing_device_uuid"}), 400

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT t.tenant_id, t.device_id, t.status,
                   d.device_name, d.relative_path,
                   tn.slug AS tenant_slug
            FROM android_device_tokens t
            JOIN devices d ON d.id = t.device_id
            JOIN tenants tn ON tn.id = t.tenant_id
            WHERE t.device_uuid = %s
            LIMIT 1
            """,
            (device_uuid,),
        )
        row = cur.fetchone()

    if not row:
        return jsonify({"ok": False, "error": "device_not_found"}), 404
    if row["status"] != "activated":
        return jsonify({"ok": False, "error": "device_not_activated"}), 403

    # Drop empty parts (no file object or no filename).
    files = [f for f in request.files.getlist("files") if f and f.filename]
    if not files:
        return jsonify({"ok": False, "error": "no_files"}), 400

    originals_rel = f"{row['relative_path']}/originals"
    base_path = (
        Path(current_app.config["STORAGE_ROOT"])
        / "tenants"
        / row["tenant_slug"]
        / row["relative_path"]
        / "originals"
    )
    base_path.mkdir(parents=True, exist_ok=True)

    client_ip = _client_ip()
    user_agent = request.headers.get("User-Agent", "")
    uploaded_count = 0

    with db.cursor() as cur:
        for incoming in files:
            original_filename = incoming.filename
            stored_name = _stored_name(original_filename)

            mime = (incoming.mimetype or "").lower()
            subdir = "video" if mime.startswith("video/") else "images"

            upload_base = base_path / subdir
            upload_base.mkdir(parents=True, exist_ok=True)
            target_path = upload_base / stored_name
            incoming.save(target_path)

            size_bytes = target_path.stat().st_size
            sha256 = compute_sha256(target_path)

            if "." in original_filename:
                basename, extension = original_filename.rsplit(".", 1)
            else:
                basename, extension = original_filename, ""

            directory_path = f"{originals_rel}/{subdir}"
            relative_path = f"{directory_path}/{stored_name}"

            cur.execute(
                """
                INSERT INTO files (
                    tenant_id, device_id, parent_file_id, file_kind,
                    relative_path, directory_path, original_filename,
                    basename, extension, mime_type, size_bytes, sha256,
                    capture_date, uploaded_at, is_immutable, is_deleted, deleted_at
                )
                VALUES (
                    %s, %s, NULL, 'original',
                    %s, %s, %s,
                    %s, %s, %s, %s, %s,
                    NULL, UTC_TIMESTAMP(), 1, 0, NULL
                )
                """,
                (
                    row["tenant_id"],
                    row["device_id"],
                    relative_path,
                    directory_path,
                    original_filename,
                    basename,
                    extension,
                    incoming.mimetype or None,
                    size_bytes,
                    sha256,
                ),
            )
            file_id = cur.lastrowid

            cur.execute(
                """
                INSERT INTO audit_logs (
                    tenant_id, user_id, actor_type, event_type, file_id,
                    ip_address, user_agent, event_detail
                )
                VALUES (%s, NULL, 'device', 'file_uploaded', %s, %s, %s, %s)
                """,
                (
                    row["tenant_id"],
                    file_id,
                    client_ip,
                    user_agent,
                    f"Android upload '{original_filename}' to {relative_path}",
                ),
            )
            uploaded_count += 1

    db.commit()

    return jsonify({
        "ok": True,
        "uploaded": uploaded_count,
        "device_name": row["device_name"]
    }), 200
@bp.route("/api/android/file-exists", methods=["GET"])
def android_file_exists():
    """Report whether a device already uploaded a file with this name and size.

    Query parameters:
        device_uuid: UUID of an activated device.
        original_filename: Filename as it exists on the device.
        size_bytes: File size; must parse as an integer.

    Returns:
        JSON ``{"ok": true, "exists": bool}``, or an ``error`` code with
        HTTP 400/403/404.
    """
    db = get_db()

    device_uuid = (request.args.get("device_uuid") or "").strip()
    original_filename = (request.args.get("original_filename") or "").strip()
    size_bytes_raw = (request.args.get("size_bytes") or "").strip()

    if not device_uuid:
        return jsonify({"ok": False, "error": "missing_device_uuid"}), 400
    if not original_filename:
        return jsonify({"ok": False, "error": "missing_original_filename"}), 400

    try:
        size_bytes = int(size_bytes_raw)
    except ValueError:
        return jsonify({"ok": False, "error": "invalid_size_bytes"}), 400

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT t.tenant_id, t.device_id, t.status
            FROM android_device_tokens t
            WHERE t.device_uuid = %s
            LIMIT 1
            """,
            (device_uuid,),
        )
        token_row = cur.fetchone()

        if not token_row:
            return jsonify({"ok": False, "error": "device_not_found"}), 404
        if token_row["status"] != "activated":
            return jsonify({"ok": False, "error": "device_not_activated"}), 403

        cur.execute(
            """
            SELECT id
            FROM files
            WHERE tenant_id = %s
              AND device_id = %s
              AND is_deleted = 0
              AND original_filename = %s
              AND size_bytes = %s
            LIMIT 1
            """,
            (
                token_row["tenant_id"],
                token_row["device_id"],
                original_filename,
                size_bytes,
            ),
        )
        file_row = cur.fetchone()

    return jsonify({
        "ok": True,
        "exists": bool(file_row),
    }), 200


@bp.route("/workspace/exports/<filename>/move-to-lts", methods=["POST"])
@portal_session_required
def move_export_to_lts(filename: str):
    """Move one export archive into the tenant's ``lts`` directory.

    Args:
        filename: Export name taken from the URL.

    Returns:
        A redirect to the zip workspace; the outcome is reported by flash.
    """
    tenant_root = _tenant_root()
    exports_dir = (tenant_root / "exports").resolve()
    lts_dir = tenant_root / "lts"
    lts_dir.mkdir(parents=True, exist_ok=True)

    src = (exports_dir / filename).resolve()
    # The name is user-controlled: only regular files inside exports qualify.
    if exports_dir not in src.parents or not src.is_file():
        flash("Archive not found.", "warning")
        return redirect(url_for("main.zip_workspace"))

    dst = lts_dir / src.name
    # Path.rename() silently replaces an existing file on POSIX; refuse
    # instead of destroying an archive that is already in LTS.
    if dst.exists():
        flash(f"'{src.name}' already exists in LTS storage.", "warning")
        return redirect(url_for("main.zip_workspace"))

    src.rename(dst)

    flash(f"Moved '{filename}' to LTS storage.", "success")
    return redirect(url_for("main.zip_workspace"))


@bp.route("/workspace/exports/<filename>/download-remove", methods=["GET"])
@portal_session_required
def download_and_remove_export(filename: str):
    """Send an export as an attachment and delete it once the request ends.

    Args:
        filename: Export name taken from the URL.

    Raises:
        werkzeug.exceptions.NotFound: The name is not a regular file inside
            the tenant's ``exports`` directory.
    """
    # abort is not among this module's top-level flask imports.
    from flask import abort

    exports_dir = (_tenant_root() / "exports").resolve()
    file_path = (exports_dir / filename).resolve()

    if exports_dir not in file_path.parents or not file_path.is_file():
        abort(404)

    @after_this_request
    def remove_file(response):
        # Best effort: a failed cleanup must not break the download itself.
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            pass
        return response

    return send_file(file_path, as_attachment=True)


@bp.route("/workspace/lts", methods=["GET"])
@portal_session_required
def lts_view():
    """Render the files in the tenant's ``lts`` directory (created if absent).

    Only regular files directly inside ``lts`` are listed; sub-directories
    are not descended into.
    """
    lts_dir = _tenant_root() / "lts"
    lts_dir.mkdir(parents=True, exist_ok=True)

    lts_files = [
        {
            "name": entry.name,
            "size_bytes": entry.stat().st_size,
            "path": str(entry),
        }
        for entry in lts_dir.iterdir()
        if entry.is_file()
    ]

    return render_template(
        "cloud/lts.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        lts_files=lts_files,
    )
# =========================
# VIDEO WORKSHOP (alpha3-a)
# =========================

from app.services.video_jobs import create_video_job, list_jobs_for_tenant


@bp.route("/image-workshop/<int:device_id>")
@portal_session_required
def image_workshop(device_id):
    """Render the image workshop page for one device."""
    return render_template(
        "cloud/image_workshop.html",
        device_id=device_id,
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
    )


@bp.route("/workshop/<int:device_id>")
@portal_session_required
def workshop(device_id):
    """Render the video workshop page with queue statistics.

    The two counters are computed over all rows of ``video_jobs`` (the
    queries carry no tenant filter): jobs with status ``queued``, and
    distinct tenants with a ``queued`` or ``processing`` job.
    """
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT COUNT(*) AS queued_jobs
            FROM video_jobs
            WHERE status = 'queued'
            """
        )
        qrow = cur.fetchone() or {"queued_jobs": 0}

        cur.execute(
            """
            SELECT COUNT(DISTINCT tenant_id) AS active_users
            FROM video_jobs
            WHERE status IN ('queued', 'processing')
            """
        )
        arow = cur.fetchone() or {"active_users": 0}

    return render_template(
        "cloud/workshop.html",
        device_id=device_id,
        queued_jobs=qrow["queued_jobs"] or 0,
        active_users=arow["active_users"] or 0,
    )


@bp.route("/api/video/enqueue", methods=["POST"])
def video_enqueue():
    """Create one video job per (file, profile) pair.

    JSON body:
        device_id: Device the source files belong to.
        files: List of source file ids.
        profiles: List of profile names; ``["default"]`` when empty.
        rotation_override: Optional value passed through to each job.

    Returns:
        JSON with the created job ids and the shared ``batch_id``, or
        HTTP 400 when the body is not a JSON object or ``files`` is not a list.
    """
    from uuid import uuid4

    # silent=True: a missing or malformed JSON body becomes a 400 below
    # instead of an exception raised by request.json.
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        return jsonify({"status": "error", "error": "invalid_payload"}), 400

    tenant = session.get("tenant") or 'def'
    device_id = data.get("device_id")
    files = data.get("files") or []
    profiles = data.get("profiles") or ["default"]
    rotation_override = data.get("rotation_override")

    if not isinstance(files, list) or not isinstance(profiles, list):
        return jsonify({"status": "error", "error": "invalid_payload"}), 400

    batch_id = uuid4().hex
    job_ids = [
        create_video_job(
            tenant=tenant,
            device_id=device_id,
            source_file_id=source_file_id,
            profile=profile,
            rotation_override=rotation_override,
            batch_id=batch_id
        )
        for source_file_id in files
        for profile in profiles
    ]

    return jsonify({"status": "ok", "jobs": job_ids, "batch_id": batch_id})
@bp.route("/video-jobs")
def global_video_jobs():
    """Render the 300 most recent video jobs of the session tenant.

    The tenant is looked up by the ``tenant`` session key (``"def"`` when
    unset). Source and output sizes are read from disk relative to the
    tenant's ``storage_root`` and are ``None`` when a file cannot be sized.
    """
    from app.db import get_db

    tenant_slug = session.get("tenant") or "def"
    connection = get_db()

    with connection.cursor() as cur:
        cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant_slug,))
        tenant_row = cur.fetchone()

    if not tenant_row:
        return "Tenant not found", 404

    tenant_id = tenant_row["id"]
    storage_root = Path(tenant_row["storage_root"])

    with connection.cursor() as cur:
        cur.execute(
            """
            SELECT
                vj.id,
                vj.device_id,
                d.device_name,
                vj.source_file_id,
                vj.source_relative_path,
                vj.source_original_filename,
                vj.requested_profile,
                vj.requested_rotation_degrees,
                vj.status,
                vj.progress_percent,
                vj.assigned_processor,
                vj.output_relative_path,
                vj.error_message,
                vj.created_at,
                vj.started_at,
                vj.completed_at,
                vj.gpu_seconds
            FROM video_jobs vj
            LEFT JOIN devices d ON d.id = vj.device_id
            WHERE vj.tenant_id = %s
            ORDER BY vj.id DESC
            LIMIT 300
            """,
            (tenant_id,)
        )
        job_rows = cur.fetchall()

    def size_on_disk(rel_path):
        # None for a missing path, a non-file, or any error while stat-ing.
        if not rel_path:
            return None
        candidate = storage_root / rel_path
        try:
            if candidate.exists() and candidate.is_file():
                return candidate.stat().st_size
        except Exception:
            pass
        return None

    def as_text(value):
        return str(value) if value else ""

    jobs = [
        {
            "id": row["id"],
            "device_id": row["device_id"],
            "device_name": row["device_name"] or f"Device {row['device_id']}",
            "source_file_id": row["source_file_id"],
            "filename": row["source_original_filename"],
            "source_relative_path": row["source_relative_path"],
            "profile": row["requested_profile"],
            "rotation_override": row["requested_rotation_degrees"],
            "status": row["status"],
            "progress_percent": row["progress_percent"],
            "assigned_processor": row["assigned_processor"],
            "output_relative_path": row["output_relative_path"],
            "error_message": row["error_message"],
            "original_size": size_on_disk(row["source_relative_path"]),
            "processed_size": size_on_disk(row["output_relative_path"]),
            "gpu_seconds": row["gpu_seconds"] or 0,
            "created_at": as_text(row["created_at"]),
            "started_at": as_text(row["started_at"]),
            "completed_at": as_text(row["completed_at"]),
        }
        for row in job_rows
    ]

    return render_template("cloud/video_jobs.html", jobs=jobs)
@bp.route("/health")
def cloud_health():
    """Render storage and processing statistics for the session tenant.

    Reports the application version (from a ``VERSION`` file next to the app
    package, ``"unknown"`` when absent), file counts and sizes under the
    tenant's ``devices``, ``lts`` and ``archive`` folders, total bytes under
    the storage root, the number of processed images, and the video job
    counters stored in ``tenant_usage_metrics``.
    """
    version_file = Path(current_app.root_path).parent / "VERSION"
    app_version = version_file.read_text().strip() if version_file.exists() else "unknown"

    tenant = session.get("tenant") or "def"
    db = get_db()

    with db.cursor() as cur:
        cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,))
        tenant_row = cur.fetchone()

    if not tenant_row:
        return "Tenant not found", 404

    tenant_id = tenant_row["id"]
    storage_root = Path(tenant_row["storage_root"])

    def scan_tree(root):
        """Return (file count, total bytes) of regular files below *root*."""
        count = 0
        total = 0
        if root.exists():
            for entry in root.rglob("*"):
                # Files can disappear between listing and stat() while
                # uploads/deletes run concurrently; skip those entries.
                try:
                    if entry.is_file():
                        total += entry.stat().st_size
                        count += 1
                except OSError:
                    continue
        return count, total

    uploaded_count, uploaded_bytes = scan_tree(storage_root / "devices")
    lts_count, lts_bytes = scan_tree(storage_root / "lts")
    archive_count, archive_bytes = scan_tree(storage_root / "archive")
    _, total_used = scan_tree(storage_root)

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT
                COALESCE(video_jobs_total,0) AS total_jobs,
                COALESCE(video_jobs_complete,0) AS complete_jobs,
                COALESCE(video_jobs_failed,0) AS failed_jobs,
                COALESCE(gpu_seconds_total,0) AS gpu_seconds
            FROM tenant_usage_metrics
            WHERE tenant_id = %s
            LIMIT 1
            """,
            (tenant_id,)
        )
        stats = cur.fetchone() or {
            "total_jobs": 0,
            "complete_jobs": 0,
            "failed_jobs": 0,
            "gpu_seconds": 0,
        }

        cur.execute(
            """
            SELECT COUNT(*) AS image_processed
            FROM files
            WHERE tenant_id = %s
              AND file_kind = 'image_processed'
              AND is_deleted = 0
            """,
            (tenant_id,),
        )
        image_row = cur.fetchone() or {"image_processed": 0}

    image_processed = image_row["image_processed"] or 0

    def human_bytes(n):
        """Format a byte count as B / KB / MB / GB (1024-based)."""
        n = int(n or 0)
        if n < 1024:
            return f"{n} B"
        if n < 1024**2:
            return f"{n/1024:.1f} KB"
        if n < 1024**3:
            return f"{n/1024**2:.2f} MB"
        return f"{n/1024**3:.2f} GB"

    def human_seconds(n):
        """Format seconds as e.g. ``1h 2m 3s``; zero hours/minutes are omitted."""
        n = int(n or 0)
        hours, remainder = divmod(n, 3600)
        minutes, seconds = divmod(remainder, 60)
        parts = []
        if hours:
            parts.append(f"{hours}h")
        if minutes:
            parts.append(f"{minutes}m")
        parts.append(f"{seconds}s")
        return " ".join(parts)

    return render_template(
        "cloud/health.html",
        app_version=app_version,
        uploaded_count=uploaded_count,
        uploaded_bytes=human_bytes(uploaded_bytes),
        lts_count=lts_count,
        lts_bytes=human_bytes(lts_bytes),
        archive_count=archive_count,
        archive_bytes=human_bytes(archive_bytes),
        total_used=human_bytes(total_used),
        image_processed=image_processed,
        total_jobs=stats["total_jobs"] or 0,
        complete_jobs=stats["complete_jobs"] or 0,
        failed_jobs=stats["failed_jobs"] or 0,
        gpu_time=human_seconds(stats["gpu_seconds"] or 0),
    )


@bp.route("/video-output/<int:job_id>/view")
def view_video_output(job_id):
    """Serve a video job's output file inline.

    Returns:
        The file, or a plain-text 404 when the tenant, the job's output path
        or the file on disk is missing.
    """
    tenant = session.get("tenant") or "def"
    db = get_db()

    with db.cursor() as cur:
        cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,))
        tenant_row = cur.fetchone()
        if not tenant_row:
            return "Tenant not found", 404

        cur.execute(
            """
            SELECT output_relative_path
            FROM video_jobs
            WHERE id = %s AND tenant_id = %s
            LIMIT 1
            """,
            (job_id, tenant_row["id"])
        )
        job = cur.fetchone()

    if not job or not job["output_relative_path"]:
        return "No output file for this job", 404

    full_path = Path(tenant_row["storage_root"]) / job["output_relative_path"]
    if not full_path.exists():
        return "Output file missing on disk", 404

    return send_file(full_path, as_attachment=False)
@bp.route("/video-output/<int:job_id>/send-to-lts", methods=["POST"])
def send_video_output_to_lts(job_id):
    """Move a video job's output file into the tenant's LTS area.

    The destination sub-folder is chosen by file extension: ``lts/video``,
    ``lts/pictures``, or ``lts/archived`` for archives and anything else.
    When the name is taken, ``-2``, ``-3``, ... is appended to the stem. The
    job's ``output_relative_path`` is updated to the new location.

    Returns:
        JSON with the new relative path, or an ``error`` with HTTP 404.
    """
    video_exts = {".mp4", ".mov", ".mkv", ".webm", ".avi"}
    picture_exts = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"}

    tenant = session.get("tenant") or "def"
    db = get_db()

    with db.cursor() as cur:
        cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,))
        tenant_row = cur.fetchone()
        if not tenant_row:
            return jsonify({"ok": False, "error": "Tenant not found"}), 404

        tenant_id = tenant_row["id"]
        storage_root = Path(tenant_row["storage_root"])

        cur.execute(
            """
            SELECT id, output_relative_path
            FROM video_jobs
            WHERE id = %s AND tenant_id = %s
            LIMIT 1
            """,
            (job_id, tenant_id)
        )
        job = cur.fetchone()

    if not job or not job["output_relative_path"]:
        return jsonify({"ok": False, "error": "Job output not found"}), 404

    src = storage_root / job["output_relative_path"]
    if not src.exists():
        return jsonify({"ok": False, "error": "Output file missing on disk"}), 404

    ext = src.suffix.lower()
    if ext in video_exts:
        category = "video"
    elif ext in picture_exts:
        category = "pictures"
    else:
        # Archive extensions and unknown types share the same folder.
        category = "archived"

    lts_dir = storage_root / "lts" / category
    lts_dir.mkdir(parents=True, exist_ok=True)

    dest = lts_dir / src.name
    if dest.exists():
        stem, suffix = dest.stem, dest.suffix
        n = 2
        while (lts_dir / f"{stem}-{n}{suffix}").exists():
            n += 1
        dest = lts_dir / f"{stem}-{n}{suffix}"

    shutil.move(str(src), str(dest))
    new_relative_path = str(dest.relative_to(storage_root))

    with db.cursor() as cur:
        cur.execute(
            """
            UPDATE video_jobs
            SET output_relative_path = %s
            WHERE id = %s AND tenant_id = %s
            """,
            (new_relative_path, job_id, tenant_id)
        )
    db.commit()

    return jsonify({"ok": True, "output_relative_path": new_relative_path})


@bp.route("/video-output/<int:job_id>/download")
def download_video_output(job_id):
    """Send a video job's output file as an attachment.

    The download name is the final component of ``output_relative_path``.

    Returns:
        The file, or a plain-text 404 when the tenant, job, output path or
        file on disk is missing.
    """
    tenant = session.get("tenant") or "def"
    db = get_db()

    with db.cursor() as cur:
        cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,))
        tenant_row = cur.fetchone()
        if not tenant_row:
            return "Tenant not found", 404

        cur.execute(
            """
            SELECT output_relative_path, source_original_filename, status
            FROM video_jobs
            WHERE id = %s AND tenant_id = %s
            LIMIT 1
            """,
            (job_id, tenant_row["id"])
        )
        job = cur.fetchone()

    if not job:
        return "Job not found", 404
    if not job["output_relative_path"]:
        return "No output file for this job", 404

    full_path = Path(tenant_row["storage_root"]) / job["output_relative_path"]
    if not full_path.exists():
        return "Output file missing on disk", 404

    download_name = Path(job["output_relative_path"]).name
    return send_file(full_path, as_attachment=True, download_name=download_name)
@bp.route("/api/video/jobs/<int:job_id>/delete", methods=["POST"])
def video_job_delete(job_id):
    """Delete one ``video_jobs`` row belonging to the session tenant.

    Returns:
        JSON ``{"ok": true, "deleted_id": job_id}``, or an ``error`` with
        HTTP 404 when the tenant or job does not exist.
    """
    tenant = session.get("tenant") or "def"
    db = get_db()

    with db.cursor() as cur:
        cur.execute("SELECT id FROM tenants WHERE slug = %s LIMIT 1", (tenant,))
        tenant_row = cur.fetchone()
        if not tenant_row:
            return jsonify({"ok": False, "error": "tenant not found"}), 404

        cur.execute(
            "DELETE FROM video_jobs WHERE id = %s AND tenant_id = %s",
            (job_id, tenant_row["id"])
        )
        deleted = cur.rowcount

    db.commit()

    if not deleted:
        return jsonify({"ok": False, "error": "job not found"}), 404

    return jsonify({"ok": True, "deleted_id": job_id})


@bp.route("/api/video/jobs")
def video_jobs():
    """Return the session tenant's video jobs as JSON."""
    tenant = session.get("tenant") or 'def'
    return jsonify(list_jobs_for_tenant(tenant))


@bp.route("/api/video/queue-summary")
def video_queue_summary():
    """Return the queued-job count and the number of tenants with active jobs.

    Both figures are computed over all rows of ``video_jobs``.
    """
    db = get_db()

    def first_column(row):
        # Rows may be dicts or sequences depending on the cursor class.
        return row["c"] if isinstance(row, dict) else row[0]

    # The context manager closes the cursor, which was previously left open.
    with db.cursor() as cur:
        cur.execute("SELECT COUNT(*) AS c FROM video_jobs WHERE status='queued'")
        queue_count = first_column(cur.fetchone())

        cur.execute("""
            SELECT COUNT(DISTINCT tenant_id) AS c
            FROM video_jobs
            WHERE status IN ('queued','processing')
        """)
        active_users = first_column(cur.fetchone())

    return {
        "queue_count": queue_count,
        "active_users": active_users
    }


def _render_processed_image(src, out_path, ext, rotation, filter_name):
    """Write an edited copy of *src* to *out_path* and return its MIME type.

    *ext* must be ``jpg``, ``png`` or ``webp``. EXIF orientation is applied
    first, then the optional clockwise *rotation* (90/180/270) and the
    optional ``bw`` / ``sepia`` filter.
    """
    pil_format, mime = {
        "jpg": ("JPEG", "image/jpeg"),
        "png": ("PNG", "image/png"),
        "webp": ("WEBP", "image/webp"),
    }[ext]

    with Image.open(src) as img:
        img = ImageOps.exif_transpose(img)

        if rotation in (90, 180, 270):
            # PIL rotates counter-clockwise; negate for a clockwise turn.
            img = img.rotate(-rotation, expand=True)

        if filter_name == "bw":
            img = ImageOps.grayscale(img).convert("RGB")
        elif filter_name == "sepia":
            gray = ImageOps.grayscale(img)
            img = ImageOps.colorize(gray, "#2b1b0f", "#f2d3a3").convert("RGB")
        elif img.mode not in ("RGB", "RGBA"):
            img = img.convert("RGB")

        save_kwargs = {}
        if ext in ("jpg", "webp"):
            if img.mode == "RGBA":
                img = img.convert("RGB")
            save_kwargs["quality"] = 88

        img.save(out_path, pil_format, **save_kwargs)

    return mime


@bp.route("/api/image/process", methods=["POST"])
@portal_session_required
def image_process():
    """Create edited copies of up to 25 images and register them as files.

    JSON body:
        items: List of objects with an ``id`` (file id).
        state: Mapping of file id to per-image settings: ``rotation``
            (90/180/270), ``filter`` (``bw``/``sepia``), ``format``
            (``jpg``/``jpeg``/``png``/``webp``) and ``name``.

    Items that are malformed, not found, not images, missing on disk or
    unreadable are skipped; the rest of the batch is still processed. Each
    result is written next to its source and inserted into ``files`` with
    ``file_kind = 'image_processed'``.

    Returns:
        JSON ``{"ok": true, "processed": [...]}``, or an ``error`` with
        HTTP 400/404.
    """
    db = get_db()
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        data = {}

    items = data.get("items") or []
    state = data.get("state") or {}
    if not isinstance(state, dict):
        state = {}

    if not isinstance(items, list) or not items:
        return jsonify({"ok": False, "error": "no_images_selected"}), 400

    if len(items) > 25:
        return jsonify({"ok": False, "error": "Image Workshop is limited to 25 images per batch."}), 400

    tenant_id = session.get("otb_tenant_id")
    processed = []

    with db.cursor() as cur:
        cur.execute("SELECT storage_root FROM tenants WHERE id = %s LIMIT 1", (tenant_id,))
        tenant_row = cur.fetchone()
    if not tenant_row:
        return jsonify({"ok": False, "error": "tenant_not_found"}), 404

    storage_root = Path(tenant_row["storage_root"])

    for item in items:
        try:
            file_id = int(item.get("id"))
        except (AttributeError, TypeError, ValueError):
            continue

        with db.cursor() as cur:
            cur.execute(
                """
                SELECT id, tenant_id, device_id, relative_path, directory_path,
                       original_filename, display_filename, mime_type
                FROM files
                WHERE id = %s
                  AND tenant_id = %s
                  AND is_deleted = 0
                  AND mime_type LIKE 'image%%'
                LIMIT 1
                """,
                (file_id, tenant_id),
            )
            row = cur.fetchone()

        if not row:
            continue

        cfg = state.get(str(file_id)) or state.get(file_id) or {}
        if not isinstance(cfg, dict):
            cfg = {}

        try:
            rotation = int(cfg.get("rotation") or 0)
        except (TypeError, ValueError):
            rotation = 0
        filter_name = cfg.get("filter")
        fmt = str(cfg.get("format") or "").lower()

        src = storage_root / row["relative_path"]
        if not src.exists() or not src.is_file():
            continue

        out_dir_rel = row["directory_path"]
        out_dir = storage_root / out_dir_rel
        out_dir.mkdir(parents=True, exist_ok=True)

        src_name = Path(row["original_filename"])
        requested_name = str(cfg.get("name") or "").strip()
        base = "".join(
            c for c in requested_name if c.isalnum() or c in ("-", "_", " ")
        ).strip().replace(" ", "_")
        if not base:
            # No name given, or nothing left after sanitising it.
            base = src_name.stem + "_edited"

        requested_ext = fmt if fmt in ("jpg", "jpeg", "png", "webp") else src_name.suffix.lower().lstrip(".")
        if requested_ext == "jpeg":
            requested_ext = "jpg"
        if requested_ext not in ("jpg", "png", "webp"):
            requested_ext = "jpg"

        out_name = f"{base}.{requested_ext}"
        if (out_dir / out_name).exists():
            stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
            out_name = f"{base}_{stamp}.{requested_ext}"

        out_path = out_dir / out_name
        out_rel = f"{out_dir_rel}/{out_name}"

        try:
            mime = _render_processed_image(src, out_path, requested_ext, rotation, filter_name)
        except (OSError, ValueError):
            # Unreadable or unsupported image: drop any partial output and
            # carry on with the remaining items.
            out_path.unlink(missing_ok=True)
            continue

        size_bytes = out_path.stat().st_size
        sha256 = compute_sha256(out_path)

        with db.cursor() as cur:
            cur.execute(
                """
                INSERT INTO files (
                    tenant_id, device_id, parent_file_id, file_kind,
                    relative_path, directory_path, original_filename, display_filename,
                    basename, extension, mime_type, size_bytes, sha256,
                    capture_date, uploaded_at, is_immutable, is_deleted, deleted_at
                )
                VALUES (%s, %s, %s, 'image_processed', %s, %s, %s, %s, %s, %s, %s, %s, %s, NULL, UTC_TIMESTAMP(), 1, 0, NULL)
                """,
                (
                    row["tenant_id"],
                    row["device_id"],
                    row["id"],
                    out_rel,
                    out_dir_rel,
                    out_name,
                    out_name,
                    Path(out_name).stem,
                    requested_ext,
                    mime,
                    size_bytes,
                    sha256,
                ),
            )
            new_id = cur.lastrowid

        db.commit()

        processed.append({
            "id": new_id,
            "filename": out_name,
            "relative_path": out_rel,
            "size_bytes": size_bytes,
        })

    return jsonify({"ok": True, "processed": processed})
@bp.route("/workspace/pdf-report/preview/<path:filename>")
@portal_session_required
def pdf_report_preview(filename: str):
    """Serve one staged image for the PDF report preview.

    Args:
        filename: Name of a file in the tenant's ``zip_staging`` directory.

    Raises:
        werkzeug.exceptions.Forbidden: The name resolves outside the staging
            directory.
        werkzeug.exceptions.NotFound: The path cannot be resolved, is not a
            regular file, or is not a ``.jpg``/``.jpeg``/``.png``/``.webp``.
    """
    # abort is not among this module's top-level flask imports.
    from flask import abort

    staging_dir = _tenant_root() / "zip_staging"

    try:
        file_path = (staging_dir / filename).resolve()
        staging_resolved = staging_dir.resolve()
    except (OSError, RuntimeError):
        abort(404)

    # Raised outside any try block so the 403 is not turned into a 404.
    if staging_resolved not in file_path.parents and file_path != staging_resolved:
        abort(403)

    if not file_path.is_file():
        abort(404)

    if file_path.suffix.lower() not in [".jpg", ".jpeg", ".png", ".webp"]:
        abort(404)

    return send_file(file_path)


def _prepare_report_image_for_pdf_v2(source_path, quality_mode="standard", rotation_degrees=0):
    """Convert an image to a size-limited temporary JPEG for PDF embedding.

    Args:
        source_path: Image file to read.
        quality_mode: ``high`` (2400 px, q82), ``standard`` (1600 px, q70) or
            ``compressed`` (1200 px, q55); unknown values use ``standard``.
        rotation_degrees: Clockwise rotation; only 90, 180 and 270 are
            applied, anything unparsable is treated as 0.

    Returns:
        ``(path, width, height)`` of the temporary JPEG. The file is created
        with ``delete=False``; removing it is left to the caller.
    """
    modes = {
        "high": {"max_px": 2400, "quality": 82},
        "standard": {"max_px": 1600, "quality": 70},
        "compressed": {"max_px": 1200, "quality": 55},
    }
    cfg = modes.get(quality_mode, modes["standard"])

    try:
        rotation_degrees = int(rotation_degrees or 0)
    except (TypeError, ValueError):
        rotation_degrees = 0

    with PILImage.open(source_path) as im:
        im = ImageOps.exif_transpose(im)

        if rotation_degrees in (90, 180, 270):
            # PIL rotates counter-clockwise; negate for a clockwise turn.
            im = im.rotate(-rotation_degrees, expand=True)

        if im.mode in ("RGBA", "LA", "P"):
            # JPEG has no alpha channel: flatten onto a white background.
            bg = PILImage.new("RGB", im.size, (255, 255, 255))
            if im.mode == "P":
                im = im.convert("RGBA")
            bg.paste(im, mask=im.split()[-1] if im.mode in ("RGBA", "LA") else None)
            im = bg
        else:
            im = im.convert("RGB")

        max_px = cfg["max_px"]
        if max(im.size) > max_px:
            ratio = max_px / float(max(im.size))
            im = im.resize(
                (max(1, int(im.width * ratio)), max(1, int(im.height * ratio))),
                PILImage.LANCZOS,
            )

        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
        tmp.close()
        try:
            im.save(tmp.name, "JPEG", quality=cfg["quality"], optimize=True, progressive=True)
        except Exception:
            # Do not leave an orphaned temp file behind when encoding fails.
            Path(tmp.name).unlink(missing_ok=True)
            raise

        return tmp.name, im.width, im.height


@bp.route("/workspace/pdf-report")
@portal_session_required
def pdf_report_workshop():
    """Render the PDF report workshop listing the staged images.

    Lists ``.jpg``/``.jpeg``/``.png``/``.webp`` files directly inside the
    tenant's ``zip_staging`` directory (created if absent), sorted by
    lower-cased name.
    """
    staging_dir = _tenant_root() / "zip_staging"
    staging_dir.mkdir(parents=True, exist_ok=True)

    staged_files = [
        {"name": entry.name, "path": str(entry)}
        for entry in sorted(staging_dir.iterdir(), key=lambda x: x.name.lower())
        if entry.is_file() and entry.suffix.lower() in [".jpg", ".jpeg", ".png", ".webp"]
    ]

    return render_template(
        "cloud/pdf_report_workshop.html",
        staged_files=staged_files
    )


def _otb_pdf_footer(canvas, doc):
    """Draw the page footer: optional logo, generator text, page number, rule.

    Args:
        canvas: Canvas to draw on; its state is saved and restored here.
        doc: Document being built; ``doc.page`` supplies the page number and
            an optional ``doc.footer_id`` is appended to the footer text.
    """
    footer_id = getattr(doc, "footer_id", "") or ""
    logo_path = Path(__file__).resolve().parents[1] / "static" / "otb_pdf_logo.png"

    canvas.saveState()

    y = 24
    x = 42

    if logo_path.exists():
        # Best effort: the text starts at the left margin if the logo
        # cannot be drawn.
        try:
            canvas.drawImage(str(logo_path), x, y - 6, width=42, height=24, preserveAspectRatio=True, mask="auto")
            x += 48
        except Exception:
            pass

    canvas.setFont("Helvetica", 8)
    footer_text = "OTB-Cloud PDF generator"
    if footer_id:
        footer_text += f" - generated for {footer_id}"

    canvas.drawString(x, y, footer_text)
    canvas.drawRightString(570, y, f"Page {doc.page}")

    canvas.setStrokeColor(colors.lightgrey)
    canvas.setLineWidth(0.25)
    canvas.line(42, 38, 570, 38)

    canvas.restoreState()
"otb_pdf_logo.png" canvas.saveState() y = 24 x = 42 if logo_path.exists(): try: canvas.drawImage(str(logo_path), x, y - 6, width=42, height=24, preserveAspectRatio=True, mask="auto") x += 48 except Exception: pass canvas.setFont("Helvetica", 8) footer_text = "OTB-Cloud PDF generator" if footer_id: footer_text += f" - generated for {footer_id}" canvas.drawString(x, y, footer_text) canvas.drawRightString(570, y, f"Page {doc.page}") canvas.setStrokeColor(colors.lightgrey) canvas.setLineWidth(0.25) canvas.line(42, 38, 570, 38) canvas.restoreState() @bp.route("/workspace/pdf-report/create", methods=["POST"]) @portal_session_required def create_pdf_report(): tenant_root = _tenant_root() staging_dir = tenant_root / "zip_staging" exports_dir = tenant_root / "exports" exports_dir.mkdir(parents=True, exist_ok=True) report_name = (request.form.get("report_name") or "").strip() if not report_name: report_name = f"otb-report-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}" report_name = re.sub(r"[^A-Za-z0-9._-]+", "_", report_name).strip("._-") if not report_name: report_name = f"otb-report-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}" quality_mode = (request.form.get("quality_mode") or "standard").strip().lower() if quality_mode not in ["high", "standard", "compressed"]: quality_mode = "standard" footer_id = (request.form.get("footer_id") or "").strip() excluded = set(request.form.getlist("excluded_files")) total_files = int(request.form.get("total_files") or 0) selected = [] for idx in range(total_files): filename = request.form.get(f"file_{idx}", "").strip() if not filename or filename in excluded: continue candidate = (staging_dir / filename).resolve() try: staging_resolved = staging_dir.resolve() if staging_resolved not in candidate.parents: continue if not candidate.exists() or not candidate.is_file(): continue if candidate.suffix.lower() not in [".jpg", ".jpeg", ".png", ".webp"]: continue except Exception: continue selected.append((idx, candidate)) if 
not selected: flash("No images selected for PDF report.", "warning") return redirect(url_for("main.pdf_report_workshop")) pdf_path = exports_dir / f"{report_name}.pdf" job_title = request.form.get("job_title") or "Job Report" customer = request.form.get("customer", "") address = request.form.get("address", "") technician = request.form.get("technician", "") report_date = request.form.get("date", "") doc = SimpleDocTemplate( str(pdf_path), pagesize=letter, topMargin=54, bottomMargin=54, leftMargin=42, rightMargin=42, ) doc.title = job_title doc.author = "OTB Cloud" doc.creator = "OTB Cloud PDF Report Workshop" doc.subject = "Job Report" doc.footer_id = footer_id styles = getSampleStyleSheet() title_style = ParagraphStyle( "OTBTitle", parent=styles["Title"], fontSize=20, leading=24, spaceAfter=16, ) meta_style = ParagraphStyle( "OTBMeta", parent=styles["Normal"], fontSize=10.5, leading=13, ) contents_style = ParagraphStyle( "OTBContents", parent=styles["Normal"], fontSize=8.5, leading=10, ) elements = [] pdf_temp_files = [] first_contents_capacity = 20 continued_contents_capacity = 32 remaining_after_first = max(0, len(selected) - first_contents_capacity) continued_pages = 0 if remaining_after_first: continued_pages = (remaining_after_first + continued_contents_capacity - 1) // continued_contents_capacity contents_pages = 1 + continued_pages elements.append(Paragraph(xml_escape(job_title), title_style)) elements.append(Spacer(1, 14)) elements.append(Paragraph(f"Customer: {xml_escape(customer)}", meta_style)) elements.append(Paragraph(f"Address: {xml_escape(address)}", meta_style)) elements.append(Paragraph(f"Technician: {xml_escape(technician)}", meta_style)) elements.append(Paragraph(f"Date: {xml_escape(report_date)}", meta_style)) elements.append(Paragraph(f"PDF quality: {xml_escape(quality_mode)}", meta_style)) elements.append(Paragraph(f"Images included: {len(selected)}", meta_style)) elements.append(Spacer(1, 105)) elements.append(Paragraph("Contents", 
styles["Heading2"])) elements.append(Spacer(1, 6)) for pos, (idx, img_path) in enumerate(selected[:first_contents_capacity], start=1): display_name = img_path.name.split("__", 1)[1] if "__" in img_path.name else img_path.name caption = request.form.get(f"caption_{idx}", "").strip() label = caption if caption else display_name image_page = contents_pages + pos elements.append(Paragraph(f"{pos}. {xml_escape(label)} — page {image_page}", contents_style)) listed = first_contents_capacity while listed < len(selected): elements.append(PageBreak()) elements.append(Paragraph("Contents continued", styles["Heading2"])) elements.append(Spacer(1, 8)) chunk = selected[listed:listed + continued_contents_capacity] for offset, (idx, img_path) in enumerate(chunk, start=1): pos = listed + offset display_name = img_path.name.split("__", 1)[1] if "__" in img_path.name else img_path.name caption = request.form.get(f"caption_{idx}", "").strip() label = caption if caption else display_name image_page = contents_pages + pos elements.append(Paragraph(f"{pos}. 
{xml_escape(label)} — page {image_page}", contents_style)) listed += continued_contents_capacity elements.append(PageBreak()) try: for pos, (idx, img_path) in enumerate(selected, start=1): display_name = img_path.name.split("__", 1)[1] if "__" in img_path.name else img_path.name caption = request.form.get(f"caption_{idx}", "").strip() notes = request.form.get(f"notes_{idx}", "").strip() rotation = request.form.get(f"rotation_{idx}", "0") elements.append(Paragraph(xml_escape(display_name), styles["Heading3"])) elements.append(Spacer(1, 8)) try: tmp_img, w, h = _prepare_report_image_for_pdf_v2(img_path, quality_mode, rotation) pdf_temp_files.append(tmp_img) draw_w, draw_h = _pdf_scaled_dimensions(w, h, 430, 455) elements.append(ReportLabImage(tmp_img, width=draw_w, height=draw_h)) except Exception as e: elements.append(Paragraph(f"Image failed to load: {xml_escape(str(e))}", styles["Normal"])) elements.append(Spacer(1, 10)) if caption: elements.append(Paragraph(f"Caption: {xml_escape(caption)}", styles["Normal"])) if notes: elements.append(Paragraph(f"Notes: {xml_escape(notes)}", styles["Normal"])) if pos != len(selected): elements.append(PageBreak()) doc.build(elements, onFirstPage=_otb_pdf_footer, onLaterPages=_otb_pdf_footer) finally: for tmp_path in pdf_temp_files: try: os.unlink(tmp_path) except OSError: pass flash(f"PDF Report created: {report_name}.pdf", "success") return redirect(url_for("main.zip_workspace")) @bp.route("/workspace/exports//delete", methods=["POST"]) @portal_session_required def delete_export(filename: str): exports_dir = _tenant_root() / "exports" file_path = exports_dir / filename if file_path.exists() and file_path.is_file(): file_path.unlink(missing_ok=True) flash(f"Deleted export '{filename}'", "success") return redirect(url_for("main.zip_workspace"))