|
|
from functools import wraps |
|
|
from pathlib import Path |
|
|
from datetime import datetime, timezone |
|
|
import shutil |
|
|
import zipfile |
|
|
from PIL import Image |
|
|
import re |
|
|
|
|
|
from werkzeug.utils import secure_filename |
|
|
from flask import Blueprint, flash, redirect, render_template, request, session, url_for, current_app, send_file |
|
|
|
|
|
from app.db import get_db |
|
|
from app.auth.utils import create_device_directories, remove_device_directories, slugify_device_name, compute_sha256 |
|
|
|
|
|
# Blueprint on which every route in this module is registered (endpoint prefix "main").
bp = Blueprint(name="main", import_name=__name__)
|
|
|
|
|
def portal_session_required(view_func):
    """Decorate a view so it only runs for requests carrying a portal session.

    The session must contain both ``otb_user_id`` and ``otb_tenant_id``;
    otherwise the request is redirected to ``auth.login_required_notice``.
    """
    required_keys = ("otb_user_id", "otb_tenant_id")

    @wraps(view_func)
    def wrapped(*args, **kwargs):
        if all(key in session for key in required_keys):
            return view_func(*args, **kwargs)
        return redirect(url_for("auth.login_required_notice"))

    return wrapped
|
|
|
|
|
def _client_ip():
    """Return the client address to record for the current request.

    ``X-Forwarded-For`` may hold a comma-separated chain
    (``client, proxy1, proxy2``); only the first entry is returned so the
    audit log stores a single address. When the header is absent or blank
    the socket peer address (``request.remote_addr``) is used instead.

    NOTE(review): the header is supplied by the client unless a trusted
    reverse proxy overwrites it - confirm the deployment before relying on
    this value for anything beyond logging.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded.split(",", 1)[0].strip()
    return first_hop or request.remote_addr
|
|
|
|
|
def _tenant_root() -> Path:
    """Return ``<STORAGE_ROOT>/tenants/<tenant slug>`` for the session's tenant."""
    storage_root = Path(current_app.config["STORAGE_ROOT"])
    return storage_root.joinpath("tenants", session["otb_tenant_slug"])
|
|
|
|
|
def _stored_name(original_name: str) -> str:
    """Build the on-disk name ``<UTC timestamp>__<sanitised name>``.

    The name is passed through ``secure_filename``; a missing name, or one
    that sanitises to nothing, becomes ``upload.bin``. The timestamp prefix
    includes microseconds.
    """
    sanitised = secure_filename(original_name or "upload.bin") or "upload.bin"
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
    return "__".join((stamp, sanitised))
|
|
|
|
|
|
|
|
def _recovered_filename(original_name: str) -> tuple[str, str, str]: |
|
|
base_name = original_name.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] |
|
|
if "." in base_name: |
|
|
basename, extension = base_name.rsplit(".", 1) |
|
|
recovered_name = f"{basename}-recovered.{extension}" |
|
|
return recovered_name, f"{basename}-recovered", extension |
|
|
recovered_name = f"{base_name}-recovered" |
|
|
return recovered_name, f"{base_name}-recovered", "" |
|
|
|
|
|
def _display_filename(file_row) -> str: |
|
|
if not file_row: |
|
|
return "download.bin" |
|
|
return (file_row.get("display_filename") or file_row.get("original_filename") or "download.bin").strip() |
|
|
|
|
|
|
|
|
def _sanitize_display_basename(raw_name: str) -> str: |
|
|
cleaned = (raw_name or "").replace("\x00", "").strip() |
|
|
cleaned = re.sub(r'[\\/:*?"<>|]+', "", cleaned) |
|
|
cleaned = re.sub(r"\s+", " ", cleaned) |
|
|
cleaned = cleaned.strip().strip(".") |
|
|
if "." in cleaned: |
|
|
cleaned = cleaned.rsplit(".", 1)[0].strip() |
|
|
return cleaned[:200] |
|
|
|
|
|
|
|
|
|
|
|
def _generate_thumbnail(original_path: Path, thumb_path: Path):
    """Best-effort creation of a thumbnail (at most 400x400) for an image.

    The thumbnail's parent directory is created if needed. Failures are not
    raised - callers fall back to the original file - but they are logged,
    and a half-written thumbnail created by this call is removed so that a
    corrupt file is not served later.

    Args:
        original_path: Source image to read.
        thumb_path: Destination for the thumbnail.
    """
    import logging

    thumb_path.parent.mkdir(parents=True, exist_ok=True)
    # Only clean up files this call created; never delete a pre-existing one.
    existed_before = thumb_path.exists()

    try:
        with Image.open(original_path) as img:
            img.thumbnail((400, 400))
            img.save(thumb_path)
    except Exception as exc:  # deliberate best-effort boundary
        logging.getLogger(__name__).warning(
            "Thumbnail generation failed for %s: %s", original_path, exc
        )
        if not existed_before:
            try:
                thumb_path.unlink(missing_ok=True)
            except OSError:
                pass
|
|
|
|
|
def _normalize_browser_path(raw_path: str) -> str: |
|
|
raw = (raw_path or "").replace("\\", "/").strip().strip("/") |
|
|
if not raw: |
|
|
return "" |
|
|
parts = [] |
|
|
for part in raw.split("/"): |
|
|
part = part.strip() |
|
|
if not part or part in (".", ".."): |
|
|
continue |
|
|
parts.append(part) |
|
|
return "/".join(parts) |
|
|
|
|
|
|
|
|
def _safe_path_from_relative(relative_path: str) -> Path:
    """Join ``relative_path`` onto the tenant root, refusing to leave it.

    The check is lexical (``..`` segments and absolute paths are collapsed
    without touching the filesystem), so it works for paths that do not
    exist yet.

    Args:
        relative_path: Path relative to the tenant root, e.g. a
            ``files.relative_path`` value.

    Returns:
        ``_tenant_root() / relative_path`` (unchanged for well-formed input).

    Raises:
        ValueError: If the joined path would fall outside the tenant root.
    """
    import os

    root = _tenant_root()
    candidate = root / relative_path

    root_norm = Path(os.path.normpath(root))
    candidate_norm = Path(os.path.normpath(candidate))
    if candidate_norm != root_norm and root_norm not in candidate_norm.parents:
        raise ValueError(f"Path escapes tenant storage: {relative_path!r}")
    return candidate
|
|
|
|
|
def _get_device_for_tenant(db, device_id: int):
    """Fetch one device row, scoped to the session's tenant.

    Returns whatever ``cursor.fetchone()`` yields for the id / tenant pair
    (columns ``id``, ``device_name``, ``device_type``, ``relative_path``).
    """
    lookup_sql = """
        SELECT id, device_name, device_type, relative_path
        FROM devices
        WHERE id = %s AND tenant_id = %s
    """
    with db.cursor() as cursor:
        cursor.execute(lookup_sql, (device_id, session["otb_tenant_id"]))
        return cursor.fetchone()
|
|
|
|
|
def _purge_expired_deleted_files(db):
    """Permanently remove the tenant's soft-deleted files past retention.

    Selects ``files`` rows flagged ``is_deleted`` whose ``deleted_at`` is at
    least 24 hours old. For each one: unlinks the stored file if it is still
    present, writes a ``deleted_file_purged`` audit row and deletes the
    ``files`` row. A single commit is issued at the end.
    """
    select_expired_sql = """
        SELECT id, relative_path, original_filename
        FROM files
        WHERE tenant_id = %s
          AND is_deleted = 1
          AND deleted_at IS NOT NULL
          AND deleted_at <= (UTC_TIMESTAMP() - INTERVAL 24 HOUR)
    """
    audit_sql = """
        INSERT INTO audit_logs (
            tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
        ) VALUES (%s, %s, 'system', 'deleted_file_purged', %s, %s, %s, %s)
    """
    delete_sql = "DELETE FROM files WHERE id = %s AND tenant_id = %s"

    with db.cursor() as cur:
        cur.execute(select_expired_sql, (session["otb_tenant_id"],))
        stale_rows = cur.fetchall()

        for stale in stale_rows:
            on_disk = _safe_path_from_relative(stale["relative_path"])
            if on_disk.exists():
                try:
                    on_disk.unlink()
                except FileNotFoundError:
                    # Removed by someone else between the check and the unlink.
                    pass

            audit_params = (
                session["otb_tenant_id"],
                session["otb_user_id"],
                stale["id"],
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Purged deleted file '{stale['original_filename']}' after retention window",
            )
            cur.execute(audit_sql, audit_params)
            cur.execute(delete_sql, (stale["id"], session["otb_tenant_id"]))

    db.commit()
|
|
|
|
|
@bp.route("/")
def index():
    """Send signed-in users to the dashboard, everyone else to the login notice."""
    signed_in = "otb_user_id" in session
    endpoint = "main.dashboard" if signed_in else "auth.login_required_notice"
    return redirect(url_for(endpoint))
|
|
|
|
|
@bp.route("/dashboard")
@portal_session_required
def dashboard():
    """Render the dashboard with every device of the session's tenant, ordered by id."""
    devices_sql = """
        SELECT id, device_name, device_type, relative_path, is_active, created_at
        FROM devices
        WHERE tenant_id = %s
        ORDER BY id
    """
    connection = get_db()
    with connection.cursor() as cursor:
        cursor.execute(devices_sql, (session["otb_tenant_id"],))
        device_rows = cursor.fetchall()

    context = {
        "user_email": session.get("otb_email"),
        "tenant_slug": session.get("otb_tenant_slug"),
        "devices": device_rows,
    }
    return render_template("cloud/dashboard.html", **context)
|
|
|
|
|
@bp.route("/devices/android/new", methods=["GET", "POST"])
@portal_session_required
def create_android_device():
    """Create an Android device bucket plus a one-time activation token.

    GET renders the form. POST validates the name, rejects a name or storage
    path already used by this tenant (two devices sharing a path would share
    files on disk), inserts the device and a SHA-256 hash of a fresh random
    token valid for 48 hours, then flashes the raw token once and redirects
    to the dashboard.
    """
    import hashlib
    import secrets
    from datetime import timedelta

    if request.method == "GET":
        return render_template(
            "cloud/android_device_new.html",
            user_email=session.get("otb_email"),
            tenant_slug=session.get("otb_tenant_slug"),
        )

    device_name = (request.form.get("device_name") or "").strip()
    if not device_name:
        flash("Device name is required.", "warning")
        return redirect(url_for("main.create_android_device"))

    slug = slugify_device_name(device_name)
    relative_path = f"devices/{slug}"

    raw_token = secrets.token_hex(16)
    # Only the hash is stored; the raw token is shown to the user exactly once.
    token_hash = hashlib.sha256(raw_token.encode()).hexdigest()

    # Naive UTC value, as before, but without the deprecated datetime.utcnow().
    expires_at = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=48)

    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id
            FROM devices
            WHERE tenant_id = %s AND (device_name = %s OR relative_path = %s)
            """,
            (session["otb_tenant_id"], device_name, relative_path),
        )
        if cur.fetchone():
            flash("A device with that name already exists.", "warning")
            return redirect(url_for("main.create_android_device"))

        # create device bucket
        cur.execute(
            """
            INSERT INTO devices (tenant_id, device_name, device_type, relative_path)
            VALUES (%s, %s, %s, %s)
            """,
            (session["otb_tenant_id"], device_name, "android", relative_path),
        )
        device_id = cur.lastrowid

        # create token
        cur.execute(
            """
            INSERT INTO android_device_tokens (tenant_id, device_id, token_hash, device_label, expires_at)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (session["otb_tenant_id"], device_id, token_hash, device_name, expires_at),
        )

    db.commit()

    flash(f"Activation token (valid 48h): {raw_token}", "success")
    flash("⚠️ This token must be used within 48 hours or it will expire.", "warning")

    return redirect(url_for("main.dashboard"))
|
|
|
|
|
|
|
|
@bp.route("/devices/new", methods=["GET", "POST"])
@portal_session_required
def add_device():
    """Register a new device for the session's tenant.

    GET renders the form. POST requires a name and a type, rejects the
    request when the tenant already has a device with the same name or the
    same storage path (different names can slugify to one path, which would
    make two devices share a directory), then inserts the device, writes a
    ``device_created`` audit row, commits and creates the device directories.
    """

    def _render_form(**fields):
        # Shared by the GET view and every validation failure.
        return render_template(
            "cloud/device_new.html",
            user_email=session.get("otb_email"),
            tenant_slug=session.get("otb_tenant_slug"),
            **fields,
        )

    if request.method == "GET":
        return _render_form()

    device_name = (request.form.get("device_name") or "").strip()
    device_type = (request.form.get("device_type") or "").strip()

    if not device_name:
        flash("Device name is required.", "warning")
        return _render_form(device_name=device_name, device_type=device_type)

    if not device_type:
        flash("Device type is required.", "warning")
        return _render_form(device_name=device_name, device_type=device_type)

    slug = slugify_device_name(device_name)
    relative_path = f"devices/{slug}"

    db = get_db()

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id
            FROM devices
            WHERE tenant_id = %s AND (device_name = %s OR relative_path = %s)
            """,
            (session["otb_tenant_id"], device_name, relative_path),
        )
        existing = cur.fetchone()

        if existing:
            flash("A device with that name already exists.", "warning")
            return _render_form(device_name=device_name, device_type=device_type)

        cur.execute(
            """
            INSERT INTO devices (tenant_id, device_name, device_type, relative_path, is_active)
            VALUES (%s, %s, %s, %s, 1)
            """,
            (session["otb_tenant_id"], device_name, device_type, relative_path),
        )

        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail
            ) VALUES (%s, %s, 'user', 'device_created', %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Created device '{device_name}' ({device_type}) at {relative_path}",
            ),
        )

    db.commit()

    create_device_directories(session["otb_tenant_slug"], relative_path)

    flash("Device added successfully.", "success")
    return redirect(url_for("main.dashboard"))
|
|
|
|
|
@bp.route("/devices/delete/<int:device_id>", methods=["POST"])
@portal_session_required
def delete_device(device_id: int):
    """Delete a device of the session's tenant if no live files reference it.

    A device that still has ``files`` rows with ``is_deleted = 0`` is left
    untouched and a warning is flashed. Otherwise the device row is deleted,
    a ``device_deleted`` audit row is written, the transaction is committed
    and the device directories are removed. Always redirects to the dashboard.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)

    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT COUNT(*) AS file_count
            FROM files
            WHERE tenant_id = %s AND device_id = %s AND is_deleted = 0
            """,
            (session["otb_tenant_id"], device_id),
        )
        file_count = cur.fetchone()["file_count"]

        if file_count and int(file_count) > 0:
            flash("This device cannot be removed because files are still linked to it.", "warning")
            return redirect(url_for("main.dashboard"))

        cur.execute(
            """
            DELETE FROM devices
            WHERE id = %s AND tenant_id = %s
            """,
            (device_id, session["otb_tenant_id"]),
        )

        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail
            ) VALUES (%s, %s, 'user', 'device_deleted', %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Deleted device '{device['device_name']}' ({device['device_type']}) at {device['relative_path']}",
            ),
        )

    db.commit()

    # Storage is only touched once the database change is durable.
    remove_device_directories(session["otb_tenant_slug"], device["relative_path"])

    flash("Device removed successfully.", "success")
    return redirect(url_for("main.dashboard"))
|
|
|
|
|
@bp.route("/devices/<int:device_id>/upload", methods=["GET", "POST"])
@portal_session_required
def upload_files(device_id: int):
    """Upload one or more files into a device's ``originals`` area.

    GET renders the upload form. POST stores every submitted file under
    ``<tenant root>/<device path>/originals[/<sub-folders>]/<stored name>``,
    records a ``files`` row (size and SHA-256 are taken from the stored
    copy) and a ``file_uploaded`` audit row per file, then commits once.

    Folder structure sent by the browser is preserved, but the folder part
    of the client-supplied name is passed through
    ``_normalize_browser_path`` first so that ``..`` segments or a leading
    slash cannot steer the write outside the device's ``originals`` folder.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)

    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    def _render_form():
        return render_template(
            "cloud/upload.html",
            user_email=session.get("otb_email"),
            tenant_slug=session.get("otb_tenant_slug"),
            device=device,
        )

    if request.method == "GET":
        return _render_form()

    files = [f for f in request.files.getlist("files") if f and f.filename]

    if not files:
        flash("Please choose at least one file to upload.", "warning")
        return _render_form()

    originals_rel = f"{device['relative_path']}/originals"
    upload_base = _tenant_root() / device["relative_path"] / "originals"
    upload_base.mkdir(parents=True, exist_ok=True)

    uploaded_count = 0

    with db.cursor() as cur:
        for incoming in files:
            original_filename = incoming.filename or "upload.bin"

            # preserve folder structure from browser (sanitised)
            browser_path = original_filename.replace("\\", "/")
            parent, _sep, filename_only = browser_path.rpartition("/")
            subdirs = _normalize_browser_path(parent)

            stored_name = _stored_name(filename_only)

            if subdirs:
                target_dir = upload_base / subdirs
                directory_path = f"{originals_rel}/{subdirs}"
            else:
                target_dir = upload_base
                directory_path = originals_rel

            target_dir.mkdir(parents=True, exist_ok=True)
            target_path = target_dir / stored_name

            incoming.save(target_path)

            size_bytes = target_path.stat().st_size
            sha256 = compute_sha256(target_path)

            if "." in filename_only:
                basename, extension = filename_only.rsplit(".", 1)
            else:
                basename, extension = filename_only, ""

            relative_path = f"{directory_path}/{stored_name}"

            cur.execute(
                """
                INSERT INTO files (
                    tenant_id, device_id, parent_file_id, file_kind, relative_path, directory_path,
                    original_filename, basename, extension, mime_type, size_bytes, sha256,
                    capture_date, uploaded_at, is_immutable, is_deleted, deleted_at
                ) VALUES (
                    %s, %s, NULL, 'original', %s, %s,
                    %s, %s, %s, %s, %s, %s,
                    NULL, UTC_TIMESTAMP(), 1, 0, NULL
                )
                """,
                (
                    session["otb_tenant_id"],
                    device["id"],
                    relative_path,
                    directory_path,
                    original_filename,
                    basename,
                    extension,
                    incoming.mimetype or None,
                    size_bytes,
                    sha256,
                ),
            )

            file_id = cur.lastrowid

            cur.execute(
                """
                INSERT INTO audit_logs (
                    tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
                ) VALUES (%s, %s, 'user', 'file_uploaded', %s, %s, %s, %s)
                """,
                (
                    session["otb_tenant_id"],
                    session["otb_user_id"],
                    file_id,
                    _client_ip(),
                    request.headers.get("User-Agent", ""),
                    f"Uploaded '{original_filename}' to {relative_path}",
                ),
            )

            uploaded_count += 1

    db.commit()

    flash(f"Uploaded {uploaded_count} file(s) to {device['device_name']}.", "success")
    return redirect(url_for("main.dashboard"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/files/<int:file_id>/thumb", methods=["GET"])
@portal_session_required
def thumbnail_file(file_id: int):
    """Serve a thumbnail for a non-deleted file of the session's tenant.

    The thumbnail path mirrors the stored path with ``originals/`` replaced
    by ``derived/thumbs/``. It is generated on first request; when
    generation does not produce a file, the original is served instead.
    Responds 404 with an empty body when the row does not exist, or when
    neither a thumbnail nor the original is present in storage (previously
    a missing original surfaced as a server error from ``send_file``).
    """
    db = get_db()

    with db.cursor() as cur:
        cur.execute(
            """
            SELECT id, original_filename, display_filename, relative_path, mime_type
            FROM files
            WHERE id = %s AND tenant_id = %s AND is_deleted = 0
            """,
            (file_id, session["otb_tenant_id"]),
        )
        file_row = cur.fetchone()

    if not file_row:
        return "", 404

    original_path = _safe_path_from_relative(file_row["relative_path"])
    thumb_rel = file_row["relative_path"].replace("originals/", "derived/thumbs/")
    thumb_path = _safe_path_from_relative(thumb_rel)

    if not thumb_path.exists():
        if not original_path.exists():
            return "", 404
        _generate_thumbnail(original_path, thumb_path)

    if thumb_path.exists():
        return send_file(thumb_path, mimetype=file_row.get("mime_type"), as_attachment=False)

    # Thumbnail generation is best-effort; fall back to the full file.
    return send_file(original_path, mimetype=file_row.get("mime_type"), as_attachment=False)
|
|
|
|
|
|
|
|
@bp.route("/files/<int:file_id>/inline", methods=["GET"])
@portal_session_required
def inline_file(file_id: int):
    """Stream a non-deleted file of the session's tenant for inline viewing.

    Flashes a warning and redirects to the dashboard when the row is not
    found or the stored file is missing. Otherwise the file is sent with
    conditional-request handling enabled and the display filename as its
    download name.
    """
    lookup_sql = """
        SELECT id, original_filename, display_filename, relative_path, mime_type
        FROM files
        WHERE id = %s AND tenant_id = %s AND is_deleted = 0
    """
    connection = get_db()
    with connection.cursor() as cursor:
        cursor.execute(lookup_sql, (file_id, session["otb_tenant_id"]))
        record = cursor.fetchone()

    if not record:
        flash("File not found.", "warning")
        return redirect(url_for("main.dashboard"))

    stored_path = _safe_path_from_relative(record["relative_path"])
    if stored_path.exists():
        return send_file(
            stored_path,
            mimetype=record.get("mime_type") or None,
            as_attachment=False,
            download_name=_display_filename(record),
            conditional=True,
        )

    flash("File is missing from storage.", "warning")
    return redirect(url_for("main.dashboard"))
|
|
|
|
|
@bp.route("/files/<int:file_id>/download", methods=["GET"])
@portal_session_required
def download_file(file_id: int):
    """Send a file of the session's tenant as an attachment.

    The lookup is by id and tenant only (it does not filter on
    ``is_deleted``). Flashes a warning and redirects to the dashboard when
    the row or the stored file is missing.
    """
    lookup_sql = """
        SELECT id, original_filename, display_filename, relative_path
        FROM files
        WHERE id = %s AND tenant_id = %s
    """
    connection = get_db()
    with connection.cursor() as cursor:
        cursor.execute(lookup_sql, (file_id, session["otb_tenant_id"]))
        record = cursor.fetchone()

    if not record:
        flash("File not found.", "warning")
        return redirect(url_for("main.dashboard"))

    stored_path = _safe_path_from_relative(record["relative_path"])
    if stored_path.exists():
        return send_file(stored_path, as_attachment=True, download_name=_display_filename(record))

    flash("File is missing from storage.", "warning")
    return redirect(url_for("main.dashboard"))
|
|
|
|
|
@bp.route("/devices/<int:device_id>/files/download-selected", methods=["POST"])
@portal_session_required
def download_selected_files(device_id: int):
    """Redirect to the download of the single file selected in the browser form.

    Exactly one ``selected_files`` id is supported; zero or several ids
    flash a warning and return to the device's file browser. Non-numeric
    form values are ignored.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)

    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    # isdecimal() (not isdigit()) so that every accepted value is parseable by
    # int(); isdigit() also accepts characters such as superscript digits.
    selected_ids = [
        int(value)
        for value in request.form.getlist("selected_files")
        if value.strip().isdecimal()
    ]

    if not selected_ids:
        flash("Select at least one file first.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    if len(selected_ids) != 1:
        flash("Download Selected currently supports one file at a time. Use Zip Workspace for multiple files.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    return redirect(url_for("main.download_file", file_id=selected_ids[0]))
|
|
|
|
|
@bp.route("/devices/<int:device_id>/files/delete-selected", methods=["POST"])
@portal_session_required
def delete_selected_files(device_id: int):
    """Soft-delete the selected files of a device.

    For each selected id that belongs to this tenant and device and is not
    already deleted: the stored file (if present) is moved into the device's
    ``deleted`` folder under a fresh stored name, the ``files`` row is
    updated to point there with ``is_deleted = 1`` and a
    ``file_soft_deleted`` audit row is written. One commit covers the whole
    batch. Non-numeric form values are ignored.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)

    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    # isdecimal() (not isdigit()) so that every accepted value is parseable by
    # int(); isdigit() also accepts characters such as superscript digits.
    selected_ids = [
        int(value)
        for value in request.form.getlist("selected_files")
        if value.strip().isdecimal()
    ]

    if not selected_ids:
        flash("Select at least one file first.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    deleted_dir_rel = f"{device['relative_path']}/deleted"
    deleted_count = 0

    with db.cursor() as cur:
        for file_id in selected_ids:
            cur.execute(
                """
                SELECT id, original_filename, relative_path, directory_path, is_deleted
                FROM files
                WHERE id = %s AND tenant_id = %s AND device_id = %s
                """,
                (file_id, session["otb_tenant_id"], device_id),
            )
            file_row = cur.fetchone()

            if not file_row or file_row["is_deleted"]:
                continue

            source_path = _safe_path_from_relative(file_row["relative_path"])
            target_rel = f"{deleted_dir_rel}/{_stored_name(file_row['original_filename'])}"
            target_path = _safe_path_from_relative(target_rel)
            target_path.parent.mkdir(parents=True, exist_ok=True)

            if source_path.exists():
                shutil.move(str(source_path), str(target_path))

            cur.execute(
                """
                UPDATE files
                SET relative_path = %s,
                    directory_path = %s,
                    is_deleted = 1,
                    deleted_at = UTC_TIMESTAMP()
                WHERE id = %s AND tenant_id = %s
                """,
                (
                    target_rel,
                    deleted_dir_rel,
                    file_id,
                    session["otb_tenant_id"],
                ),
            )

            cur.execute(
                """
                INSERT INTO audit_logs (
                    tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
                ) VALUES (%s, %s, 'user', 'file_soft_deleted', %s, %s, %s, %s)
                """,
                (
                    session["otb_tenant_id"],
                    session["otb_user_id"],
                    file_id,
                    _client_ip(),
                    request.headers.get("User-Agent", ""),
                    f"Soft-deleted '{file_row['original_filename']}' into deleted area",
                ),
            )

            deleted_count += 1

    db.commit()

    flash(f"Deleted {deleted_count} file(s). Deleted files are retained for up to 24 hours unless hard-deleted.", "success")
    return redirect(url_for("main.browse_device_files", device_id=device_id))
|
|
|
|
|
@bp.route("/devices/<int:device_id>/files/send-to-zip", methods=["POST"])
@portal_session_required
def send_selected_to_zip_workspace(device_id: int):
    """Copy the selected files of a device into the tenant's zip staging folder.

    Each selected id that belongs to this tenant and device, is not deleted
    and still exists in storage is copied to ``<tenant root>/zip_staging``
    under a fresh stored name derived from its display filename, and a
    ``file_staged_for_zip`` audit row is written. One commit covers the
    batch. Non-numeric form values are ignored.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)

    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    # isdecimal() (not isdigit()) so that every accepted value is parseable by
    # int(); isdigit() also accepts characters such as superscript digits.
    selected_ids = [
        int(value)
        for value in request.form.getlist("selected_files")
        if value.strip().isdecimal()
    ]

    if not selected_ids:
        flash("Select at least one file first.", "warning")
        return redirect(url_for("main.browse_device_files", device_id=device_id))

    staging_dir = _tenant_root() / "zip_staging"
    staging_dir.mkdir(parents=True, exist_ok=True)

    copied_count = 0

    with db.cursor() as cur:
        for file_id in selected_ids:
            cur.execute(
                """
                SELECT id, original_filename, display_filename, relative_path, is_deleted
                FROM files
                WHERE id = %s AND tenant_id = %s AND device_id = %s
                """,
                (file_id, session["otb_tenant_id"], device_id),
            )
            file_row = cur.fetchone()

            if not file_row or file_row["is_deleted"]:
                continue

            source_path = _safe_path_from_relative(file_row["relative_path"])
            if not source_path.exists():
                continue

            shown_name = _display_filename(file_row)
            target_path = staging_dir / _stored_name(shown_name)
            shutil.copy2(source_path, target_path)

            cur.execute(
                """
                INSERT INTO audit_logs (
                    tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
                ) VALUES (%s, %s, 'user', 'file_staged_for_zip', %s, %s, %s, %s)
                """,
                (
                    session["otb_tenant_id"],
                    session["otb_user_id"],
                    file_id,
                    _client_ip(),
                    request.headers.get("User-Agent", ""),
                    f"Copied '{shown_name}' into zip workspace",
                ),
            )

            copied_count += 1

    db.commit()

    flash(f"Sent {copied_count} file(s) to Zip Workspace.", "success")
    return redirect(url_for("main.browse_device_files", device_id=device_id))
|
|
|
|
|
@bp.route("/workspace/zip", methods=["GET"])
@portal_session_required
def zip_workspace():
    """Render the zip workspace: staged files and previously created exports.

    Both folders are created under the tenant root if missing. Staged files
    are listed by case-insensitive name; exports are listed newest first by
    modification time. Only regular files are included.
    """
    tenant_root = _tenant_root()
    staging_dir = tenant_root / "zip_staging"
    exports_dir = tenant_root / "exports"
    for folder in (staging_dir, exports_dir):
        folder.mkdir(parents=True, exist_ok=True)

    def _describe(entry):
        # Shape consumed by the template for both listings.
        return {
            "name": entry.name,
            "size_bytes": entry.stat().st_size,
            "path": str(entry.relative_to(tenant_root)),
        }

    staged_sorted = sorted(staging_dir.iterdir(), key=lambda x: x.name.lower())
    staged_files = [_describe(entry) for entry in staged_sorted if entry.is_file()]

    exports_sorted = sorted(exports_dir.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True)
    export_files = [_describe(entry) for entry in exports_sorted if entry.is_file()]

    return render_template(
        "cloud/zip_workspace.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        staged_files=staged_files,
        export_files=export_files,
    )
|
|
|
|
|
def _unique_zip_arcname(name: str, taken: set) -> str:
    """Return ``name``, or ``stem (N).ext`` if ``name`` is already in ``taken``.

    Comparison is case-insensitive; the chosen name is recorded in ``taken``.
    """
    stem, dot, ext = name.rpartition(".")
    candidate = name
    counter = 1
    while candidate.casefold() in taken:
        if dot and stem:
            candidate = f"{stem} ({counter}).{ext}"
        else:
            candidate = f"{name} ({counter})"
        counter += 1
    taken.add(candidate.casefold())
    return candidate


@bp.route("/workspace/zip/create", methods=["POST"])
@portal_session_required
def create_zip_from_workspace():
    """Bundle every staged file into a new zip under the tenant's exports folder.

    The archive member name is the staged name with its ``<timestamp>__``
    prefix removed. Members that would end up with the same name are
    disambiguated as ``name (1).ext``, ``name (2).ext`` ... so that none
    overwrites another on extraction. If writing the archive fails, the
    partial zip is removed and the staged files are left in place. On
    success the staged copies are deleted and a ``zip_created`` audit row
    is committed.
    """
    tenant_root = _tenant_root()
    staging_dir = tenant_root / "zip_staging"
    exports_dir = tenant_root / "exports"
    staging_dir.mkdir(parents=True, exist_ok=True)
    exports_dir.mkdir(parents=True, exist_ok=True)

    staged = [p for p in staging_dir.iterdir() if p.is_file()]
    if not staged:
        flash("Zip Workspace is empty.", "warning")
        return redirect(url_for("main.zip_workspace"))

    zip_name = f"otb-cloud-export-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}.zip"
    zip_path = exports_dir / zip_name

    used_names = set()
    try:
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for p in staged:
                plain_name = p.name.split("__", 1)[1] if "__" in p.name else p.name
                zf.write(p, arcname=_unique_zip_arcname(plain_name, used_names))
    except Exception:
        # Do not leave a truncated archive in the exports listing.
        zip_path.unlink(missing_ok=True)
        raise

    for p in staged:
        p.unlink(missing_ok=True)

    db = get_db()
    with db.cursor() as cur:
        cur.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail
            ) VALUES (%s, %s, 'user', 'zip_created', %s, %s, %s)
            """,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Created zip export '{zip_name}' in exports workspace; staged copies cleared after success",
            ),
        )
    db.commit()

    flash(f"Zip file created successfully. You can find it in the Exports section below as '{zip_name}'.", "success")
    return redirect(url_for("main.zip_workspace"))
|
|
|
|
|
@bp.route("/workspace/exports/<path:filename>/download", methods=["GET"])
@portal_session_required
def download_export(filename: str):
    """Send an export archive from the tenant's ``exports`` folder.

    ``filename`` comes straight from the URL, so the resolved target must
    stay inside the exports folder; anything else (for example a name
    containing ``..``) is treated exactly like a missing file. A missing or
    non-regular file flashes a warning and redirects to the zip workspace.
    """
    exports_root = (_tenant_root() / "exports").resolve()
    file_path = (exports_root / filename).resolve()

    try:
        file_path.relative_to(exports_root)
        inside_exports = True
    except ValueError:
        inside_exports = False

    if not inside_exports or not file_path.is_file():
        flash("Export file not found.", "warning")
        return redirect(url_for("main.zip_workspace"))

    return send_file(file_path, as_attachment=True, download_name=file_path.name)
|
|
|
|
|
@bp.route("/deleted", methods=["GET"])
@portal_session_required
def deleted_files():
    """List the tenant's soft-deleted files, most recently deleted first.

    Expired deleted files are purged before the listing is built. Device
    name and type are joined in and may be NULL (LEFT JOIN).
    """
    listing_sql = """
        SELECT
            f.id,
            f.original_filename,
            f.relative_path,
            f.size_bytes,
            f.deleted_at,
            d.device_name,
            d.device_type
        FROM files f
        LEFT JOIN devices d ON f.device_id = d.id
        WHERE f.tenant_id = %s
          AND f.is_deleted = 1
        ORDER BY f.deleted_at DESC, f.id DESC
    """
    connection = get_db()
    _purge_expired_deleted_files(connection)

    with connection.cursor() as cursor:
        cursor.execute(listing_sql, (session["otb_tenant_id"],))
        deleted_rows = cursor.fetchall()

    context = {
        "user_email": session.get("otb_email"),
        "tenant_slug": session.get("otb_tenant_slug"),
        "files": deleted_rows,
    }
    return render_template("cloud/deleted_files.html", **context)
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/deleted/<int:file_id>/recover", methods=["POST"])
@portal_session_required
def recover_deleted_file(file_id: int):
    """Move a soft-deleted file back into its device's ``originals`` folder.

    The file is restored under a ``-recovered`` name: the ``files`` row gets
    the new original filename, basename, extension and paths,
    ``display_filename`` is cleared and the deleted flags are reset. A
    ``file_recovered`` audit row is written. Nothing is changed (a warning
    is flashed instead) when the row is not a deleted file of this tenant,
    its device has no path, or the stored file is missing.
    """
    lookup_sql = """
        SELECT
            f.id,
            f.device_id,
            f.original_filename,
            f.relative_path,
            f.is_deleted,
            d.device_name,
            d.device_type,
            d.relative_path AS device_relative_path
        FROM files f
        LEFT JOIN devices d ON f.device_id = d.id
        WHERE f.id = %s
          AND f.tenant_id = %s
    """
    restore_sql = """
        UPDATE files
        SET original_filename = %s,
            display_filename = NULL,
            basename = %s,
            extension = %s,
            relative_path = %s,
            directory_path = %s,
            is_deleted = 0,
            deleted_at = NULL
        WHERE id = %s AND tenant_id = %s
    """
    audit_sql = """
        INSERT INTO audit_logs (
            tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
        ) VALUES (%s, %s, 'user', 'file_recovered', %s, %s, %s, %s)
    """

    def _back_to_listing(message):
        flash(message, "warning")
        return redirect(url_for("main.deleted_files"))

    connection = get_db()

    with connection.cursor() as cursor:
        cursor.execute(lookup_sql, (file_id, session["otb_tenant_id"]))
        record = cursor.fetchone()

        if not record or not record["is_deleted"]:
            return _back_to_listing("Deleted file not found.")

        if not record["device_relative_path"]:
            return _back_to_listing("Cannot recover this file because its device record is missing.")

        current_location = _safe_path_from_relative(record["relative_path"])
        if not current_location.exists():
            return _back_to_listing("Deleted file is missing from storage.")

        recovered_name, recovered_basename, recovered_extension = _recovered_filename(
            record["original_filename"]
        )
        stored_name = _stored_name(recovered_name)

        restored_dir_rel = f"{record['device_relative_path']}/originals"
        restored_rel = f"{record['device_relative_path']}/originals/{stored_name}"
        restored_location = _safe_path_from_relative(restored_rel)
        restored_location.parent.mkdir(parents=True, exist_ok=True)

        shutil.move(str(current_location), str(restored_location))

        cursor.execute(
            restore_sql,
            (
                recovered_name,
                recovered_basename,
                recovered_extension,
                restored_rel,
                restored_dir_rel,
                file_id,
                session["otb_tenant_id"],
            ),
        )
        cursor.execute(
            audit_sql,
            (
                session["otb_tenant_id"],
                session["otb_user_id"],
                file_id,
                _client_ip(),
                request.headers.get("User-Agent", ""),
                f"Recovered deleted file as '{recovered_name}' back into originals",
            ),
        )

    connection.commit()

    flash(f"Recovered file as '{recovered_name}'. You can find it back in the device file browser.", "success")
    return redirect(url_for("main.deleted_files"))
|
|
|
|
|
|
|
|
@bp.route("/deleted/<int:file_id>/hard-delete", methods=["POST"])
@portal_session_required
def hard_delete_file(file_id: int):
    """Permanently delete a soft-deleted file belonging to the session tenant.

    The row is looked up by ``file_id`` together with the tenant id stored in
    the session. When no row matches, or the row is not flagged
    ``is_deleted``, a warning is flashed and nothing is changed. Otherwise
    the stored file is unlinked (if it still exists), a ``file_hard_deleted``
    audit entry is inserted, the ``files`` row is deleted, and the
    transaction is committed.

    Every path ends with a redirect to the deleted-files page.
    """
    db = get_db()
    tenant_id = session["otb_tenant_id"]

    with db.cursor() as cursor:
        cursor.execute(
            """
            SELECT id, original_filename, relative_path, is_deleted
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, tenant_id),
        )
        record = cursor.fetchone()

        # Only rows that exist and are already soft-deleted may be purged.
        if not (record and record["is_deleted"]):
            flash("Deleted file not found.", "warning")
            return redirect(url_for("main.deleted_files"))

        # Remove the stored payload first; the row is dropped further down.
        stored_path = _safe_path_from_relative(record["relative_path"])
        if stored_path.exists():
            stored_path.unlink(missing_ok=True)

        original_name = record["original_filename"]
        audit_params = (
            tenant_id,
            session["otb_user_id"],
            file_id,
            _client_ip(),
            request.headers.get("User-Agent", ""),
            f"Hard-deleted '{original_name}' immediately from deleted area",
        )
        # The audit entry is written before the row it refers to is deleted.
        cursor.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
            ) VALUES (%s, %s, 'user', 'file_hard_deleted', %s, %s, %s, %s)
            """,
            audit_params,
        )

        cursor.execute(
            "DELETE FROM files WHERE id = %s AND tenant_id = %s",
            (file_id, tenant_id),
        )

    db.commit()

    flash("File permanently deleted.", "success")
    return redirect(url_for("main.deleted_files"))
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/files/<int:file_id>/rename", methods=["POST"])
@portal_session_required
def rename_file(file_id: int):
    """Change the display name of an active file for the session tenant.

    The new base name comes from the ``display_basename`` form field (passed
    through ``_sanitize_display_basename``); the file's stored extension is
    re-attached so only the base name changes. Only ``display_filename`` is
    updated. When the resulting name equals ``original_filename`` the custom
    name is cleared (stored as NULL) instead.

    Flashes a warning and redirects without changes when the file is missing,
    is soft-deleted, the requested name is empty, longer than 255 characters,
    or identical to the name currently shown. On success a
    ``file_display_renamed`` audit entry is written and the transaction is
    committed.
    """
    db = get_db()
    requested_basename = _sanitize_display_basename(request.form.get("display_basename") or "")
    tenant_id = session["otb_tenant_id"]

    with db.cursor() as cursor:
        cursor.execute(
            """
            SELECT id, device_id, original_filename, display_filename, extension, is_deleted
            FROM files
            WHERE id = %s AND tenant_id = %s
            """,
            (file_id, tenant_id),
        )
        record = cursor.fetchone()

        if not record:
            flash("File not found.", "warning")
            return redirect(url_for("main.dashboard"))

        if record["is_deleted"]:
            flash("You can only rename active files from the device file browser.", "warning")
            return redirect(url_for("main.deleted_files"))

        def back_to_browser():
            # Built lazily so the URL is only generated when a redirect is issued.
            return redirect(url_for("main.browse_device_files", device_id=record["device_id"]))

        if not requested_basename:
            flash("Please enter a new file name.", "warning")
            return back_to_browser()

        # The extension is fixed; the user only controls the base name.
        extension = record["extension"]
        if extension:
            new_visible_name = f"{requested_basename}.{extension}"
        else:
            new_visible_name = requested_basename

        if len(new_visible_name) > 255:
            flash("That file name is too long.", "warning")
            return back_to_browser()

        current_visible_name = _display_filename(record)
        if current_visible_name == new_visible_name:
            flash("That file already has that name.", "warning")
            return back_to_browser()

        # Renaming back to the original name clears the custom display name.
        original_name = record["original_filename"]
        if new_visible_name == original_name:
            display_filename = None
        else:
            display_filename = new_visible_name

        cursor.execute(
            """
            UPDATE files
            SET display_filename = %s
            WHERE id = %s AND tenant_id = %s
            """,
            (display_filename, file_id, tenant_id),
        )

        final_visible_name = display_filename or original_name
        audit_params = (
            tenant_id,
            session["otb_user_id"],
            file_id,
            _client_ip(),
            request.headers.get("User-Agent", ""),
            f"Updated display filename from '{current_visible_name}' to '{final_visible_name}'",
        )
        cursor.execute(
            """
            INSERT INTO audit_logs (
                tenant_id, user_id, actor_type, event_type, file_id, ip_address, user_agent, event_detail
            ) VALUES (%s, %s, 'user', 'file_display_renamed', %s, %s, %s, %s)
            """,
            audit_params,
        )

    db.commit()

    if display_filename is not None:
        flash(f"File renamed to '{display_filename}'.", "success")
    else:
        flash("Custom name cleared. The original file name is now shown again.", "success")

    return back_to_browser()
|
|
|
|
|
@bp.route("/devices/<int:device_id>/files", methods=["GET"])
@portal_session_required
def browse_device_files(device_id: int):
    """Render the file browser for one directory of a device's originals.

    Query parameters:
        view: ``list`` or ``gallery``; any other value falls back to ``list``.
        path: directory below the device's ``originals`` folder, passed
            through ``_normalize_browser_path``; empty means the root.

    The template receives the non-deleted files stored directly in the
    selected directory, the immediate sub-folder names (derived from the
    ``directory_path`` values of files stored deeper in the tree), a
    breadcrumb trail, and the parent path.

    Redirects to the dashboard with a warning when the device cannot be
    found via ``_get_device_for_tenant``.
    """
    db = get_db()
    device = _get_device_for_tenant(db, device_id)

    if not device:
        flash("Device not found.", "warning")
        return redirect(url_for("main.dashboard"))

    requested_view = request.args.get("view", "list").strip().lower()
    view_mode = requested_view if requested_view in ("list", "gallery") else "list"

    current_path = _normalize_browser_path(request.args.get("path", ""))
    root_directory = f"{device['relative_path']}/originals"
    if current_path:
        current_directory = f"{root_directory}/{current_path}"
    else:
        current_directory = root_directory
    child_prefix = current_directory + "/"

    tenant_id = session["otb_tenant_id"]

    with db.cursor() as cursor:
        # Files that live directly in the directory being viewed.
        cursor.execute(
            """
            SELECT
                id,
                file_kind,
                relative_path,
                directory_path,
                original_filename,
                display_filename,
                basename,
                extension,
                mime_type,
                size_bytes,
                sha256,
                uploaded_at,
                is_immutable
            FROM files
            WHERE tenant_id = %s
              AND device_id = %s
              AND is_deleted = 0
              AND directory_path = %s
            ORDER BY uploaded_at DESC, id DESC
            """,
            (tenant_id, device_id, current_directory),
        )
        files = cursor.fetchall()

        # Every deeper directory that holds at least one live file.
        cursor.execute(
            """
            SELECT DISTINCT directory_path
            FROM files
            WHERE tenant_id = %s
              AND device_id = %s
              AND is_deleted = 0
              AND directory_path LIKE %s
            ORDER BY directory_path
            """,
            (tenant_id, device_id, f"{current_directory}/%"),
        )
        nested_directories = [row["directory_path"] for row in cursor.fetchall()]

    # Reduce each nested directory to its first segment below the current
    # one; a dict keeps first-seen order while dropping duplicates.
    unique_children = {}
    for directory in nested_directories:
        if not directory.startswith(child_prefix):
            continue
        child_name = directory[len(child_prefix):].partition("/")[0]
        if child_name:
            unique_children.setdefault(child_name, None)

    folders = [
        {
            "name": child_name,
            "path": f"{current_path}/{child_name}" if current_path else child_name,
        }
        for child_name in sorted(unique_children, key=str.lower)
    ]

    # One crumb for the device root, then one per path segment.
    segments = current_path.split("/") if current_path else []
    breadcrumbs = [{"label": device["device_name"], "path": ""}]
    breadcrumbs.extend(
        {"label": segment, "path": "/".join(segments[:depth])}
        for depth, segment in enumerate(segments, start=1)
    )

    # Empty at the root, otherwise the path with its last segment removed.
    parent_path = "/".join(segments[:-1])

    return render_template(
        "cloud/device_files.html",
        user_email=session.get("otb_email"),
        tenant_slug=session.get("otb_tenant_slug"),
        device=device,
        files=files,
        file_count=len(files),
        view_mode=view_mode,
        current_path=current_path,
        parent_path=parent_path,
        folders=folders,
        breadcrumbs=breadcrumbs,
    )
|
|
|
|