diff --git a/PROJECT_STATE.md b/PROJECT_STATE.md index 8813e62..a3f142d 100644 --- a/PROJECT_STATE.md +++ b/PROJECT_STATE.md @@ -1,51 +1,85 @@ # PROJECT_STATE.md Project: OTB Cloud -Version: v1.1.0-alpha3 -Updated: 2026-04-19 +Version: v1.1.0-alpha4 +Updated: 2026-04-20 Location: /opt/otb_cloud ## Current State -OTB Cloud now has a functioning workshop-driven video processing pipeline. +OTB Cloud now has a working multi-profile, multi-GPU video processing pipeline integrated into the tenant storage platform. ### Confirmed Working -- Portal and branded UI shell -- Device browser -- File selection flow into Video Workshop +- Portal-branded OTB Cloud dashboard +- Device creation and browsing +- File-ID based workshop staging +- Device-specific Video Workspace access from dashboard - Video Workshop page -- Enqueue API -- Jobs API -- MariaDB-backed video_jobs integration -- Tenant/device path resolution for queued jobs -- Worker service startup and queue pickup -- Worker-side absolute path resolution from tenant storage_root -- Intel iGPU processing path -- Successful completed output for device 27 (ripper) - -### Latest Proven Result -A queued workshop job for: -- source file: 05142013003.mp4 -- device: 27 (ripper) - -completed successfully with: -- assigned_processor: intel -- status: complete -- progress_percent: 100 -- output_relative_path: - devices/ripper/originals/20260413T210325474049Z__05142013003_processed.mp4 +- Multi-profile processing selection: + - default + - compress + - hq +- Manual rotation override option: + - auto/default behavior when unchecked + - selectable 90 / 180 / 270 override when enabled +- Job queue API and job listing API +- File-ID based source resolution +- Output routing to: + - devices//video/ +- Profile-specific output filenames +- Completed job actions: + - View + - Send to LTS + - Download Output + - Delete +- Failed job delete action +- LTS routing by file type: + - lts/video + - lts/archived + - lts/pictures +- Health page +- Lifetime processing metrics retained after visible job deletion +- Intel + AMD GPU processing both in service +- GPU time accounting active in Health page +- Global video jobs route exists in codebase +- Processed video section exists in device browser flow + +### Processing / GPU Behavior +Current live behavior: +- both GPUs take jobs +- AMD prioritizes heavier / HQ work first +- Intel handles lighter work +- workers continue taking suitable jobs from the queue batch as available + +### Latest Proven Health State +Health page currently shows stable cumulative values including: +- uploaded file counts and space +- LTS counts and space +- archive counts and space +- total jobs +- completed jobs +- failed jobs +- cumulative GPU time that does not zero out when workshop cards are deleted + +### Current Storage Layout +- originals remain in device originals tree +- processed outputs go to: + - devices//video/ +- LTS destinations include: + - lts/video/ + - lts/archived/ + - lts/pictures/ ## Known Remaining Improvements -- Jobs panel is still raw JSON instead of a polished table/cards view -- Failed jobs do not yet surface log_excerpt nicely in UI -- No direct preview/download button for completed outputs in workshop -- No health/storage/GPU dashboard panel yet -- No explicit processor chooser in UI -- Output placement may later deserve a dedicated derived/video output area -- Existing patch helper scripts were moved out of repo to keep git clean +- README is now being realigned to actual live state +- Global video jobs page should be fully wired into UI navigation and polished +- Dashboard template still contains some mixed button class styles that should be normalized +- Health page can be expanded with per-processor breakdown later +- Processing metrics can be refined further into Intel/AMD/CPU buckets if desired +- Output browsing UX can still be improved further with richer previewing and filtering ## Recommended Next Step -Proceed to alpha3-b: -- replace raw JSON jobs output with styled job cards/table -- add output links for completed jobs -- add visible failure details from log_excerpt -- add storage/GPU/worker health panel +Proceed after alpha4 with: +1. global video jobs page polish and filters +2. per-processor GPU metrics split (Intel / AMD / CPU) +3. scheduler documentation and/or scheduler UI visibility +4. processed output browsing improvements in device view diff --git a/README.md b/README.md index eed819e..8a98b89 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,20 @@ # OTB Cloud +## v1.1.0-alpha4 - 2026-04-20 + +- Promoted project state to alpha4 based on current live multi-GPU video pipeline +- Added file-ID based workshop queueing and source resolution +- Added multi-profile processing with distinct output naming +- Added manual rotation override controls in workshop +- Added processed output routing to devices//video/ +- Added completed-job actions for View, Send to LTS, Download Output, and Delete +- Added failed-job delete action +- Added LTS storage routing for video, archived content, and pictures +- Added Health page with persistent cumulative processing and GPU-time tracking +- Confirmed health metrics remain intact even after workshop job cards are deleted +- Dashboard now exposes device-level Video Workspace access +- Dual GPU behavior in active use: AMD prioritizes heavier/HQ work, Intel handles lighter work + ## v1.1.0-alpha3 - 2026-04-19 - Added Video Workshop UI for queued processing diff --git a/VERSION b/VERSION index 51ec1c5..acb6358 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.1.0-alpha3 +v1.1.0-alpha4 diff --git a/app/main/routes.py b/app/main/routes.py index a2a5849..d93b7d8 100644 --- a/app/main/routes.py +++ b/app/main/routes.py @@ -1285,6 +1285,28 @@ def browse_device_files(device_id: int): parts = current_path.split("/") parent_path = "/".join(parts[:-1]) + processed_videos = [] + try: + from pathlib import Path + tenant = session.get("tenant") or "def" + with db.cursor() as cur: + cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + if tenant_row: + storage_root = Path(tenant_row["storage_root"]) + device_root = storage_root / device["relative_path"] + video_dir = device_root / "video" + if video_dir.exists(): + for p in sorted(video_dir.glob("*"), reverse=True): + if p.is_file(): + processed_videos.append({ + "name": p.name, + "relative_path": str(p.relative_to(storage_root)), + "size": p.stat().st_size, + }) + except Exception: + processed_videos = [] + return render_template( "cloud/device_files.html", user_email=session.get("otb_email"), @@ -1623,21 +1645,400 @@ def video_enqueue(): tenant = session.get("tenant") or 'def' device_id = data.get("device_id") files = data.get("files", []) - profile = data.get("profile", "default") + profiles = data.get("profiles", []) + rotation_override = data.get("rotation_override") + + if not profiles: + profiles = ["default"] job_ids = [] for f in files: - job_id = create_video_job( - tenant=tenant, - device_id=device_id, - input_filename=f, - profile=profile - ) - job_ids.append(job_id) + for profile in profiles: + job_id = create_video_job( + tenant=tenant, + device_id=device_id, + source_file_id=f, + profile=profile, + rotation_override=rotation_override + ) + job_ids.append(job_id) return jsonify({"status": "ok", "jobs": job_ids}) + + + + + +@bp.route("/video-jobs") +def global_video_jobs(): + from app.db import get_db + from pathlib import Path + + tenant = session.get("tenant") or "def" + db = get_db() + + with db.cursor() as cur: + cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + + if not tenant_row: + return "Tenant not found", 404 + + tenant_id = tenant_row["id"] + storage_root = Path(tenant_row["storage_root"]) + + with db.cursor() as cur: + cur.execute( + """ + SELECT + vj.id, + vj.device_id, + d.device_name, + vj.source_file_id, + vj.source_relative_path, + vj.source_original_filename, + vj.requested_profile, + vj.requested_rotation_degrees, + vj.status, + vj.progress_percent, + vj.assigned_processor, + vj.output_relative_path, + vj.error_message, + vj.created_at, + vj.started_at, + vj.completed_at, + vj.gpu_seconds + FROM video_jobs vj + LEFT JOIN devices d ON d.id = vj.device_id + WHERE vj.tenant_id = %s + ORDER BY vj.id DESC + LIMIT 300 + """, + (tenant_id,) + ) + rows = cur.fetchall() + + def safe_size(rel_path): + if not rel_path: + return None + p = storage_root / rel_path + try: + if p.exists() and p.is_file(): + return p.stat().st_size + except Exception: + pass + return None + + jobs = [] + for r in rows: + jobs.append({ + "id": r["id"], + "device_id": r["device_id"], + "device_name": r["device_name"] or f"Device {r['device_id']}", + "source_file_id": r["source_file_id"], + "filename": r["source_original_filename"], + "source_relative_path": r["source_relative_path"], + "profile": r["requested_profile"], + "rotation_override": r["requested_rotation_degrees"], + "status": r["status"], + "progress_percent": r["progress_percent"], + "assigned_processor": r["assigned_processor"], + "output_relative_path": r["output_relative_path"], + "error_message": r["error_message"], + "original_size": safe_size(r["source_relative_path"]), + "processed_size": safe_size(r["output_relative_path"]), + "gpu_seconds": r["gpu_seconds"] or 0, + "created_at": str(r["created_at"]) if r["created_at"] else "", + "started_at": str(r["started_at"]) if r["started_at"] else "", + "completed_at": str(r["completed_at"]) if r["completed_at"] else "", + }) + + return render_template("cloud/video_jobs.html", jobs=jobs) + +@bp.route("/health") +def cloud_health(): + from app.db import get_db + from pathlib import Path + + tenant = session.get("tenant") or "def" + db = get_db() + + with db.cursor() as cur: + cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + + if not tenant_row: + return "Tenant not found", 404 + + tenant_id = tenant_row["id"] + storage_root = Path(tenant_row["storage_root"]) + + def scan_dir(rel_path): + p = storage_root / rel_path + count = 0 + total = 0 + if p.exists(): + for f in p.rglob("*"): + if f.is_file(): + count += 1 + total += f.stat().st_size + return count, total + + uploaded_count, uploaded_bytes = scan_dir("devices") + lts_count, lts_bytes = scan_dir("lts") + archive_count, archive_bytes = scan_dir("archive") + + total_used = 0 + if storage_root.exists(): + for f in storage_root.rglob("*"): + if f.is_file(): + total_used += f.stat().st_size + + with db.cursor() as cur: + cur.execute( + """ + SELECT + COALESCE(video_jobs_total,0) AS total_jobs, + COALESCE(video_jobs_complete,0) AS complete_jobs, + COALESCE(video_jobs_failed,0) AS failed_jobs, + COALESCE(gpu_seconds_total,0) AS gpu_seconds + FROM tenant_usage_metrics + WHERE tenant_id = %s + LIMIT 1 + """, + (tenant_id,) + ) + stats = cur.fetchone() or { + "total_jobs": 0, + "complete_jobs": 0, + "failed_jobs": 0, + "gpu_seconds": 0, + } + + def human_bytes(n): + n = int(n or 0) + if n < 1024: + return f"{n} B" + if n < 1024**2: + return f"{n/1024:.1f} KB" + if n < 1024**3: + return f"{n/1024**2:.2f} MB" + return f"{n/1024**3:.2f} GB" + + def human_seconds(n): + n = int(n or 0) + h = n // 3600 + m = (n % 3600) // 60 + s = n % 60 + parts = [] + if h: + parts.append(f"{h}h") + if m: + parts.append(f"{m}m") + parts.append(f"{s}s") + return " ".join(parts) + + return render_template( + "cloud/health.html", + uploaded_count=uploaded_count, + uploaded_bytes=human_bytes(uploaded_bytes), + lts_count=lts_count, + lts_bytes=human_bytes(lts_bytes), + archive_count=archive_count, + archive_bytes=human_bytes(archive_bytes), + total_used=human_bytes(total_used), + total_jobs=stats["total_jobs"] or 0, + complete_jobs=stats["complete_jobs"] or 0, + failed_jobs=stats["failed_jobs"] or 0, + gpu_time=human_seconds(stats["gpu_seconds"] or 0), + ) + + +@bp.route("/video-output//view") +def view_video_output(job_id): + from app.db import get_db + from pathlib import Path + + tenant = session.get("tenant") or "def" + db = get_db() + + with db.cursor() as cur: + cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + if not tenant_row: + return "Tenant not found", 404 + + tenant_id = tenant_row["id"] + storage_root = tenant_row["storage_root"] + + cur.execute( + """ + SELECT output_relative_path + FROM video_jobs + WHERE id = %s AND tenant_id = %s + LIMIT 1 + """, + (job_id, tenant_id) + ) + job = cur.fetchone() + + if not job or not job["output_relative_path"]: + return "No output file for this job", 404 + + full_path = Path(storage_root) / job["output_relative_path"] + if not full_path.exists(): + return "Output file missing on disk", 404 + + return send_file(full_path, as_attachment=False) + +@bp.route("/video-output//send-to-lts", methods=["POST"]) +def send_video_output_to_lts(job_id): + from app.db import get_db + from pathlib import Path + import shutil + + tenant = session.get("tenant") or "def" + db = get_db() + + with db.cursor() as cur: + cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + if not tenant_row: + return jsonify({"ok": False, "error": "Tenant not found"}), 404 + + tenant_id = tenant_row["id"] + storage_root = Path(tenant_row["storage_root"]) + + cur.execute( + """ + SELECT id, output_relative_path + FROM video_jobs + WHERE id = %s AND tenant_id = %s + LIMIT 1 + """, + (job_id, tenant_id) + ) + job = cur.fetchone() + + if not job or not job["output_relative_path"]: + return jsonify({"ok": False, "error": "Job output not found"}), 404 + + src = storage_root / job["output_relative_path"] + if not src.exists(): + return jsonify({"ok": False, "error": "Output file missing on disk"}), 404 + + ext = src.suffix.lower() + if ext in [".mp4", ".mov", ".mkv", ".webm", ".avi"]: + lts_rel_dir = Path("lts") / "video" + elif ext in [".zip", ".tar", ".gz", ".7z", ".rar"]: + lts_rel_dir = Path("lts") / "archived" + elif ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"]: + lts_rel_dir = Path("lts") / "pictures" + else: + lts_rel_dir = Path("lts") / "archived" + + lts_dir = storage_root / lts_rel_dir + lts_dir.mkdir(parents=True, exist_ok=True) + + dest = lts_dir / src.name + if dest.exists(): + stem = dest.stem + suffix = dest.suffix + n = 2 + while True: + candidate = lts_dir / f"{stem}-{n}{suffix}" + if not candidate.exists(): + dest = candidate + break + n += 1 + + shutil.move(str(src), str(dest)) + + with db.cursor() as cur: + cur.execute( + """ + UPDATE video_jobs + SET output_relative_path = %s + WHERE id = %s AND tenant_id = %s + """, + (str(dest.relative_to(storage_root)), job_id, tenant_id) + ) + db.commit() + + return jsonify({"ok": True, "output_relative_path": str(dest.relative_to(storage_root))}) + +@bp.route("/video-output//download") +def download_video_output(job_id): + from app.db import get_db + + tenant = session.get("tenant") or "def" + db = get_db() + + with db.cursor() as cur: + cur.execute("SELECT id, storage_root FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + if not tenant_row: + return "Tenant not found", 404 + + tenant_id = tenant_row["id"] + storage_root = tenant_row["storage_root"] + + cur.execute( + """ + SELECT output_relative_path, source_original_filename, status + FROM video_jobs + WHERE id = %s AND tenant_id = %s + LIMIT 1 + """, + (job_id, tenant_id) + ) + job = cur.fetchone() + + if not job: + return "Job not found", 404 + + if not job["output_relative_path"]: + return "No output file for this job", 404 + + from pathlib import Path + full_path = Path(storage_root) / job["output_relative_path"] + + if not full_path.exists(): + return "Output file missing on disk", 404 + + download_name = Path(job["output_relative_path"]).name + return send_file(full_path, as_attachment=True, download_name=download_name) + + +@bp.route("/api/video/jobs//delete", methods=["POST"]) +def video_job_delete(job_id): + from app.db import get_db + + tenant = session.get("tenant") or "def" + db = get_db() + + with db.cursor() as cur: + cur.execute("SELECT id FROM tenants WHERE slug = %s LIMIT 1", (tenant,)) + tenant_row = cur.fetchone() + if not tenant_row: + return jsonify({"ok": False, "error": "tenant not found"}), 404 + + tenant_id = tenant_row["id"] + + cur.execute( + "DELETE FROM video_jobs WHERE id = %s AND tenant_id = %s", + (job_id, tenant_id) + ) + deleted = cur.rowcount + db.commit() + + if not deleted: + return jsonify({"ok": False, "error": "job not found"}), 404 + + return jsonify({"ok": True, "deleted_id": job_id}) + @bp.route("/api/video/jobs") def video_jobs(): tenant = session.get("tenant") or 'def' diff --git a/app/services/video_jobs.py b/app/services/video_jobs.py index 0d56f62..81b5084 100644 --- a/app/services/video_jobs.py +++ b/app/services/video_jobs.py @@ -12,61 +12,106 @@ def get_tenant_row(db, tenant): return None return row -def get_device_row(db, device_id): - cur = db.cursor() - cur.execute( - "SELECT id, device_name, relative_path FROM devices WHERE id = %s LIMIT 1", - (device_id,) - ) - row = cur.fetchone() - if not row: - return None - return row +def _current_db_name(db): + with db.cursor() as cur: + cur.execute("SELECT DATABASE() AS dbname") + row = cur.fetchone() + return row["dbname"] + +def _table_exists(db, table_name): + dbname = _current_db_name(db) + with db.cursor() as cur: + cur.execute( + """ + SELECT COUNT(*) AS c + FROM information_schema.tables + WHERE table_schema = %s AND table_name = %s + """, + (dbname, table_name) + ) + row = cur.fetchone() + return int(row["c"]) > 0 + +def _table_columns(db, table_name): + dbname = _current_db_name(db) + with db.cursor() as cur: + cur.execute( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = %s AND table_name = %s + """, + (dbname, table_name) + ) + rows = cur.fetchall() + return {r["column_name"] for r in rows} -def resolve_source_relative_path(storage_root, device_relative_path, input_filename): - base = Path(storage_root) / device_relative_path - if not base.exists(): - raise FileNotFoundError(f"Device base path not found: {base}") +def _pick(cols, *names): + for n in names: + if n in cols: + return n + return None - candidates = [] +def resolve_source_from_file_id(db, tenant_id, device_id, source_file_id): + candidate_tables = ["files", "device_files", "uploaded_files"] - for p in base.rglob("*"): - if not p.is_file(): + for table in candidate_tables: + if not _table_exists(db, table): continue - name = p.name - if name == input_filename or name.endswith("__" + input_filename): - candidates.append(p) - if not candidates: - raise FileNotFoundError( - f"Could not locate source file for {input_filename} under {base}" - ) + cols = _table_columns(db, table) + + id_col = _pick(cols, "id") + rel_col = _pick(cols, "relative_path", "source_relative_path", "path", "storage_relative_path") + orig_col = _pick(cols, "original_filename", "filename", "display_filename", "basename") + tenant_col = _pick(cols, "tenant_id") + device_col = _pick(cols, "device_id") + + if not id_col or not rel_col or not orig_col: + continue + + sql = f"SELECT {id_col} AS id, {rel_col} AS rel_path, {orig_col} AS orig_name FROM {table} WHERE {id_col} = %s" + args = [source_file_id] + + if tenant_col: + sql += f" AND {tenant_col} = %s" + args.append(tenant_id) + + if device_col: + sql += f" AND {device_col} = %s" + args.append(device_id) - candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True) - chosen = candidates[0] + sql += " LIMIT 1" - rel = chosen.relative_to(Path(storage_root)) - return str(rel) + with db.cursor() as cur: + cur.execute(sql, tuple(args)) + row = cur.fetchone() -def create_video_job(tenant, device_id, input_filename, profile="default"): + if row: + return { + "source_relative_path": row["rel_path"], + "source_original_filename": row["orig_name"], + } + + raise RuntimeError( + f"Could not resolve file metadata for source_file_id={source_file_id}. " + f"Tried tables: files, device_files, uploaded_files" + ) + +def create_video_job(tenant, device_id, source_file_id, profile="default", rotation_override=None): db = get_db() tenant_row = get_tenant_row(db, tenant) if not tenant_row: raise Exception(f"Tenant not found: {tenant}") - device_row = get_device_row(db, device_id) - if not device_row: - raise Exception(f"Device not found: {device_id}") - tenant_id = tenant_row["id"] - storage_root = tenant_row["storage_root"] - device_relative_path = device_row["relative_path"] - source_relative_path = resolve_source_relative_path( - storage_root, - device_relative_path, - input_filename + file_meta = resolve_source_from_file_id( + db=db, + tenant_id=tenant_id, + device_id=device_id, + source_file_id=int(source_file_id), ) cur = db.cursor() @@ -79,16 +124,36 @@ def create_video_job(tenant, device_id, input_filename, profile="default"): source_relative_path, source_original_filename, requested_profile, + requested_rotation_degrees, requested_gpu_preference, status, progress_percent - ) VALUES (%s, %s, NULL, %s, %s, %s, 'auto', 'queued', 0) + ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'auto', 'queued', 0) """, - (tenant_id, device_id, source_relative_path, input_filename, profile) + ( + tenant_id, + device_id, + int(source_file_id), + file_meta["source_relative_path"], + file_meta["source_original_filename"], + profile, + rotation_override, + ) ) db.commit() return cur.lastrowid +def _safe_size(storage_root, rel_path): + if not rel_path: + return None + try: + p = Path(storage_root) / rel_path + if p.exists() and p.is_file(): + return p.stat().st_size + except Exception: + return None + return None + def list_jobs_for_tenant(tenant): db = get_db() @@ -97,6 +162,7 @@ def list_jobs_for_tenant(tenant): return [] tenant_id = tenant_row["id"] + storage_root = tenant_row["storage_root"] cur = db.cursor() cur.execute( @@ -104,8 +170,11 @@ def list_jobs_for_tenant(tenant): SELECT id, device_id, + source_file_id, + source_relative_path, source_original_filename, requested_profile, + requested_rotation_degrees, status, progress_percent, assigned_processor, @@ -126,16 +195,23 @@ def list_jobs_for_tenant(tenant): out = [] for r in rows: + original_size = _safe_size(storage_root, r["source_relative_path"]) + processed_size = _safe_size(storage_root, r["output_relative_path"]) + out.append({ "id": r["id"], "device_id": r["device_id"], + "source_file_id": r["source_file_id"], "filename": r["source_original_filename"], "profile": r["requested_profile"], + "rotation_override": r["requested_rotation_degrees"], "status": r["status"], "progress_percent": r["progress_percent"], "assigned_processor": r["assigned_processor"], "output_relative_path": r["output_relative_path"], "error_message": r["error_message"], + "original_size": original_size, + "processed_size": processed_size, "created_at": str(r["created_at"]) if r["created_at"] is not None else None, "started_at": str(r["started_at"]) if r["started_at"] is not None else None, "completed_at": str(r["completed_at"]) if r["completed_at"] is not None else None, diff --git a/app/services/video_worker.py b/app/services/video_worker.py index 56a2089..4273b92 100644 --- a/app/services/video_worker.py +++ b/app/services/video_worker.py @@ -1,17 +1,58 @@ import time import subprocess +import json +import threading +import re from datetime import datetime from pathlib import Path from app import create_app from app.db import get_db -from app.services.gpu_select import select_processor, release INTEL_DEV = "/dev/dri/renderD129" AMD_DEV = "/dev/dri/renderD128" -def run_ffmpeg(cmd): - return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) +def run_ffprobe_json(src): + cmd = [ + "ffprobe", + "-v", "error", + "-select_streams", "v:0", + "-show_entries", "stream=duration:stream_tags=rotate", + "-of", "json", + src, + ] + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: + return {} + try: + return json.loads(result.stdout or "{}") + except Exception: + return {} + +def get_rotation_degrees(src): + data = run_ffprobe_json(src) + streams = data.get("streams", []) + if not streams: + return 0 + tags = streams[0].get("tags", {}) or {} + rotate_tag = tags.get("rotate") + if rotate_tag is not None: + try: + return int(rotate_tag) % 360 + except Exception: + pass + return 0 + +def get_duration_seconds(src): + data = run_ffprobe_json(src) + streams = data.get("streams", []) + if not streams: + return None + duration = streams[0].get("duration") + try: + return float(duration) + except Exception: + return None def build_absolute_source_path(db, job): with db.cursor() as cur: @@ -24,69 +65,215 @@ def build_absolute_source_path(db, job): if not tenant_row: raise RuntimeError(f"Tenant id {job['tenant_id']} not found") - storage_root = tenant_row["storage_root"] - return str(Path(storage_root) / job["source_relative_path"]) + return str(Path(tenant_row["storage_root"]) / job["source_relative_path"]) + +def build_absolute_output_path(db, job, src): + with db.cursor() as cur: + cur.execute( + "SELECT storage_root FROM tenants WHERE id = %s", + (job["tenant_id"],) + ) + tenant_row = cur.fetchone() + + cur.execute( + "SELECT relative_path FROM devices WHERE id = %s", + (job["device_id"],) + ) + device_row = cur.fetchone() + + if not tenant_row: + raise RuntimeError(f"Tenant id {job['tenant_id']} not found") + if not device_row: + raise RuntimeError(f"Device id {job['device_id']} not found") + + storage_root = Path(tenant_row["storage_root"]) + device_relative_path = Path(device_row["relative_path"]) + out_dir = storage_root / device_relative_path / "video" + out_dir.mkdir(parents=True, exist_ok=True) + + profile = (job.get("requested_profile") or "default").lower() + out_name = Path(src).stem + f"_{profile}_processed.mp4" + return str(out_dir / out_name) + +def to_relative_output_path(db, job, absolute_output): + with db.cursor() as cur: + cur.execute( + "SELECT storage_root FROM tenants WHERE id = %s", + (job["tenant_id"],) + ) + tenant_row = cur.fetchone() + + if not tenant_row: + return absolute_output + + try: + return str(Path(absolute_output).relative_to(Path(tenant_row["storage_root"]))) + except Exception: + return absolute_output + +def build_profile_settings(profile): + profile = (profile or "default").lower() + + if profile == "compress": + return {"width": 720, "height": 1280, "va_bitrate": "1200k", "va_maxrate": "1400k", "va_bufsize": "2400k", "crf": "30"} + elif profile == "hq": + return {"width": 1080, "height": 1920, "va_bitrate": "4500k", "va_maxrate": "5000k", "va_bufsize": "9000k", "crf": "18"} + else: + return {"width": 900, "height": 1600, "va_bitrate": "2500k", "va_maxrate": "3000k", "va_bufsize": "5000k", "crf": "23"} + +def build_filter_chain(src, settings, rotation_override=None, use_vaapi=True): + rotation = rotation_override if rotation_override in (90, 180, 270) else get_rotation_degrees(src) + filters = [] + + if rotation == 90: + filters.append("transpose=1") + elif rotation == 270: + filters.append("transpose=2") + elif rotation == 180: + filters.append("hflip,vflip") + + if use_vaapi: + filters.append("format=nv12") + filters.append("hwupload") + filters.append(f"scale_vaapi=w={settings['width']}:h={settings['height']}:force_original_aspect_ratio=decrease") + else: + filters.append(f"scale={settings['width']}:{settings['height']}:force_original_aspect_ratio=decrease") + + return ",".join(filters) + +def ensure_metrics_row(db, tenant_id): + with db.cursor() as cur: + cur.execute( + """ + INSERT INTO tenant_usage_metrics (tenant_id) + VALUES (%s) + ON DUPLICATE KEY UPDATE tenant_id = tenant_id + """, + (tenant_id,) + ) + db.commit() + +def bump_metrics(db, tenant_id, complete=False, failed=False, gpu_seconds=0): + ensure_metrics_row(db, tenant_id) + + with db.cursor() as cur: + cur.execute( + """ + UPDATE tenant_usage_metrics + SET video_jobs_total = video_jobs_total + 1, + video_jobs_complete = video_jobs_complete + %s, + video_jobs_failed = video_jobs_failed + %s, + gpu_seconds_total = gpu_seconds_total + %s + WHERE tenant_id = %s + """, + ( + 1 if complete else 0, + 1 if failed else 0, + int(gpu_seconds or 0), + tenant_id, + ), + ) + db.commit() + +def update_progress(db, job_id, percent): + percent = max(5, min(99, int(percent))) + with db.cursor() as cur: + cur.execute( + "UPDATE video_jobs SET progress_percent = %s WHERE id = %s", + (percent, job_id), + ) + db.commit() + +def run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds): + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + + log_lines = [] + time_re = re.compile(r"time=(\d{2}):(\d{2}):(\d{2}(?:\.\d+)?)") + + for line in process.stdout: + log_lines.append(line.rstrip()) + if len(log_lines) > 200: + log_lines = log_lines[-200:] + + if duration_seconds and duration_seconds > 0: + m = time_re.search(line) + if m: + hh = int(m.group(1)) + mm = int(m.group(2)) + ss = float(m.group(3)) + current = hh * 3600 + mm * 60 + ss + percent = (current / duration_seconds) * 100.0 + update_progress(db, job_id, percent) + + process.wait() + return process.returncode, "\n".join(log_lines[-120:]) -def process_job(db, job): +def process_job(db, job, processor): job_id = job["id"] src = build_absolute_source_path(db, job) profile = job["requested_profile"] + rotation_override = job.get("requested_rotation_degrees") - processor = select_processor() device = INTEL_DEV if processor == "intel" else AMD_DEV - - output = str(Path(src).with_name(Path(src).stem + "_processed.mp4")) - - if profile == "portrait_web": - vf = "format=nv12,hwupload,scale_vaapi=w=720:h=1280:force_original_aspect_ratio=decrease" - else: - vf = "format=nv12,hwupload,scale_vaapi=w=1280:h=720:force_original_aspect_ratio=decrease" + output = build_absolute_output_path(db, job, src) + settings = build_profile_settings(profile) + duration_seconds = get_duration_seconds(src) if processor in ("intel", "amd"): + vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=True) cmd = [ "ffmpeg", "-hide_banner", "-y", + "-noautorotate", + "-fflags", "+genpts", "-vaapi_device", device, "-i", src, "-vf", vf, "-c:v", "h264_vaapi", - "-b:v", "3M", - "-maxrate", "3M", - "-bufsize", "6M", - "-c:a", "aac", "-b:a", "128k", + "-b:v", settings["va_bitrate"], + "-maxrate", settings["va_maxrate"], + "-bufsize", settings["va_bufsize"], + "-c:a", "aac", + "-b:a", "160k", + "-ac", "2", + "-ar", "48000", + "-movflags", "+faststart", output ] else: + vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=False) cmd = [ "ffmpeg", "-hide_banner", "-y", + "-noautorotate", + "-fflags", "+genpts", "-i", src, + "-vf", vf, "-c:v", "libx264", "-preset", "medium", - "-crf", "23", - "-c:a", "aac", "-b:a", "128k", + "-crf", settings["crf"], + "-c:a", "aac", + "-b:a", "160k", + "-ac", "2", + "-ar", "48000", + "-movflags", "+faststart", output ] start = datetime.utcnow() try: - result = run_ffmpeg(cmd) + returncode, log_excerpt = run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds) end = datetime.utcnow() + gpu_seconds = max(0, int((end - start).total_seconds())) with db.cursor() as cur: - if result.returncode == 0: - rel_output = None - try: - with db.cursor() as cur2: - cur2.execute( - "SELECT storage_root FROM tenants WHERE id = %s", - (job["tenant_id"],) - ) - tenant_row = cur2.fetchone() - if tenant_row: - rel_output = str(Path(output).relative_to(Path(tenant_row["storage_root"]))) - except Exception: - rel_output = output - + if returncode == 0: + rel_output = to_relative_output_path(db, job, output) cur.execute( """ UPDATE video_jobs @@ -94,6 +281,7 @@ def process_job(db, job): assigned_processor=%s, output_relative_path=%s, progress_percent=100, + gpu_seconds=%s, started_at=COALESCE(started_at, %s), completed_at=%s, log_excerpt=%s, @@ -102,31 +290,40 @@ def process_job(db, job): """, ( processor, - rel_output or output, + rel_output, + gpu_seconds, start, end, - (result.stderr or "")[:1000], + log_excerpt[:4000], job_id, ), ) + db.commit() + bump_metrics(db, job["tenant_id"], complete=True, failed=False, gpu_seconds=gpu_seconds) else: - cur.execute( - """ - UPDATE video_jobs - SET status='failed', - error_message=%s, - log_excerpt=%s, - completed_at=%s - WHERE id=%s - """, - ( - "ffmpeg failed", - (result.stderr or "")[:4000], - end, - job_id, - ), - ) - db.commit() + with db.cursor() as cur2: + cur2.execute( + """ + UPDATE video_jobs + SET status='failed', + assigned_processor=%s, + gpu_seconds=%s, + error_message=%s, + log_excerpt=%s, + completed_at=%s + WHERE id=%s + """, + ( + processor, + gpu_seconds, + "ffmpeg failed", + log_excerpt[:4000], + end, + job_id, + ), + ) + db.commit() + bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=gpu_seconds) except Exception as e: with db.cursor() as cur: @@ -134,64 +331,122 @@ def process_job(db, job): """ UPDATE video_jobs SET status='failed', + assigned_processor=%s, error_message=%s, completed_at=UTC_TIMESTAMP() WHERE id=%s """, - (str(e)[:1000], job_id), + (processor, str(e)[:1000], job_id), ) db.commit() - finally: - if processor in ("intel", "amd"): - release(processor) + bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=0) -def run_worker(): - app = create_app() +def claim_next_job(db, processor): + """ + Intel: prefer default/compress, then anything. + AMD: prefer hq, then anything. + """ + preferred_profile = "hq" if processor == "amd" else None + + with db.cursor() as cur: + cur.execute("START TRANSACTION") + + job = None + + if preferred_profile: + cur.execute( + """ + SELECT * + FROM video_jobs + WHERE status='queued' AND requested_profile = %s + ORDER BY id ASC + LIMIT 1 + FOR UPDATE + """, + (preferred_profile,), + ) + job = cur.fetchone() + + if not job: + if processor == "intel": + cur.execute( + """ + SELECT * + FROM video_jobs + WHERE status='queued' AND requested_profile IN ('default','compress') + ORDER BY id ASC + LIMIT 1 + FOR UPDATE + """ + ) + job = cur.fetchone() + + if not job: + cur.execute( + """ + SELECT * + FROM video_jobs + WHERE status='queued' + ORDER BY id ASC + LIMIT 1 + FOR UPDATE + """ + ) + job = cur.fetchone() + + if not job: + db.rollback() + return None + cur.execute( + """ + UPDATE video_jobs + SET status='processing', + assigned_processor=%s, + started_at=COALESCE(started_at, UTC_TIMESTAMP()), + progress_percent=5 + WHERE id=%s + """, + (processor, job["id"]), + ) + db.commit() + job["assigned_processor"] = processor + return job + +def worker_loop(app, processor): with app.app_context(): - print("video worker started", flush=True) + print(f"{processor} worker started", flush=True) while True: try: db = get_db() - try: db.rollback() except Exception: pass - with db.cursor() as cur: - cur.execute( - """ - SELECT * - FROM video_jobs - WHERE status='queued' - ORDER BY id ASC - LIMIT 1 - """ - ) - job = cur.fetchone() - - if job: - print( - f"worker picked job id={job['id']} source={job['source_relative_path']}", - flush=True - ) - cur.execute( - """ - UPDATE video_jobs - SET status='processing', - started_at=COALESCE(started_at, UTC_TIMESTAMP()), - progress_percent=5 - WHERE id=%s - """, - (job["id"],), - ) - db.commit() - - process_job(db, job) + job = claim_next_job(db, processor) + + if job: + print(f"{processor} worker picked job id={job['id']} source={job['source_relative_path']}", flush=True) + process_job(db, job, processor) + else: + time.sleep(2) except Exception as e: - print(f"worker loop error: {e}", flush=True) + print(f"{processor} worker loop error: {e}", flush=True) + time.sleep(2) + +def run_worker(): + app = create_app() + + threads = [ + threading.Thread(target=worker_loop, args=(app, "intel"), daemon=True), + threading.Thread(target=worker_loop, args=(app, "amd"), daemon=True), + ] + + for t in threads: + t.start() - time.sleep(5) + while True: + time.sleep(60) diff --git a/app/templates/cloud/dashboard.html b/app/templates/cloud/dashboard.html index d809a8c..6699257 100644 --- a/app/templates/cloud/dashboard.html +++ b/app/templates/cloud/dashboard.html @@ -17,6 +17,7 @@ Add Device Add Android Device Archive Workspace + Video Jobs Deleted Files Back to Services Logout @@ -70,6 +71,7 @@ APK Upload Only {% endif %} Browse Files + Video Workspace
diff --git a/app/templates/cloud/device_files.html b/app/templates/cloud/device_files.html index b6eee2c..e5770ae 100644 --- a/app/templates/cloud/device_files.html +++ b/app/templates/cloud/device_files.html @@ -346,7 +346,7 @@ {% set is_image = (file.mime_type and file.mime_type.startswith('image/')) or ext in ['png', 'jpg', 'jpeg', 'webp', 'gif', 'bmp'] %}