import time import subprocess import json import threading import re from datetime import datetime from pathlib import Path from app import create_app from app.db import get_db INTEL_DEV = "/dev/dri/renderD129" AMD_DEV = "/dev/dri/renderD128" def run_ffprobe_json(src): cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=duration:stream_tags=rotate", "-of", "json", src, ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode != 0: return {} try: return json.loads(result.stdout or "{}") except Exception: return {} def get_rotation_degrees(src): data = run_ffprobe_json(src) streams = data.get("streams", []) if not streams: return 0 tags = streams[0].get("tags", {}) or {} rotate_tag = tags.get("rotate") if rotate_tag is not None: try: return int(rotate_tag) % 360 except Exception: pass return 0 def get_duration_seconds(src): data = run_ffprobe_json(src) streams = data.get("streams", []) if not streams: return None duration = streams[0].get("duration") try: return float(duration) except Exception: return None def build_absolute_source_path(db, job): with db.cursor() as cur: cur.execute( "SELECT storage_root FROM tenants WHERE id = %s", (job["tenant_id"],) ) tenant_row = cur.fetchone() if not tenant_row: raise RuntimeError(f"Tenant id {job['tenant_id']} not found") return str(Path(tenant_row["storage_root"]) / job["source_relative_path"]) def build_absolute_output_path(db, job, src): with db.cursor() as cur: cur.execute( "SELECT storage_root FROM tenants WHERE id = %s", (job["tenant_id"],) ) tenant_row = cur.fetchone() cur.execute( "SELECT relative_path FROM devices WHERE id = %s", (job["device_id"],) ) device_row = cur.fetchone() if not tenant_row: raise RuntimeError(f"Tenant id {job['tenant_id']} not found") if not device_row: raise RuntimeError(f"Device id {job['device_id']} not found") storage_root = Path(tenant_row["storage_root"]) device_relative_path = Path(device_row["relative_path"]) out_dir = storage_root / device_relative_path / "video" out_dir.mkdir(parents=True, exist_ok=True) profile = (job.get("requested_profile") or "default").lower() out_name = Path(src).stem + f"_{profile}_processed.mp4" return str(out_dir / out_name) def to_relative_output_path(db, job, absolute_output): with db.cursor() as cur: cur.execute( "SELECT storage_root FROM tenants WHERE id = %s", (job["tenant_id"],) ) tenant_row = cur.fetchone() if not tenant_row: return absolute_output try: return str(Path(absolute_output).relative_to(Path(tenant_row["storage_root"]))) except Exception: return absolute_output def build_profile_settings(profile): profile = (profile or "default").lower() if profile == "compress": return {"width": 720, "height": 1280, "va_bitrate": "1200k", "va_maxrate": "1400k", "va_bufsize": "2400k", "crf": "30"} elif profile == "hq": return {"width": 1080, "height": 1920, "va_bitrate": "4500k", "va_maxrate": "5000k", "va_bufsize": "9000k", "crf": "18"} else: return {"width": 900, "height": 1600, "va_bitrate": "2500k", "va_maxrate": "3000k", "va_bufsize": "5000k", "crf": "23"} def build_filter_chain(src, settings, rotation_override=None, use_vaapi=True): rotation = rotation_override if rotation_override in (90, 180, 270) else get_rotation_degrees(src) filters = [] if rotation == 90: filters.append("transpose=1") elif rotation == 270: filters.append("transpose=2") elif rotation == 180: filters.append("hflip,vflip") if use_vaapi: filters.append("format=nv12") filters.append("hwupload") filters.append(f"scale_vaapi=w={settings['width']}:h={settings['height']}:force_original_aspect_ratio=decrease") else: filters.append(f"scale={settings['width']}:{settings['height']}:force_original_aspect_ratio=decrease") return ",".join(filters) def ensure_metrics_row(db, tenant_id): with db.cursor() as cur: cur.execute( """ INSERT INTO tenant_usage_metrics (tenant_id) VALUES (%s) ON DUPLICATE KEY UPDATE tenant_id = tenant_id """, (tenant_id,) ) db.commit() def bump_metrics(db, tenant_id, complete=False, failed=False, gpu_seconds=0): ensure_metrics_row(db, tenant_id) with db.cursor() as cur: cur.execute( """ UPDATE tenant_usage_metrics SET video_jobs_total = video_jobs_total + 1, video_jobs_complete = video_jobs_complete + %s, video_jobs_failed = video_jobs_failed + %s, gpu_seconds_total = gpu_seconds_total + %s WHERE tenant_id = %s """, ( 1 if complete else 0, 1 if failed else 0, int(gpu_seconds or 0), tenant_id, ), ) db.commit() def update_progress(db, job_id, percent): percent = max(5, min(99, int(percent))) with db.cursor() as cur: cur.execute( "UPDATE video_jobs SET progress_percent = %s WHERE id = %s", (percent, job_id), ) db.commit() def run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds): process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) log_lines = [] time_re = re.compile(r"time=(\d{2}):(\d{2}):(\d{2}(?:\.\d+)?)") for line in process.stdout: log_lines.append(line.rstrip()) if len(log_lines) > 200: log_lines = log_lines[-200:] if duration_seconds and duration_seconds > 0: m = time_re.search(line) if m: hh = int(m.group(1)) mm = int(m.group(2)) ss = float(m.group(3)) current = hh * 3600 + mm * 60 + ss percent = (current / duration_seconds) * 100.0 update_progress(db, job_id, percent) process.wait() return process.returncode, "\n".join(log_lines[-120:]) def process_job(db, job, processor): job_id = job["id"] src = build_absolute_source_path(db, job) profile = job["requested_profile"] rotation_override = job.get("requested_rotation_degrees") device = INTEL_DEV if processor == "intel" else AMD_DEV output = build_absolute_output_path(db, job, src) settings = build_profile_settings(profile) duration_seconds = get_duration_seconds(src) if processor in ("intel", "amd"): vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=True) cmd = [ "ffmpeg", "-hide_banner", "-y", "-noautorotate", "-fflags", "+genpts", "-vaapi_device", device, "-i", src, "-vf", vf, "-c:v", "h264_vaapi", "-b:v", settings["va_bitrate"], "-maxrate", settings["va_maxrate"], "-bufsize", settings["va_bufsize"], "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "48000", "-movflags", "+faststart", output ] else: vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=False) cmd = [ "ffmpeg", "-hide_banner", "-y", "-noautorotate", "-fflags", "+genpts", "-i", src, "-vf", vf, "-c:v", "libx264", "-preset", "medium", "-crf", settings["crf"], "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "48000", "-movflags", "+faststart", output ] start = datetime.utcnow() try: returncode, log_excerpt = run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds) end = datetime.utcnow() gpu_seconds = max(0, int((end - start).total_seconds())) with db.cursor() as cur: if returncode == 0: rel_output = to_relative_output_path(db, job, output) cur.execute( """ UPDATE video_jobs SET status='complete', assigned_processor=%s, output_relative_path=%s, progress_percent=100, gpu_seconds=%s, started_at=COALESCE(started_at, %s), completed_at=%s, log_excerpt=%s, error_message=NULL WHERE id=%s """, ( processor, rel_output, gpu_seconds, start, end, log_excerpt[:4000], job_id, ), ) db.commit() bump_metrics(db, job["tenant_id"], complete=True, failed=False, gpu_seconds=gpu_seconds) else: with db.cursor() as cur2: cur2.execute( """ UPDATE video_jobs SET status='failed', assigned_processor=%s, gpu_seconds=%s, error_message=%s, log_excerpt=%s, completed_at=%s WHERE id=%s """, ( processor, gpu_seconds, "ffmpeg failed", log_excerpt[:4000], end, job_id, ), ) db.commit() bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=gpu_seconds) except Exception as e: with db.cursor() as cur: cur.execute( """ UPDATE video_jobs SET status='failed', assigned_processor=%s, error_message=%s, completed_at=UTC_TIMESTAMP() WHERE id=%s """, (processor, str(e)[:1000], job_id), ) db.commit() bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=0) def claim_next_job(db, processor): """ Intel: prefer default/compress, then anything. AMD: prefer hq, then anything. """ preferred_profile = "hq" if processor == "amd" else None with db.cursor() as cur: cur.execute("START TRANSACTION") job = None if preferred_profile: cur.execute( """ SELECT * FROM video_jobs WHERE status='queued' AND requested_profile = %s ORDER BY id ASC LIMIT 1 FOR UPDATE """, (preferred_profile,), ) job = cur.fetchone() if not job: if processor == "intel": cur.execute( """ SELECT * FROM video_jobs WHERE status='queued' AND requested_profile IN ('default','compress') ORDER BY id ASC LIMIT 1 FOR UPDATE """ ) job = cur.fetchone() if not job: cur.execute( """ SELECT * FROM video_jobs WHERE status='queued' ORDER BY id ASC LIMIT 1 FOR UPDATE """ ) job = cur.fetchone() if not job: db.rollback() return None cur.execute( """ UPDATE video_jobs SET status='processing', assigned_processor=%s, started_at=COALESCE(started_at, UTC_TIMESTAMP()), progress_percent=5 WHERE id=%s """, (processor, job["id"]), ) db.commit() job["assigned_processor"] = processor return job def worker_loop(app, processor): with app.app_context(): print(f"{processor} worker started", flush=True) while True: try: db = get_db() try: db.rollback() except Exception: pass job = claim_next_job(db, processor) if job: print(f"{processor} worker picked job id={job['id']} source={job['source_relative_path']}", flush=True) process_job(db, job, processor) else: time.sleep(2) except Exception as e: print(f"{processor} worker loop error: {e}", flush=True) time.sleep(2) def run_worker(): app = create_app() threads = [ threading.Thread(target=worker_loop, args=(app, "intel"), daemon=True), threading.Thread(target=worker_loop, args=(app, "amd"), daemon=True), ] for t in threads: t.start() while True: time.sleep(60)