"""Flask blueprint for the tenant portal: shared helpers and device views."""

import hashlib
import os
import re
import secrets
import shutil
import zipfile
from datetime import datetime, timedelta, timezone
from functools import wraps
from pathlib import Path

from flask import (
    Blueprint,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from PIL import Image
from werkzeug.utils import secure_filename

from app.auth.utils import (
    compute_sha256,
    create_device_directories,
    remove_device_directories,
    slugify_device_name,
)
from app.db import get_db

bp = Blueprint("main", __name__)


def portal_session_required(view_func):
    """Redirect to the login notice unless the session holds a user and tenant id."""

    @wraps(view_func)
    def wrapped(*args, **kwargs):
        if "otb_user_id" not in session or "otb_tenant_id" not in session:
            return redirect(url_for("auth.login_required_notice"))
        return view_func(*args, **kwargs)

    return wrapped


def _client_ip():
    """Return the client address recorded in audit rows.

    ``X-Forwarded-For`` may carry a comma-separated proxy chain; only the
    first entry is used.  Falls back to ``request.remote_addr`` when the
    header is absent or empty.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded.split(",", 1)[0].strip()
    return first_hop or request.remote_addr


def _tenant_root() -> Path:
    """Return ``STORAGE_ROOT/tenants/<tenant slug>`` for the current session."""
    return Path(current_app.config["STORAGE_ROOT"]) / "tenants" / session["otb_tenant_slug"]


def _stored_name(original_name: str) -> str:
    """Build an on-disk name: a UTC timestamp, ``__``, then the sanitised file name."""
    safe = secure_filename(original_name or "upload.bin") or "upload.bin"
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
    return f"{ts}__{safe}"


def _recovered_filename(original_name: str) -> tuple[str, str, str]:
    """Return ``(full name, basename, extension)`` with ``-recovered`` appended.

    Any directory part (``/`` or ``\\`` separated) is dropped first.  The
    extension is ``""`` when the name contains no dot.
    """
    base_name = original_name.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
    if "." in base_name:
        basename, extension = base_name.rsplit(".", 1)
        return f"{basename}-recovered.{extension}", f"{basename}-recovered", extension
    recovered_name = f"{base_name}-recovered"
    return recovered_name, recovered_name, ""


def _display_filename(file_row) -> str:
    """Return the name to show for a file row.

    Prefers ``display_filename``, then ``original_filename``; blank or
    whitespace-only values are skipped so the result is never empty.
    """
    if not file_row:
        return "download.bin"
    for key in ("display_filename", "original_filename"):
        candidate = (file_row.get(key) or "").strip()
        if candidate:
            return candidate
    return "download.bin"


def _sanitize_display_basename(raw_name: str) -> str:
    """Clean a user-supplied display name and return it without its extension.

    NUL bytes and the characters ``\\ / : * ? " < > |`` are removed,
    whitespace runs collapse to one space, surrounding dots are stripped,
    and anything after the last dot is dropped.  Capped at 200 characters.
    """
    cleaned = (raw_name or "").replace("\x00", "").strip()
    cleaned = re.sub(r'[\\/:*?"<>|]+', "", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned)
    cleaned = cleaned.strip().strip(".")
    if "." in cleaned:
        cleaned = cleaned.rsplit(".", 1)[0].strip()
    return cleaned[:200]


def _generate_thumbnail(original_path: Path, thumb_path: Path):
    """Best-effort: write a thumbnail (max 400x400) of ``original_path``.

    Failures never propagate; they are logged at debug level, and any
    partially written thumbnail is removed so it cannot be served later.
    """
    thumb_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with Image.open(original_path) as img:
            img.thumbnail((400, 400))
            img.save(thumb_path)
    except Exception:  # deliberate best-effort: callers decide what to serve instead
        current_app.logger.debug(
            "Thumbnail generation failed for %s", original_path, exc_info=True
        )
        try:
            thumb_path.unlink(missing_ok=True)
        except OSError:
            pass


def _normalize_browser_path(raw_path: str) -> str:
    """Normalise a browser-supplied folder path to ``a/b/c`` form.

    Backslashes become ``/``; empty, ``.`` and ``..`` segments are dropped.
    """
    raw = (raw_path or "").replace("\\", "/").strip().strip("/")
    if not raw:
        return ""
    segments = (part.strip() for part in raw.split("/"))
    return "/".join(part for part in segments if part and part not in (".", ".."))


def _safe_path_from_relative(relative_path: str) -> Path:
    """Join ``relative_path`` onto the tenant root, refusing to leave it.

    The containment check is lexical (``os.path.abspath``), so symlinks are
    not followed and the returned path is the plain, unresolved join.

    Raises:
        ValueError: if the joined path falls outside the tenant root, e.g.
            an absolute path or one that climbs out with ``..``.
    """
    root = _tenant_root()
    candidate = root / relative_path
    root_abs = os.path.abspath(root)
    candidate_abs = os.path.abspath(candidate)
    if os.path.commonpath((root_abs, candidate_abs)) != root_abs:
        raise ValueError(f"Path escapes tenant storage: {relative_path!r}")
    return candidate


def _get_device_for_tenant(db, device_id: int):
    """Fetch the device row for ``device_id`` within the session's tenant, or ``None``."""
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, device_name, device_type, relative_path
            FROM devices
            WHERE id = %s AND tenant_id = %s
            """,
            (device_id, session["otb_tenant_id"]),
        )
        return cur.fetchone()


def _purge_expired_deleted_files(db):
    """Remove soft-deleted files of this tenant whose ``deleted_at`` is 24h or more old.

    For each expired row the stored file is unlinked, a
    ``deleted_file_purged`` audit row is written and the ``files`` row is
    deleted.  Commits once at the end.
    """
    tenant_id = session["otb_tenant_id"]
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, relative_path, original_filename
            FROM files
            WHERE tenant_id = %s
              AND is_deleted = 1
              AND deleted_at IS NOT NULL
              AND deleted_at <= (UTC_TIMESTAMP() - INTERVAL 24 HOUR)
            """,
            (tenant_id,),
        )
        expired = cur.fetchall()
        for row in expired:
            _safe_path_from_relative(row["relative_path"]).unlink(missing_ok=True)
            cur.execute(
                """
                INSERT INTO audit_logs (
                    tenant_id, user_id, actor_type, event_type, file_id,
                    ip_address, user_agent, event_detail
                )
                VALUES (%s, %s, 'system', 'deleted_file_purged', %s, %s, %s, %s)
                """,
                (
                    tenant_id,
                    session["otb_user_id"],
                    row["id"],
                    _client_ip(),
                    request.headers.get("User-Agent", ""),
                    f"Purged deleted file '{row['original_filename']}' after retention window",
                ),
            )
            cur.execute(
                "DELETE FROM files WHERE id = %s AND tenant_id = %s",
                (row["id"], tenant_id),
            )
    db.commit()


@bp.route("/")
def index():
    """Send signed-in users to the dashboard and everyone else to the login notice."""
    if "otb_user_id" in session:
        return redirect(url_for("main.dashboard"))
    return redirect(url_for("auth.login_required_notice"))


@bp.route("/dashboard")
@portal_session_required
def dashboard():
    """Render the dashboard listing the tenant's devices ordered by id."""
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, device_name, device_type, relative_path, is_active, created_at
            FROM devices
            WHERE tenant_id = %s
            ORDER BY id
            """,
            (session["otb_tenant_id"],),
        )
        devices = cur.fetchall()
    return render_template(
        "cloud/dashboard.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        devices=devices,
    )


@bp.route("/devices/android/new", methods=["GET", "POST"])
@portal_session_required
def create_android_device():
    """Create an android device row plus a one-time activation token.

    Only the SHA-256 of the token is stored; the raw token is shown once
    through a flash message.
    """
    if request.method == "GET":
        return render_template(
            "cloud/android_device_new.html",
            user_email=session.get("otb_email"),
            tenant_slug=session.get("otb_tenant_slug"),
        )

    device_name = (request.form.get("device_name") or "").strip()
    if not device_name:
        flash("Device name is required.", "warning")
        return redirect(url_for("main.create_android_device"))

    relative_path = f"devices/{slugify_device_name(device_name)}"
    raw_token = secrets.token_hex(16)
    token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
    # Naive UTC value, the same as the deprecated datetime.utcnow() produced.
    expires_at = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=48)

    db = get_db()
    with db.cursor() as cur:
        # create device bucket
        cur.execute(
            """
            INSERT INTO devices (tenant_id, device_name, device_type, relative_path)
            VALUES (%s, %s, %s, %s)
            """,
            (session["otb_tenant_id"], device_name, "android", relative_path),
        )
        device_id = cur.lastrowid
        # create token
        cur.execute(
            """
            INSERT INTO android_device_tokens
                (tenant_id, device_id, token_hash, device_label, expires_at)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (session["otb_tenant_id"], device_id, token_hash, device_name, expires_at),
        )
    db.commit()

    flash(f"Activation token (valid 48h): {raw_token}", "success")
    flash("⚠️ This token must be used within 48 hours or it will expire.", "warning")
    return redirect(url_for("main.dashboard"))


def _render_device_form(**form_values):
    """Render the add-device form, optionally re-filling submitted values."""
    return render_template(
        "cloud/device_new.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        **form_values,
    )


@bp.route("/devices/new", methods=["GET", "POST"])
@portal_session_required
def add_device():
    """Show the add-device form (GET) or create a device (POST).

    On POST the name and type are required and the name must be unique
    within the tenant.  On success the row and a ``device_created`` audit
    row are committed, then the device directories are created.
    """
    if request.method == "GET":
        return _render_device_form()

    device_name = (request.form.get("device_name") or "").strip()
    device_type = (request.form.get("device_type") or "").strip()
    submitted = {"device_name": device_name, "device_type": device_type}

    if not device_name:
        flash("Device name is required.", "warning")
        return _render_device_form(**submitted)
    if not device_type:
        flash("Device type is required.", "warning")
        return _render_device_form(**submitted)

    relative_path = f"devices/{slugify_device_name(device_name)}"
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id
            FROM devices
            WHERE tenant_id = %s AND device_name = %s
            """,
            (session["otb_tenant_id"], device_name),
        )
        if cur.fetchone():
            flash("A device with that name already exists.", "warning")
            return _render_device_form(**submitted)

        cur.execute(
            """
            INSERT INTO devices (tenant_id, device_name, device_type, relative_path, is_active)
            VALUES (%s, %s, %s, %s, 1)
            """,
            (session["otb_tenant_id"], device_name, device_type, relative_path),
        )
        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type,
                ip_address, user_agent, event_detail
            )
            VALUES (%s, %s, 'user', 'device_created', %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Created device '{device_name}' ({device_type}) at {relative_path}",
            ),
        )
    db.commit()

    create_device_directories(session["otb_tenant_slug"], relative_path)
    flash("Device added successfully.", "success")
    return redirect(url_for("main.dashboard"))


@bp.route("/devices/delete/<int:device_id>", methods=["POST"])
@portal_session_required
def delete_device(device_id: int):
    """Delete a device that has no active (non-deleted) files linked to it.

    Writes a ``device_deleted`` audit row, commits, then removes the
    device directories.  Always redirects to the dashboard.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    # NOTE(review): this guard and its message look copied from the upload
    # view - the text is about uploads, not deletion.  Kept as-is; confirm
    # whether android devices are really meant to be undeletable here.
    if device.get("device_type") == "android":
        flash("Android backup devices only accept uploads from the OTB Cloud mobile app.", "warning")
        return redirect(url_for("main.dashboard"))

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT COUNT(*) AS file_count
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0
            """,
            (session["otb_tenant_id"], device_id),
        )
        file_count = cur.fetchone()["file_count"]
        if file_count and int(file_count) > 0:
            flash("This device cannot be removed because files are still linked to it.", "warning")
            return redirect(url_for("main.dashboard"))

        cur.execute(
            """
            DELETE FROM devices
            WHERE id = %s AND tenant_id = %s
            """,
            (device_id, session["otb_tenant_id"]),
        )
        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type,
                ip_address, user_agent, event_detail
            )
            VALUES (%s, %s, 'user', 'device_deleted', %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Deleted device '{device['device_name']}' ({device['device_type']}) at {device['relative_path']}",
            ),
        )
    db.commit()

    remove_device_directories(session["otb_tenant_slug"], device["relative_path"])
    flash("Device removed successfully.", "success")
    return redirect(url_for("main.dashboard"))
_AUDIT_INSERT_SQL = """
    INSERT INTO audit_logs (
        tenant_id, user_id, actor_type, event_type,
        ip_address, user_agent, event_detail
    )
    VALUES (%s, %s, 'user', %s, %s, %s, %s)
"""

_AUDIT_INSERT_WITH_FILE_SQL = """
    INSERT INTO audit_logs (
        tenant_id, user_id, actor_type, event_type, file_id,
        ip_address, user_agent, event_detail
    )
    VALUES (%s, %s, 'user', %s, %s, %s, %s, %s)
"""


def _record_user_audit_event(cur, event_type: str, event_detail: str, file_id=None) -> None:
    """Insert a ``'user'`` audit row for the current session and request.

    The ``file_id`` column is only included in the INSERT when a file id
    is given, matching the two statement shapes used by the views.
    """
    who = (session["otb_tenant_id"], session["otb_user_id"], event_type)
    where = (_client_ip(), request.headers.get("User-Agent", ""), event_detail)
    if file_id is None:
        cur.execute(_AUDIT_INSERT_SQL, who + where)
    else:
        cur.execute(_AUDIT_INSERT_WITH_FILE_SQL, who + (file_id,) + where)


def _selected_file_ids() -> list[int]:
    """Return the ``selected_files`` form values that are decimal integers.

    ``str.isdecimal`` is used rather than ``isdigit`` because ``isdigit``
    accepts characters such as ``"²"`` that ``int()`` rejects.
    """
    stripped = (raw.strip() for raw in request.form.getlist("selected_files"))
    return [int(value) for value in stripped if value.isdecimal()]


def _unique_arcname(name: str, used: set) -> str:
    """Return ``name``, or ``name (n)`` before the extension if already in ``used``.

    The chosen name is added to ``used`` before returning.
    """
    stem, dot, extension = name.rpartition(".")
    candidate = name
    counter = 1
    while candidate in used:
        if dot and stem:
            candidate = f"{stem} ({counter}).{extension}"
        else:
            candidate = f"{name} ({counter})"
        counter += 1
    used.add(candidate)
    return candidate


@bp.route("/devices/<int:device_id>/upload", methods=["GET", "POST"])
@portal_session_required
def upload_files(device_id: int):
    """Show the upload form (GET) or store uploaded files for a device (POST).

    Folder structure sent by the browser in the file name is preserved
    under ``<device>/originals`` after being normalised, so ``..`` and
    empty segments cannot move the file outside that directory.  Each file
    gets a ``files`` row and a ``file_uploaded`` audit row.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))
    if device.get("device_type") == "android":
        flash("Android backup devices only accept uploads from the OTB Cloud mobile app.", "warning")
        return redirect(url_for("main.dashboard"))

    def render_form():
        return render_template(
            "cloud/upload.html",
            user_email=session.get("otb_email"),
            tenant_slug=session.get("otb_tenant_slug"),
            device=device,
        )

    if request.method == "GET":
        return render_form()

    files = [f for f in request.files.getlist("files") if f and f.filename]
    if not files:
        flash("Please choose at least one file to upload.", "warning")
        return render_form()

    originals_rel = f"{device['relative_path']}/originals"
    upload_base = _tenant_root() / device["relative_path"] / "originals"
    upload_base.mkdir(parents=True, exist_ok=True)

    uploaded_count = 0
    with db.cursor() as cur:
        for incoming in files:
            original_filename = incoming.filename or "upload.bin"

            # preserve folder structure from browser, minus unsafe segments
            browser_path = original_filename.replace("\\", "/")
            folder_part, _, filename_only = browser_path.rpartition("/")
            subdirs = _normalize_browser_path(folder_part)

            stored_name = _stored_name(filename_only)
            if subdirs:
                target_dir = upload_base / subdirs
                target_dir.mkdir(parents=True, exist_ok=True)
                directory_path = f"{originals_rel}/{subdirs}"
            else:
                target_dir = upload_base
                directory_path = originals_rel
            relative_path = f"{directory_path}/{stored_name}"

            target_path = target_dir / stored_name
            incoming.save(target_path)
            size_bytes = target_path.stat().st_size
            sha256 = compute_sha256(target_path)

            if "." in filename_only:
                basename, extension = filename_only.rsplit(".", 1)
            else:
                basename, extension = filename_only, ""

            cur.execute(
                """
                INSERT INTO files (
                    tenant_id, device_id, parent_file_id, file_kind,
                    relative_path, directory_path, original_filename,
                    basename, extension, mime_type, size_bytes, sha256,
                    capture_date, uploaded_at, is_immutable, is_deleted, deleted_at
                )
                VALUES (
                    %s, %s, NULL, 'original',
                    %s, %s, %s,
                    %s, %s, %s, %s, %s,
                    NULL, UTC_TIMESTAMP(), 1, 0, NULL
                )
                """,
                (
                    session["otb_tenant_id"],
                    device["id"],
                    relative_path,
                    directory_path,
                    original_filename,
                    basename,
                    extension,
                    incoming.mimetype or None,
                    size_bytes,
                    sha256,
                ),
            )
            _record_user_audit_event(
                cur,
                "file_uploaded",
                f"Uploaded '{original_filename}' to {relative_path}",
                file_id=cur.lastrowid,
            )
            uploaded_count += 1
    db.commit()

    flash(f"Uploaded {uploaded_count} file(s) to {device['device_name']}.", "success")
    return redirect(url_for("main.dashboard"))


@bp.route("/files/<int:file_id>/thumb", methods=["GET"])
@portal_session_required
def thumbnail_file(file_id: int):
    """Serve a thumbnail for an active file, generating it on first request.

    Falls back to the original file when no thumbnail could be produced.
    Responds 404 when the row is missing or nothing is on disk to serve.
    """
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, original_filename, display_filename, relative_path, mime_type
            FROM files
            WHERE id = %s AND tenant_id = %s AND is_deleted = 0
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()
    if not file_row:
        return "", 404

    original_path = _safe_path_from_relative(file_row["relative_path"])
    thumb_rel = file_row["relative_path"].replace("originals/", "derived/thumbs/")
    thumb_path = _safe_path_from_relative(thumb_rel)

    if not thumb_path.exists():
        if not original_path.is_file():
            # Nothing to build from or fall back to; avoid an error inside send_file.
            return "", 404
        _generate_thumbnail(original_path, thumb_path)

    serve_path = thumb_path if thumb_path.exists() else original_path
    return send_file(serve_path, mimetype=file_row.get("mime_type"), as_attachment=False)


@bp.route("/files/<int:file_id>/inline", methods=["GET"])
@portal_session_required
def inline_file(file_id: int):
    """Serve an active file inline (not as an attachment)."""
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, original_filename, display_filename, relative_path, mime_type
            FROM files
            WHERE id = %s AND tenant_id = %s AND is_deleted = 0
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()
    if not file_row:
        flash("File not found.", "warning")
        return redirect(url_for("main.dashboard"))

    file_path = _safe_path_from_relative(file_row["relative_path"])
    if not file_path.exists():
        flash("File is missing from storage.", "warning")
        return redirect(url_for("main.dashboard"))

    return send_file(
        file_path,
        mimetype=file_row.get("mime_type") or None,
        as_attachment=False,
        download_name=_display_filename(file_row),
        conditional=True,
    )


@bp.route("/files/<int:file_id>/download", methods=["GET"])
@portal_session_required
def download_file(file_id: int):
    """Send a file of the current tenant as an attachment under its display name."""
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, original_filename, display_filename, relative_path
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()
    if not file_row:
        flash("File not found.", "warning")
        return redirect(url_for("main.dashboard"))

    file_path = _safe_path_from_relative(file_row["relative_path"])
    if not file_path.exists():
        flash("File is missing from storage.", "warning")
        return redirect(url_for("main.dashboard"))

    return send_file(file_path, as_attachment=True, download_name=_display_filename(file_row))


@bp.route("/devices/<int:device_id>/files/download-selected", methods=["POST"])
@portal_session_required
def download_selected_files(device_id: int):
    """Redirect to the download of the single selected file."""
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    selected_ids = _selected_file_ids()
    if not selected_ids:
        flash("Select at least one file first.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))
    if len(selected_ids) != 1:
        flash("Download Selected currently supports one file at a time. Use Zip Workspace for multiple files.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    return redirect(url_for("main.download_file", file_id=selected_ids[0]))


@bp.route("/devices/<int:device_id>/files/delete-selected", methods=["POST"])
@portal_session_required
def delete_selected_files(device_id: int):
    """Soft-delete the selected files by moving them into ``<device>/deleted``.

    Each affected row is flagged deleted with a UTC timestamp and gets a
    ``file_soft_deleted`` audit row.  Unknown or already-deleted ids are skipped.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    selected_ids = _selected_file_ids()
    if not selected_ids:
        flash("Select at least one file first.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    deleted_dir_rel = f"{device['relative_path']}/deleted"
    deleted_count = 0
    with db.cursor() as cur:
        for file_id in selected_ids:
            cur.execute(
                """
                SELECT id, original_filename, relative_path, directory_path, is_deleted
                FROM files
                WHERE id = %s AND tenant_id = %s AND device_id = %s
                """,
                (file_id, session["otb_tenant_id"], device_id),
            )
            file_row = cur.fetchone()
            if not file_row or file_row["is_deleted"]:
                continue

            source_path = _safe_path_from_relative(file_row["relative_path"])
            target_rel = f"{deleted_dir_rel}/{_stored_name(file_row['original_filename'])}"
            target_path = _safe_path_from_relative(target_rel)
            target_path.parent.mkdir(parents=True, exist_ok=True)
            if source_path.exists():
                shutil.move(str(source_path), str(target_path))

            cur.execute(
                """
                UPDATE files
                SET relative_path = %s,
                    directory_path = %s,
                    is_deleted = 1,
                    deleted_at = UTC_TIMESTAMP()
                WHERE id = %s AND tenant_id = %s
                """,
                (target_rel, deleted_dir_rel, file_id, session["otb_tenant_id"]),
            )
            _record_user_audit_event(
                cur,
                "file_soft_deleted",
                f"Soft-deleted '{file_row['original_filename']}' into deleted area",
                file_id=file_id,
            )
            deleted_count += 1
    db.commit()

    flash(f"Deleted {deleted_count} file(s). Deleted files are retained for up to 24 hours unless hard-deleted.", "success")
    return redirect(url_for("main.browse_device_files", device_id=device_id))


@bp.route("/devices/<int:device_id>/files/send-to-zip", methods=["POST"])
@portal_session_required
def send_selected_to_zip_workspace(device_id: int):
    """Copy the selected active files into the tenant's ``zip_staging`` directory."""
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    selected_ids = _selected_file_ids()
    if not selected_ids:
        flash("Select at least one file first.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    staging_dir = _tenant_root() / "zip_staging"
    staging_dir.mkdir(parents=True, exist_ok=True)

    copied_count = 0
    with db.cursor() as cur:
        for file_id in selected_ids:
            cur.execute(
                """
                SELECT id, original_filename, display_filename, relative_path, is_deleted
                FROM files
                WHERE id = %s AND tenant_id = %s AND device_id = %s
                """,
                (file_id, session["otb_tenant_id"], device_id),
            )
            file_row = cur.fetchone()
            if not file_row or file_row["is_deleted"]:
                continue

            source_path = _safe_path_from_relative(file_row["relative_path"])
            if not source_path.exists():
                continue

            visible_name = _display_filename(file_row)
            shutil.copy2(source_path, staging_dir / _stored_name(visible_name))
            _record_user_audit_event(
                cur,
                "file_staged_for_zip",
                f"Copied '{visible_name}' into zip workspace",
                file_id=file_id,
            )
            copied_count += 1
    db.commit()

    flash(f"Sent {copied_count} file(s) to Zip Workspace.", "success")
    return redirect(url_for("main.browse_device_files", device_id=device_id))


@bp.route("/workspace/zip", methods=["GET"])
@portal_session_required
def zip_workspace():
    """List staged files (by name) and existing exports (newest first)."""
    tenant_root = _tenant_root()
    staging_dir = tenant_root / "zip_staging"
    exports_dir = tenant_root / "exports"
    staging_dir.mkdir(parents=True, exist_ok=True)
    exports_dir.mkdir(parents=True, exist_ok=True)

    def describe(path: Path) -> dict:
        return {
            "name": path.name,
            "size_bytes": path.stat().st_size,
            "path": str(path.relative_to(tenant_root)),
        }

    staged_files = [
        describe(p)
        for p in sorted(staging_dir.iterdir(), key=lambda x: x.name.lower())
        if p.is_file()
    ]
    export_files = [
        describe(p)
        for p in sorted(exports_dir.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True)
        if p.is_file()
    ]

    return render_template(
        "cloud/zip_workspace.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        staged_files=staged_files,
        export_files=export_files,
    )


@bp.route("/workspace/zip/create", methods=["POST"])
@portal_session_required
def create_zip_from_workspace():
    """Zip every staged file into ``exports`` and then clear the staging area.

    Archive member names drop the ``<timestamp>__`` storage prefix.  Names
    that would collide get a ``(n)`` suffix so no member shadows another.
    """
    tenant_root = _tenant_root()
    staging_dir = tenant_root / "zip_staging"
    exports_dir = tenant_root / "exports"
    staging_dir.mkdir(parents=True, exist_ok=True)
    exports_dir.mkdir(parents=True, exist_ok=True)

    staged = [p for p in staging_dir.iterdir() if p.is_file()]
    if not staged:
        flash("Zip Workspace is empty.", "warning")
        return redirect(url_for("main.zip_workspace"))

    zip_name = f"otb-cloud-export-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}.zip"
    zip_path = exports_dir / zip_name

    used_names = set()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for p in staged:
            visible = p.name.split("__", 1)[1] if "__" in p.name else p.name
            zf.write(p, arcname=_unique_arcname(visible, used_names))

    for p in staged:
        p.unlink(missing_ok=True)

    db = get_db()
    with db.cursor() as cur:
        _record_user_audit_event(
            cur,
            "zip_created",
            f"Created zip export '{zip_name}' in exports workspace; staged copies cleared after success",
        )
    db.commit()

    flash(f"Zip file created successfully. You can find it in the Exports section below as '{zip_name}'.", "success")
    return redirect(url_for("main.zip_workspace"))


@bp.route("/workspace/exports/<path:filename>/download", methods=["GET"])
@portal_session_required
def download_export(filename: str):
    """Send an export archive as an attachment.

    ``filename`` comes from the URL, so it must resolve to a direct child
    of the tenant's ``exports`` directory; anything else is reported as
    not found.
    """
    exports_dir = _tenant_root() / "exports"
    file_path = exports_dir / filename
    inside_exports = file_path.resolve().parent == exports_dir.resolve()
    if not inside_exports or not file_path.is_file():
        flash("Export file not found.", "warning")
        return redirect(url_for("main.zip_workspace"))
    return send_file(file_path, as_attachment=True, download_name=file_path.name)


@bp.route("/deleted", methods=["GET"])
@portal_session_required
def deleted_files():
    """Purge expired soft-deleted files, then list the remaining ones."""
    db = get_db()
    _purge_expired_deleted_files(db)
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT f.id, f.original_filename, f.relative_path, f.size_bytes, f.deleted_at,
                   d.device_name, d.device_type
            FROM files f
            LEFT JOIN devices d ON f.device_id = d.id
            WHERE f.tenant_id = %s AND f.is_deleted = 1
            ORDER BY f.deleted_at DESC, f.id DESC
            """,
            (session["otb_tenant_id"],),
        )
        files = cur.fetchall()
    return render_template(
        "cloud/deleted_files.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        files=files,
    )


@bp.route("/deleted/<int:file_id>/recover", methods=["POST"])
@portal_session_required
def recover_deleted_file(file_id: int):
    """Move a soft-deleted file back into ``<device>/originals``.

    The file is renamed with a ``-recovered`` suffix, any custom display
    name is cleared, and a ``file_recovered`` audit row is written.
    """
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT f.id, f.device_id, f.original_filename, f.relative_path, f.is_deleted,
                   d.device_name, d.device_type, d.relative_path AS device_relative_path
            FROM files f
            LEFT JOIN devices d ON f.device_id = d.id
            WHERE f.id = %s AND f.tenant_id = %s
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()
        if not file_row or not file_row["is_deleted"]:
            flash("Deleted file not found.", "warning")
            return redirect(url_for("main.deleted_files"))
        if not file_row["device_relative_path"]:
            flash("Cannot recover this file because its device record is missing.", "warning")
            return redirect(url_for("main.deleted_files"))

        source_path = _safe_path_from_relative(file_row["relative_path"])
        if not source_path.exists():
            flash("Deleted file is missing from storage.", "warning")
            return redirect(url_for("main.deleted_files"))

        recovered_name, recovered_basename, recovered_extension = _recovered_filename(
            file_row["original_filename"]
        )
        target_dir = f"{file_row['device_relative_path']}/originals"
        target_rel = f"{target_dir}/{_stored_name(recovered_name)}"
        target_path = _safe_path_from_relative(target_rel)
        target_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_path), str(target_path))

        cur.execute(
            """
            UPDATE files
            SET original_filename = %s,
                display_filename = NULL,
                basename = %s,
                extension = %s,
                relative_path = %s,
                directory_path = %s,
                is_deleted = 0,
                deleted_at = NULL
            WHERE id = %s AND tenant_id = %s
            """,
            (
                recovered_name,
                recovered_basename,
                recovered_extension,
                target_rel,
                target_dir,
                file_id,
                session["otb_tenant_id"],
            ),
        )
        _record_user_audit_event(
            cur,
            "file_recovered",
            f"Recovered deleted file as '{recovered_name}' back into originals",
            file_id=file_id,
        )
    db.commit()

    flash(f"Recovered file as '{recovered_name}'. You can find it back in the device file browser.", "success")
    return redirect(url_for("main.deleted_files"))


@bp.route("/deleted/<int:file_id>/hard-delete", methods=["POST"])
@portal_session_required
def hard_delete_file(file_id: int):
    """Permanently remove a soft-deleted file from storage and the database."""
    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, original_filename, relative_path, is_deleted
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()
        if not file_row or not file_row["is_deleted"]:
            flash("Deleted file not found.", "warning")
            return redirect(url_for("main.deleted_files"))

        _safe_path_from_relative(file_row["relative_path"]).unlink(missing_ok=True)

        _record_user_audit_event(
            cur,
            "file_hard_deleted",
            f"Hard-deleted '{file_row['original_filename']}' immediately from deleted area",
            file_id=file_id,
        )
        cur.execute(
            "DELETE FROM files WHERE id = %s AND tenant_id = %s",
            (file_id, session["otb_tenant_id"]),
        )
    db.commit()

    flash("File permanently deleted.", "success")
    return redirect(url_for("main.deleted_files"))


@bp.route("/files/<int:file_id>/rename", methods=["POST"])
@portal_session_required
def rename_file(file_id: int):
    """Set or clear the custom display name of an active file.

    Only ``display_filename`` changes; the stored file and its extension
    are untouched.  Submitting the original name clears the custom name.
    """
    db = get_db()
    requested_basename = _sanitize_display_basename(request.form.get("display_basename") or "")

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, device_id, original_filename, display_filename, extension, is_deleted
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()
        if not file_row:
            flash("File not found.", "warning")
            return redirect(url_for("main.dashboard"))
        if file_row["is_deleted"]:
            flash("You can only rename active files from the device file browser.", "warning")
            return redirect(url_for("main.deleted_files"))

        back_to_browser = url_for("main.browse_device_files", device_id=file_row["device_id"])

        if not requested_basename:
            flash("Please enter a new file name.", "warning")
            return redirect(back_to_browser)

        if file_row["extension"]:
            new_visible_name = f"{requested_basename}.{file_row['extension']}"
        else:
            new_visible_name = requested_basename
        if len(new_visible_name) > 255:
            flash("That file name is too long.", "warning")
            return redirect(back_to_browser)

        current_visible_name = _display_filename(file_row)
        if new_visible_name == current_visible_name:
            flash("That file already has that name.", "warning")
            return redirect(back_to_browser)

        # Storing NULL means "show the original name again".
        display_filename = (
            None if new_visible_name == file_row["original_filename"] else new_visible_name
        )
        cur.execute(
            """
            UPDATE files
            SET display_filename = %s
            WHERE id = %s AND tenant_id = %s
            """,
            (display_filename, file_id, session["otb_tenant_id"]),
        )
        final_visible_name = display_filename or file_row["original_filename"]
        _record_user_audit_event(
            cur,
            "file_display_renamed",
            f"Updated display filename from '{current_visible_name}' to '{final_visible_name}'",
            file_id=file_id,
        )
    db.commit()

    if display_filename is None:
        flash("Custom name cleared. The original file name is now shown again.", "success")
    else:
        flash(f"File renamed to '{display_filename}'.", "success")
    return redirect(back_to_browser)


@bp.route("/devices/<int:device_id>/files", methods=["GET"])
@portal_session_required
def browse_device_files(device_id: int):
    """Render one folder level of a device's active files.

    Query args: ``view`` (``list`` or ``gallery``, default ``list``) and
    ``path`` (folder below ``<device>/originals``, normalised before use).
    Sub-folders are derived from the ``directory_path`` values of files
    stored below the current folder.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)
    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    view_mode = request.args.get("view", "list").strip().lower()
    if view_mode not in ("list", "gallery"):
        view_mode = "list"

    current_path = _normalize_browser_path(request.args.get("path", ""))
    root_directory = f"{device['relative_path']}/originals"
    current_directory = f"{root_directory}/{current_path}" if current_path else root_directory

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, file_kind, relative_path, directory_path, original_filename,
                   display_filename, basename, extension, mime_type, size_bytes,
                   sha256, uploaded_at, is_immutable
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0
              AND directory_path = %s
            ORDER BY uploaded_at DESC, id DESC
            """,
            (session["otb_tenant_id"], device_id, current_directory),
        )
        files = cur.fetchall()

        cur.execute(
            """
            SELECT DISTINCT directory_path
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0
              AND directory_path LIKE %s
            ORDER BY directory_path
            """,
            (session["otb_tenant_id"], device_id, f"{current_directory}/%"),
        )
        all_dirs = [row["directory_path"] for row in cur.fetchall()]

    # A dict keeps first-seen order while giving constant-time de-duplication.
    # The startswith check re-filters rows the LIKE pattern may over-match.
    prefix = f"{current_directory}/"
    folder_names = {}
    for directory in all_dirs:
        if not directory.startswith(prefix):
            continue
        first_segment = directory[len(prefix):].split("/", 1)[0]
        if first_segment:
            folder_names.setdefault(first_segment, None)

    folders = [
        {"name": name, "path": f"{current_path}/{name}" if current_path else name}
        for name in sorted(folder_names, key=lambda x: x.lower())
    ]

    breadcrumbs = [{"label": device["device_name"], "path": ""}]
    if current_path:
        segments = current_path.split("/")
        breadcrumbs.extend(
            {"label": segment, "path": "/".join(segments[: depth + 1])}
            for depth, segment in enumerate(segments)
        )

    parent_path = current_path.rpartition("/")[0]

    return render_template(
        "cloud/device_files.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        device=device,
        files=files,
        file_count=len(files),
        view_mode=view_mode,
        current_path=current_path,
        parent_path=parent_path,
        folders=folders,
        breadcrumbs=breadcrumbs,
    )