otb-cloud secure encrypted backups
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

1212 lines
40 KiB

from functools import wraps
from pathlib import Path
from datetime import datetime, timezone
import shutil
import zipfile
from PIL import Image
import re
from werkzeug.utils import secure_filename
from flask import Blueprint, flash, redirect, render_template, request, session, url_for, current_app, send_file
from app.db import get_db
from app.auth.utils import create_device_directories, remove_device_directories, slugify_device_name, compute_sha256
bp = Blueprint("main", __name__)
def portal_session_required(view_func):
@wraps(view_func)
def wrapped(*args, **kwargs):
if "otb_user_id" not in session or "otb_tenant_id" not in session:
return redirect(url_for("auth.login_required_notice"))
return view_func(*args, **kwargs)
return wrapped
def _client_ip():
return request.headers.get("X-Forwarded-For", request.remote_addr)
def _tenant_root() -> Path:
return Path(current_app.config["STORAGE_ROOT"]) / "tenants" / session["otb_tenant_slug"]
def _stored_name(original_name: str) -> str:
safe = secure_filename(original_name or "upload.bin")
if not safe:
safe = "upload.bin"
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
return f"{ts}__{safe}"
def _recovered_filename(original_name: str) -> tuple[str, str, str]:
base_name = original_name.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
if "." in base_name:
basename, extension = base_name.rsplit(".", 1)
recovered_name = f"{basename}-recovered.{extension}"
return recovered_name, f"{basename}-recovered", extension
recovered_name = f"{base_name}-recovered"
return recovered_name, f"{base_name}-recovered", ""
def _display_filename(file_row) -> str:
if not file_row:
return "download.bin"
return (file_row.get("display_filename") or file_row.get("original_filename") or "download.bin").strip()
def _sanitize_display_basename(raw_name: str) -> str:
cleaned = (raw_name or "").replace("\x00", "").strip()
cleaned = re.sub(r'[\\/:*?"<>|]+', "", cleaned)
cleaned = re.sub(r"\s+", " ", cleaned)
cleaned = cleaned.strip().strip(".")
if "." in cleaned:
cleaned = cleaned.rsplit(".", 1)[0].strip()
return cleaned[:200]
def _generate_thumbnail(original_path: Path, thumb_path: Path):
thumb_path.parent.mkdir(parents=True, exist_ok=True)
try:
with Image.open(original_path) as img:
img.thumbnail((400, 400))
img.save(thumb_path)
except Exception:
pass
def _normalize_browser_path(raw_path: str) -> str:
raw = (raw_path or "").replace("\\", "/").strip().strip("/")
if not raw:
return ""
parts = []
for part in raw.split("/"):
part = part.strip()
if not part or part in (".", ".."):
continue
parts.append(part)
return "/".join(parts)
def _safe_path_from_relative(relative_path: str) -> Path:
return _tenant_root() / relative_path
def _get_device_for_tenant(db, device_id: int):
with db.cursor() as cur:
cur.execute(
"""
SELECT id, device_name, device_type, relative_path
FROM devices
WHERE id = %s AND tenant_id = %s
""",
(device_id, session["otb_tenant_id"]),
)
return cur.fetchone()
def _purge_expired_deleted_files(db):
expired = []
with db.cursor() as cur:
cur.execute(
"""
SELECT id, relative_path, original_filename
FROM files
WHERE tenant_id = %s
AND is_deleted = 1
AND deleted_at IS NOT NULL
AND deleted_at <= (UTC_TIMESTAMP() - INTERVAL 24 HOUR)
""",
(session["otb_tenant_id"],),
)
expired = cur.fetchall()
for row in expired:
file_path = _safe_path_from_relative(row["relative_path"])
if file_path.exists():
try:
file_path.unlink()
except FileNotFoundError:
pass
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'system', 'deleted_file_purged', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
row["id"],
_client_ip(),
request.headers.get("User-Agent", ""),
f"Purged deleted file '{row['original_filename']}' after retention window",
),
)
cur.execute("DELETE FROM files WHERE id = %s AND tenant_id = %s", (row["id"], session["otb_tenant_id"]))
db.commit()
@bp.route("/")
def index():
if "otb_user_id" in session:
return redirect(url_for("main.dashboard"))
return redirect(url_for("auth.login_required_notice"))
@bp.route("/dashboard")
@portal_session_required
def dashboard():
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT id, device_name, device_type, relative_path, is_active, created_at
FROM devices
WHERE tenant_id = %s
ORDER BY id
""",
(session["otb_tenant_id"],),
)
devices = cur.fetchall()
return render_template(
"cloud/dashboard.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
devices=devices,
)
@bp.route("/devices/new", methods=["GET", "POST"])
@portal_session_required
def add_device():
if request.method == "GET":
return render_template(
"cloud/device_new.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
)
device_name = (request.form.get("device_name") or "").strip()
device_type = (request.form.get("device_type") or "").strip()
if not device_name:
flash("Device name is required.", "warning")
return render_template(
"cloud/device_new.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
device_name=device_name,
device_type=device_type,
)
if not device_type:
flash("Device type is required.", "warning")
return render_template(
"cloud/device_new.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
device_name=device_name,
device_type=device_type,
)
slug = slugify_device_name(device_name)
relative_path = f"devices/{slug}"
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT id
FROM devices
WHERE tenant_id = %s AND device_name = %s
""",
(session["otb_tenant_id"], device_name),
)
existing = cur.fetchone()
if existing:
flash("A device with that name already exists.", "warning")
return render_template(
"cloud/device_new.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
device_name=device_name,
device_type=device_type,
)
cur.execute(
"""
INSERT INTO devices (tenant_id, device_name, device_type, relative_path, is_active)
VALUES (%s, %s, %s, %s, 1)
""",
(session["otb_tenant_id"], device_name, device_type, relative_path),
)
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'device_created', %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
_client_ip(),
request.headers.get("User-Agent", ""),
f"Created device '{device_name}' ({device_type}) at {relative_path}",
),
)
db.commit()
create_device_directories(session["otb_tenant_slug"], relative_path)
flash("Device added successfully.", "success")
return redirect(url_for("main.dashboard"))
@bp.route("/devices/delete/<int:device_id>", methods=["POST"])
@portal_session_required
def delete_device(device_id: int):
db = get_db()
device = _get_device_for_tenant(db, device_id)
if not device:
flash("Device not found.", "warning")
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
return redirect(url_for("main.dashboard"))
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
with db.cursor() as cur:
cur.execute(
"""
SELECT COUNT(*) AS file_count
FROM files
WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0
""",
(session["otb_tenant_id"], device_id),
)
file_count = cur.fetchone()["file_count"]
if file_count and int(file_count) > 0:
flash("This device cannot be removed because files are still linked to it.", "warning")
return redirect(url_for("main.dashboard"))
cur.execute(
"""
DELETE FROM devices
WHERE id = %s AND tenant_id = %s
""",
(device_id, session["otb_tenant_id"]),
)
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'device_deleted', %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
_client_ip(),
request.headers.get("User-Agent", ""),
f"Deleted device '{device['device_name']}' ({device['device_type']}) at {device['relative_path']}",
),
)
db.commit()
remove_device_directories(session["otb_tenant_slug"], device["relative_path"])
flash("Device removed successfully.", "success")
return redirect(url_for("main.dashboard"))
@bp.route("/devices/<int:device_id>/upload", methods=["GET", "POST"])
@portal_session_required
def upload_files(device_id: int):
db = get_db()
device = _get_device_for_tenant(db, device_id)
if not device:
flash("Device not found.", "warning")
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
return redirect(url_for("main.dashboard"))
if request.method == "GET":
return render_template(
"cloud/upload.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
device=device,
)
files = request.files.getlist("files")
files = [f for f in files if f and f.filename]
if not files:
flash("Please choose at least one file to upload.", "warning")
return render_template(
"cloud/upload.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
device=device,
)
upload_base = _tenant_root() / device["relative_path"] / "originals"
upload_base.mkdir(parents=True, exist_ok=True)
uploaded_count = 0
with db.cursor() as cur:
for incoming in files:
original_filename = incoming.filename or "upload.bin"
# preserve folder structure from browser
relative_upload_path = original_filename.replace("\\", "/")
path_parts = relative_upload_path.split("/")
if len(path_parts) > 1:
subdirs = "/".join(path_parts[:-1])
filename_only = path_parts[-1]
else:
subdirs = ""
filename_only = path_parts[0]
stored_name = _stored_name(filename_only)
target_dir = upload_base
if subdirs:
target_dir = upload_base / subdirs
target_dir.mkdir(parents=True, exist_ok=True)
target_path = target_dir / stored_name
incoming.save(target_path)
size_bytes = target_path.stat().st_size
sha256 = compute_sha256(target_path)
base_name = original_filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
if "." in base_name:
basename, extension = base_name.rsplit(".", 1)
else:
basename, extension = base_name, ""
if subdirs:
relative_path = f"{device['relative_path']}/originals/{subdirs}/{stored_name}"
directory_path = f"{device['relative_path']}/originals/{subdirs}"
else:
relative_path = f"{device['relative_path']}/originals/{stored_name}"
directory_path = f"{device['relative_path']}/originals"
cur.execute(
"""
INSERT INTO files (
tenant_id, device_id, parent_file_id, file_kind, relative_path, directory_path,
original_filename, basename, extension, mime_type, size_bytes, sha256,
capture_date, uploaded_at, is_immutable, is_deleted, deleted_at
) VALUES (
%s, %s, NULL, 'original', %s, %s,
%s, %s, %s, %s, %s, %s,
NULL, UTC_TIMESTAMP(), 1, 0, NULL
)
""",
(
session["otb_tenant_id"],
device["id"],
relative_path,
directory_path,
original_filename,
basename,
extension,
incoming.mimetype or None,
size_bytes,
sha256,
),
)
file_id = cur.lastrowid
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'file_uploaded', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
file_id,
_client_ip(),
request.headers.get("User-Agent", ""),
f"Uploaded '{original_filename}' to {relative_path}",
),
)
uploaded_count += 1
db.commit()
flash(f"Uploaded {uploaded_count} file(s) to {device['device_name']}.", "success")
return redirect(url_for("main.dashboard"))
@bp.route("/files/<int:file_id>/thumb", methods=["GET"])
@portal_session_required
def thumbnail_file(file_id: int):
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT id, original_filename, display_filename, relative_path, mime_type
FROM files
WHERE id = %s AND tenant_id = %s AND is_deleted = 0
""",
(file_id, session["otb_tenant_id"]),
)
file_row = cur.fetchone()
if not file_row:
return "", 404
original_path = _safe_path_from_relative(file_row["relative_path"])
thumb_rel = file_row["relative_path"].replace("originals/", "derived/thumbs/")
thumb_path = _safe_path_from_relative(thumb_rel)
if not thumb_path.exists():
_generate_thumbnail(original_path, thumb_path)
if thumb_path.exists():
return send_file(thumb_path, mimetype=file_row.get("mime_type"), as_attachment=False)
return send_file(original_path, mimetype=file_row.get("mime_type"), as_attachment=False)
@bp.route("/files/<int:file_id>/inline", methods=["GET"])
@portal_session_required
def inline_file(file_id: int):
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT id, original_filename, display_filename, relative_path, mime_type
FROM files
WHERE id = %s AND tenant_id = %s AND is_deleted = 0
""",
(file_id, session["otb_tenant_id"]),
)
file_row = cur.fetchone()
if not file_row:
flash("File not found.", "warning")
return redirect(url_for("main.dashboard"))
file_path = _safe_path_from_relative(file_row["relative_path"])
if not file_path.exists():
flash("File is missing from storage.", "warning")
return redirect(url_for("main.dashboard"))
return send_file(
file_path,
mimetype=file_row.get("mime_type") or None,
as_attachment=False,
download_name=_display_filename(file_row),
conditional=True,
)
@bp.route("/files/<int:file_id>/download", methods=["GET"])
@portal_session_required
def download_file(file_id: int):
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT id, original_filename, display_filename, relative_path
FROM files
WHERE id = %s AND tenant_id = %s
""",
(file_id, session["otb_tenant_id"]),
)
file_row = cur.fetchone()
if not file_row:
flash("File not found.", "warning")
return redirect(url_for("main.dashboard"))
file_path = _safe_path_from_relative(file_row["relative_path"])
if not file_path.exists():
flash("File is missing from storage.", "warning")
return redirect(url_for("main.dashboard"))
return send_file(file_path, as_attachment=True, download_name=_display_filename(file_row))
@bp.route("/devices/<int:device_id>/files/download-selected", methods=["POST"])
@portal_session_required
def download_selected_files(device_id: int):
db = get_db()
device = _get_device_for_tenant(db, device_id)
if not device:
flash("Device not found.", "warning")
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
return redirect(url_for("main.dashboard"))
selected_ids = [int(x) for x in request.form.getlist("selected_files") if x.strip().isdigit()]
if not selected_ids:
flash("Select at least one file first.", "warning")
return redirect(url_for("main.browse_device_files", device_id=device_id))
if len(selected_ids) != 1:
flash("Download Selected currently supports one file at a time. Use Zip Workspace for multiple files.", "warning")
return redirect(url_for("main.browse_device_files", device_id=device_id))
return redirect(url_for("main.download_file", file_id=selected_ids[0]))
@bp.route("/devices/<int:device_id>/files/delete-selected", methods=["POST"])
@portal_session_required
def delete_selected_files(device_id: int):
db = get_db()
device = _get_device_for_tenant(db, device_id)
if not device:
flash("Device not found.", "warning")
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
return redirect(url_for("main.dashboard"))
selected_ids = [int(x) for x in request.form.getlist("selected_files") if x.strip().isdigit()]
if not selected_ids:
flash("Select at least one file first.", "warning")
return redirect(url_for("main.browse_device_files", device_id=device_id))
deleted_count = 0
with db.cursor() as cur:
for file_id in selected_ids:
cur.execute(
"""
SELECT id, original_filename, relative_path, directory_path, is_deleted
FROM files
WHERE id = %s AND tenant_id = %s AND device_id = %s
""",
(file_id, session["otb_tenant_id"], device_id),
)
file_row = cur.fetchone()
if not file_row or file_row["is_deleted"]:
continue
source_path = _safe_path_from_relative(file_row["relative_path"])
target_rel = f"{device['relative_path']}/deleted/{_stored_name(file_row['original_filename'])}"
target_path = _safe_path_from_relative(target_rel)
target_path.parent.mkdir(parents=True, exist_ok=True)
if source_path.exists():
shutil.move(str(source_path), str(target_path))
cur.execute(
"""
UPDATE files
SET relative_path = %s,
directory_path = %s,
is_deleted = 1,
deleted_at = UTC_TIMESTAMP()
WHERE id = %s AND tenant_id = %s
""",
(
target_rel,
f"{device['relative_path']}/deleted",
file_id,
session["otb_tenant_id"],
),
)
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'file_soft_deleted', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
file_id,
_client_ip(),
request.headers.get("User-Agent", ""),
f"Soft-deleted '{file_row['original_filename']}' into deleted area",
),
)
deleted_count += 1
db.commit()
flash(f"Deleted {deleted_count} file(s). Deleted files are retained for up to 24 hours unless hard-deleted.", "success")
return redirect(url_for("main.browse_device_files", device_id=device_id))
@bp.route("/devices/<int:device_id>/files/send-to-zip", methods=["POST"])
@portal_session_required
def send_selected_to_zip_workspace(device_id: int):
db = get_db()
device = _get_device_for_tenant(db, device_id)
if not device:
flash("Device not found.", "warning")
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
return redirect(url_for("main.dashboard"))
selected_ids = [int(x) for x in request.form.getlist("selected_files") if x.strip().isdigit()]
if not selected_ids:
flash("Select at least one file first.", "warning")
return redirect(url_for("main.browse_device_files", device_id=device_id))
staging_dir = _tenant_root() / "zip_staging"
staging_dir.mkdir(parents=True, exist_ok=True)
copied_count = 0
with db.cursor() as cur:
for file_id in selected_ids:
cur.execute(
"""
SELECT id, original_filename, display_filename, relative_path, is_deleted
FROM files
WHERE id = %s AND tenant_id = %s AND device_id = %s
""",
(file_id, session["otb_tenant_id"], device_id),
)
file_row = cur.fetchone()
if not file_row or file_row["is_deleted"]:
continue
source_path = _safe_path_from_relative(file_row["relative_path"])
if not source_path.exists():
continue
target_name = _stored_name(_display_filename(file_row))
target_path = staging_dir / target_name
shutil.copy2(source_path, target_path)
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'file_staged_for_zip', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
file_id,
_client_ip(),
request.headers.get("User-Agent", ""),
f"Copied '{_display_filename(file_row)}' into zip workspace",
),
)
copied_count += 1
db.commit()
flash(f"Sent {copied_count} file(s) to Zip Workspace.", "success")
return redirect(url_for("main.browse_device_files", device_id=device_id))
@bp.route("/workspace/zip", methods=["GET"])
@portal_session_required
def zip_workspace():
tenant_root = _tenant_root()
staging_dir = tenant_root / "zip_staging"
exports_dir = tenant_root / "exports"
staging_dir.mkdir(parents=True, exist_ok=True)
exports_dir.mkdir(parents=True, exist_ok=True)
staged_files = []
for p in sorted(staging_dir.iterdir(), key=lambda x: x.name.lower()):
if p.is_file():
staged_files.append({
"name": p.name,
"size_bytes": p.stat().st_size,
"path": str(p.relative_to(tenant_root)),
})
export_files = []
for p in sorted(exports_dir.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if p.is_file():
export_files.append({
"name": p.name,
"size_bytes": p.stat().st_size,
"path": str(p.relative_to(tenant_root)),
})
return render_template(
"cloud/zip_workspace.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
staged_files=staged_files,
export_files=export_files,
)
@bp.route("/workspace/zip/create", methods=["POST"])
@portal_session_required
def create_zip_from_workspace():
tenant_root = _tenant_root()
staging_dir = tenant_root / "zip_staging"
exports_dir = tenant_root / "exports"
staging_dir.mkdir(parents=True, exist_ok=True)
exports_dir.mkdir(parents=True, exist_ok=True)
staged = [p for p in staging_dir.iterdir() if p.is_file()]
if not staged:
flash("Zip Workspace is empty.", "warning")
return redirect(url_for("main.zip_workspace"))
zip_name = f"otb-cloud-export-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}.zip"
zip_path = exports_dir / zip_name
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for p in staged:
arcname = p.name.split("__", 1)[1] if "__" in p.name else p.name
zf.write(p, arcname=arcname)
for p in staged:
p.unlink(missing_ok=True)
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'zip_created', %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
_client_ip(),
request.headers.get("User-Agent", ""),
f"Created zip export '{zip_name}' in exports workspace; staged copies cleared after success",
),
)
db.commit()
flash(f"Zip file created successfully. You can find it in the Exports section below as '{zip_name}'.", "success")
return redirect(url_for("main.zip_workspace"))
@bp.route("/workspace/exports/<path:filename>/download", methods=["GET"])
@portal_session_required
def download_export(filename: str):
exports_dir = _tenant_root() / "exports"
file_path = exports_dir / filename
if not file_path.exists() or not file_path.is_file():
flash("Export file not found.", "warning")
return redirect(url_for("main.zip_workspace"))
return send_file(file_path, as_attachment=True, download_name=file_path.name)
@bp.route("/deleted", methods=["GET"])
@portal_session_required
def deleted_files():
db = get_db()
_purge_expired_deleted_files(db)
with db.cursor() as cur:
cur.execute(
"""
SELECT
f.id,
f.original_filename,
f.relative_path,
f.size_bytes,
f.deleted_at,
d.device_name,
d.device_type
FROM files f
LEFT JOIN devices d ON f.device_id = d.id
WHERE f.tenant_id = %s
AND f.is_deleted = 1
ORDER BY f.deleted_at DESC, f.id DESC
""",
(session["otb_tenant_id"],),
)
files = cur.fetchall()
return render_template(
"cloud/deleted_files.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
files=files,
)
@bp.route("/deleted/<int:file_id>/recover", methods=["POST"])
@portal_session_required
def recover_deleted_file(file_id: int):
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT
f.id,
f.device_id,
f.original_filename,
f.relative_path,
f.is_deleted,
d.device_name,
d.device_type,
d.relative_path AS device_relative_path
FROM files f
LEFT JOIN devices d ON f.device_id = d.id
WHERE f.id = %s
AND f.tenant_id = %s
""",
(file_id, session["otb_tenant_id"]),
)
file_row = cur.fetchone()
if not file_row or not file_row["is_deleted"]:
flash("Deleted file not found.", "warning")
return redirect(url_for("main.deleted_files"))
if not file_row["device_relative_path"]:
flash("Cannot recover this file because its device record is missing.", "warning")
return redirect(url_for("main.deleted_files"))
source_path = _safe_path_from_relative(file_row["relative_path"])
if not source_path.exists():
flash("Deleted file is missing from storage.", "warning")
return redirect(url_for("main.deleted_files"))
recovered_name, recovered_basename, recovered_extension = _recovered_filename(file_row["original_filename"])
stored_name = _stored_name(recovered_name)
target_rel = f"{file_row['device_relative_path']}/originals/{stored_name}"
target_dir = f"{file_row['device_relative_path']}/originals"
target_path = _safe_path_from_relative(target_rel)
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source_path), str(target_path))
cur.execute(
"""
UPDATE files
SET original_filename = %s,
display_filename = NULL,
basename = %s,
extension = %s,
relative_path = %s,
directory_path = %s,
is_deleted = 0,
deleted_at = NULL
WHERE id = %s AND tenant_id = %s
""",
(
recovered_name,
recovered_basename,
recovered_extension,
target_rel,
target_dir,
file_id,
session["otb_tenant_id"],
),
)
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'file_recovered', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
file_id,
_client_ip(),
request.headers.get("User-Agent", ""),
f"Recovered deleted file as '{recovered_name}' back into originals",
),
)
db.commit()
flash(f"Recovered file as '{recovered_name}'. You can find it back in the device file browser.", "success")
return redirect(url_for("main.deleted_files"))
@bp.route("/deleted/<int:file_id>/hard-delete", methods=["POST"])
@portal_session_required
def hard_delete_file(file_id: int):
db = get_db()
with db.cursor() as cur:
cur.execute(
"""
SELECT id, original_filename, relative_path, is_deleted
FROM files
WHERE id = %s AND tenant_id = %s
""",
(file_id, session["otb_tenant_id"]),
)
file_row = cur.fetchone()
if not file_row or not file_row["is_deleted"]:
flash("Deleted file not found.", "warning")
return redirect(url_for("main.deleted_files"))
file_path = _safe_path_from_relative(file_row["relative_path"])
if file_path.exists():
file_path.unlink(missing_ok=True)
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'file_hard_deleted', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
file_id,
_client_ip(),
request.headers.get("User-Agent", ""),
f"Hard-deleted '{file_row['original_filename']}' immediately from deleted area",
),
)
cur.execute("DELETE FROM files WHERE id = %s AND tenant_id = %s", (file_id, session["otb_tenant_id"]))
db.commit()
flash("File permanently deleted.", "success")
return redirect(url_for("main.deleted_files"))
@bp.route("/files/<int:file_id>/rename", methods=["POST"])
@portal_session_required
def rename_file(file_id: int):
db = get_db()
requested_basename = _sanitize_display_basename(request.form.get("display_basename") or "")
with db.cursor() as cur:
cur.execute(
"""
SELECT id, device_id, original_filename, display_filename, extension, is_deleted
FROM files
WHERE id = %s AND tenant_id = %s
""",
(file_id, session["otb_tenant_id"]),
)
file_row = cur.fetchone()
if not file_row:
flash("File not found.", "warning")
return redirect(url_for("main.dashboard"))
if file_row["is_deleted"]:
flash("You can only rename active files from the device file browser.", "warning")
return redirect(url_for("main.deleted_files"))
if not requested_basename:
flash("Please enter a new file name.", "warning")
return redirect(url_for("main.browse_device_files", device_id=file_row["device_id"]))
new_visible_name = (
f"{requested_basename}.{file_row['extension']}"
if file_row["extension"]
else requested_basename
)
if len(new_visible_name) > 255:
flash("That file name is too long.", "warning")
return redirect(url_for("main.browse_device_files", device_id=file_row["device_id"]))
current_visible_name = _display_filename(file_row)
if new_visible_name == current_visible_name:
flash("That file already has that name.", "warning")
return redirect(url_for("main.browse_device_files", device_id=file_row["device_id"]))
display_filename = None if new_visible_name == file_row["original_filename"] else new_visible_name
cur.execute(
"""
UPDATE files
SET display_filename = %s
WHERE id = %s AND tenant_id = %s
""",
(display_filename, file_id, session["otb_tenant_id"]),
)
final_visible_name = display_filename or file_row["original_filename"]
cur.execute(
"""
INSERT INTO audit_logs (
tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
) VALUES (%s, %s, 'user', 'file_display_renamed', %s, %s, %s, %s)
""",
(
session["otb_tenant_id"],
session["otb_user_id"],
file_id,
_client_ip(),
request.headers.get("User-Agent", ""),
f"Updated display filename from '{current_visible_name}' to '{final_visible_name}'",
),
)
db.commit()
if display_filename is None:
flash("Custom name cleared. The original file name is now shown again.", "success")
else:
flash(f"File renamed to '{display_filename}'.", "success")
return redirect(url_for("main.browse_device_files", device_id=file_row["device_id"]))
@bp.route("/devices/<int:device_id>/files", methods=["GET"])
@portal_session_required
def browse_device_files(device_id: int):
db = get_db()
device = _get_device_for_tenant(db, device_id)
if not device:
flash("Device not found.", "warning")
return redirect(url_for("main.dashboard"))
view_mode = request.args.get("view", "list").strip().lower()
if view_mode not in ("list", "gallery"):
view_mode = "list"
current_path = _normalize_browser_path(request.args.get("path", ""))
root_directory = f"{device['relative_path']}/originals"
current_directory = f"{root_directory}/{current_path}" if current_path else root_directory
with db.cursor() as cur:
cur.execute(
"""
SELECT
id,
file_kind,
relative_path,
directory_path,
original_filename,
display_filename,
basename,
extension,
mime_type,
size_bytes,
sha256,
uploaded_at,
is_immutable
FROM files
WHERE tenant_id = %s
AND device_id = %s
AND is_deleted = 0
AND directory_path = %s
ORDER BY uploaded_at DESC, id DESC
""",
(session["otb_tenant_id"], device_id, current_directory),
)
files = cur.fetchall()
cur.execute(
"""
SELECT DISTINCT directory_path
FROM files
WHERE tenant_id = %s
AND device_id = %s
AND is_deleted = 0
AND directory_path LIKE %s
ORDER BY directory_path
""",
(session["otb_tenant_id"], device_id, f"{current_directory}/%"),
)
all_dirs = [row["directory_path"] for row in cur.fetchall()]
folder_names = []
prefix = current_directory + "/"
for d in all_dirs:
remainder = d[len(prefix):] if d.startswith(prefix) else ""
if not remainder:
continue
first_segment = remainder.split("/", 1)[0]
if first_segment and first_segment not in folder_names:
folder_names.append(first_segment)
folders = []
for name in sorted(folder_names, key=lambda x: x.lower()):
folder_path = f"{current_path}/{name}" if current_path else name
folders.append(
{
"name": name,
"path": folder_path,
}
)
breadcrumbs = [
{
"label": device["device_name"],
"path": "",
}
]
if current_path:
accum = []
for segment in current_path.split("/"):
accum.append(segment)
breadcrumbs.append(
{
"label": segment,
"path": "/".join(accum),
}
)
parent_path = ""
if current_path:
parts = current_path.split("/")
parent_path = "/".join(parts[:-1])
return render_template(
"cloud/device_files.html",
user_email=session.get("otb_email"),
tenant_slug=session.get("otb_tenant_slug"),
device=device,
files=files,
file_count=len(files),
view_mode=view_mode,
current_path=current_path,
parent_path=parent_path,
folders=folders,
breadcrumbs=breadcrumbs,
)