import time
import subprocess
import json
import threading
import re
from datetime import datetime
from pathlib import Path

from app import create_app
from app.db import get_db

INTEL_DEV = "/dev/dri/renderD129"
AMD_DEV = "/dev/dri/renderD128"


def run_ffprobe_json(src):
    """Probe the first video stream; return parsed JSON or {} on any failure."""
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=duration:stream_tags=rotate",
        "-of", "json",
        src,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        return {}
    try:
        return json.loads(result.stdout or "{}")
    except Exception:
        return {}


def get_rotation_degrees(src):
    """Read the container rotate tag (0/90/180/270); default to 0."""
    data = run_ffprobe_json(src)
    streams = data.get("streams", [])
    if not streams:
        return 0
    tags = streams[0].get("tags", {}) or {}
    rotate_tag = tags.get("rotate")
    if rotate_tag is not None:
        try:
            return int(rotate_tag) % 360
        except Exception:
            pass
    return 0


def get_duration_seconds(src):
    data = run_ffprobe_json(src)
    streams = data.get("streams", [])
    if not streams:
        return None
    duration = streams[0].get("duration")
    try:
        return float(duration)
    except Exception:
        return None


def build_absolute_source_path(db, job):
    with db.cursor() as cur:
        cur.execute(
            "SELECT storage_root FROM tenants WHERE id = %s",
            (job["tenant_id"],),
        )
        tenant_row = cur.fetchone()
    if not tenant_row:
        raise RuntimeError(f"Tenant id {job['tenant_id']} not found")
    return str(Path(tenant_row["storage_root"]) / job["source_relative_path"])


def build_absolute_output_path(db, job, src):
    with db.cursor() as cur:
        cur.execute(
            "SELECT storage_root FROM tenants WHERE id = %s",
            (job["tenant_id"],),
        )
        tenant_row = cur.fetchone()
        cur.execute(
            "SELECT relative_path FROM devices WHERE id = %s",
            (job["device_id"],),
        )
        device_row = cur.fetchone()
    if not tenant_row:
        raise RuntimeError(f"Tenant id {job['tenant_id']} not found")
    if not device_row:
        raise RuntimeError(f"Device id {job['device_id']} not found")
    storage_root = Path(tenant_row["storage_root"])
    device_relative_path = Path(device_row["relative_path"])
    out_dir = storage_root / device_relative_path / "video"
    out_dir.mkdir(parents=True, exist_ok=True)
    profile = (job.get("requested_profile") or "default").lower()
    out_name = Path(src).stem + f"_{profile}_processed.mp4"
    return str(out_dir / out_name)


def to_relative_output_path(db, job, absolute_output):
    with db.cursor() as cur:
        cur.execute(
            "SELECT storage_root FROM tenants WHERE id = %s",
            (job["tenant_id"],),
        )
        tenant_row = cur.fetchone()
    if not tenant_row:
        return absolute_output
    try:
        return str(Path(absolute_output).relative_to(Path(tenant_row["storage_root"])))
    except Exception:
        return absolute_output


def build_profile_settings(profile):
    profile = (profile or "default").lower()
    if profile == "compress":
        return {"width": 720, "height": 1280, "va_bitrate": "1200k",
                "va_maxrate": "1400k", "va_bufsize": "2400k", "crf": "30"}
    if profile == "hq":
        return {"width": 1080, "height": 1920, "va_bitrate": "4500k",
                "va_maxrate": "5000k", "va_bufsize": "9000k", "crf": "18"}
    return {"width": 900, "height": 1600, "va_bitrate": "2500k",
            "va_maxrate": "3000k", "va_bufsize": "5000k", "crf": "23"}


def build_filter_chain(src, settings, rotation_override=None, use_vaapi=True):
    rotation = rotation_override if rotation_override in (90, 180, 270) else get_rotation_degrees(src)
    filters = []
    # Rotation runs in software, before any hardware upload.
    if rotation == 90:
        filters.append("transpose=1")
    elif rotation == 270:
        filters.append("transpose=2")
    elif rotation == 180:
        filters.append("hflip,vflip")
    if use_vaapi:
        filters.append("format=nv12")
        filters.append("hwupload")
        filters.append(
            f"scale_vaapi=w={settings['width']}:h={settings['height']}:force_original_aspect_ratio=decrease"
        )
    else:
        filters.append(
            f"scale={settings['width']}:{settings['height']}:force_original_aspect_ratio=decrease"
        )
    return ",".join(filters)


def ensure_metrics_row(db, tenant_id):
    with db.cursor() as cur:
        cur.execute(
            """
            INSERT INTO tenant_usage_metrics (tenant_id)
            VALUES (%s)
            ON DUPLICATE KEY UPDATE tenant_id = tenant_id
            """,
            (tenant_id,),
        )
    db.commit()


def bump_metrics(db, tenant_id, complete=False, failed=False, gpu_seconds=0):
    ensure_metrics_row(db, tenant_id)
    with db.cursor() as cur:
        cur.execute(
            """
            UPDATE tenant_usage_metrics
            SET video_jobs_total = video_jobs_total + 1,
                video_jobs_complete = video_jobs_complete + %s,
                video_jobs_failed = video_jobs_failed + %s,
                gpu_seconds_total = gpu_seconds_total + %s
            WHERE tenant_id = %s
            """,
            (
                1 if complete else 0,
                1 if failed else 0,
                int(gpu_seconds or 0),
                tenant_id,
            ),
        )
    db.commit()


def update_progress(db, job_id, percent):
    # Clamp to 5..99 so a running job never reports 0% or a premature 100%.
    percent = max(5, min(99, int(percent)))
    with db.cursor() as cur:
        cur.execute(
            "UPDATE video_jobs SET progress_percent = %s WHERE id = %s",
            (percent, job_id),
        )
    db.commit()


def run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds):
    """Run ffmpeg, parsing its `time=` output to update job progress."""
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )
    log_lines = []
    time_re = re.compile(r"time=(\d{2}):(\d{2}):(\d{2}(?:\.\d+)?)")
    for line in process.stdout:
        log_lines.append(line.rstrip())
        if len(log_lines) > 200:
            log_lines = log_lines[-200:]
        if duration_seconds and duration_seconds > 0:
            m = time_re.search(line)
            if m:
                hh = int(m.group(1))
                mm = int(m.group(2))
                ss = float(m.group(3))
                current = hh * 3600 + mm * 60 + ss
                percent = (current / duration_seconds) * 100.0
                update_progress(db, job_id, percent)
    process.wait()
    return process.returncode, "\n".join(log_lines[-120:])


def process_job(db, job, processor):
    job_id = job["id"]
    src = build_absolute_source_path(db, job)
    profile = job["requested_profile"]
    rotation_override = job.get("requested_rotation_degrees")
    device = INTEL_DEV if processor == "intel" else AMD_DEV
    output = build_absolute_output_path(db, job, src)
    settings = build_profile_settings(profile)
    duration_seconds = get_duration_seconds(src)

    if processor in ("intel", "amd"):
        vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=True)
        cmd = [
            "ffmpeg", "-hide_banner", "-y",
            "-noautorotate",
            "-fflags", "+genpts",
            "-vaapi_device", device,
            "-i", src,
            "-vf", vf,
            "-c:v", "h264_vaapi",
            "-b:v", settings["va_bitrate"],
            "-maxrate", settings["va_maxrate"],
            "-bufsize", settings["va_bufsize"],
            "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "48000",
            "-movflags", "+faststart",
            "-metadata:s:v:0", "rotate=0",
            output,
        ]
    else:
        # Software fallback (libx264) for any non-VAAPI processor.
        vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=False)
        cmd = [
            "ffmpeg", "-hide_banner", "-y",
            "-noautorotate",
            "-fflags", "+genpts",
            "-i", src,
            "-vf", vf,
            "-c:v", "libx264", "-preset", "medium", "-crf", settings["crf"],
            "-c:a", "aac", "-b:a", "160k", "-ac", "2", "-ar", "48000",
            "-movflags", "+faststart",
            "-metadata:s:v:0", "rotate=0",
            output,
        ]

    start = datetime.utcnow()
    try:
        returncode, log_excerpt = run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds)
        end = datetime.utcnow()
        gpu_seconds = max(0, int((end - start).total_seconds()))
        if returncode == 0:
            rel_output = to_relative_output_path(db, job, output)
            with db.cursor() as cur:
                cur.execute(
                    """
                    UPDATE video_jobs
                    SET status='complete',
                        assigned_processor=%s,
                        output_relative_path=%s,
                        progress_percent=100,
                        gpu_seconds=%s,
                        started_at=COALESCE(started_at, %s),
                        completed_at=%s,
                        log_excerpt=%s,
                        error_message=NULL
                    WHERE id=%s
                    """,
                    (
                        processor,
                        rel_output,
                        gpu_seconds,
                        start,
                        end,
                        log_excerpt[:4000],
                        job_id,
                    ),
                )
            db.commit()
            bump_metrics(db, job["tenant_id"], complete=True, failed=False, gpu_seconds=gpu_seconds)
        else:
            with db.cursor() as cur:
                cur.execute(
                    """
                    UPDATE video_jobs
                    SET status='failed',
                        assigned_processor=%s,
                        gpu_seconds=%s,
                        error_message=%s,
                        log_excerpt=%s,
                        completed_at=%s
                    WHERE id=%s
                    """,
                    (
                        processor,
                        gpu_seconds,
                        "ffmpeg failed",
                        log_excerpt[:4000],
                        end,
                        job_id,
                    ),
                )
            db.commit()
            bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=gpu_seconds)
    except Exception as e:
        with db.cursor() as cur:
            cur.execute(
                """
                UPDATE video_jobs
                SET status='failed',
                    assigned_processor=%s,
                    error_message=%s,
                    completed_at=UTC_TIMESTAMP()
                WHERE id=%s
                """,
                (processor, str(e)[:1000], job_id),
            )
        db.commit()
        bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=0)


def _claim_from_query(cur, processor, where_sql="", args=()):
    """
    Scheduling policy:
      - Intel prefers default/compress and can fall back to any queued job.
      - AMD prefers HQ.
      - AMD may help with default/compress only when there is light-job backlog
        or Intel is already processing a job. This avoids AMD stealing single
        default/compress jobs when Intel is free.
    """
    job = None
    if processor == "amd":
        # 1) AMD always prefers HQ first.
        cur.execute(
            f"""
            SELECT * FROM video_jobs
            WHERE status='queued' {where_sql}
              AND requested_profile = %s
            ORDER BY id ASC
            LIMIT 1
            FOR UPDATE
            """,
            tuple(args) + ("hq",),
        )
        job = cur.fetchone()
        if job:
            return job
        # 2) Only let AMD help with light jobs if there is backlog
        #    or Intel is already busy.
        cur.execute(
            f"""
            SELECT COUNT(*) AS c FROM video_jobs
            WHERE status='queued' {where_sql}
              AND requested_profile IN ('default','compress')
            """,
            tuple(args),
        )
        row = cur.fetchone()
        light_backlog = row["c"] if isinstance(row, dict) else row[0]
        cur.execute(
            """
            SELECT COUNT(*) AS c FROM video_jobs
            WHERE status='processing' AND assigned_processor = 'intel'
            """
        )
        row = cur.fetchone()
        intel_busy = (row["c"] if isinstance(row, dict) else row[0]) > 0
        if light_backlog >= 2 or intel_busy:
            cur.execute(
                f"""
                SELECT * FROM video_jobs
                WHERE status='queued' {where_sql}
                  AND requested_profile IN ('default','compress')
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
                """,
                tuple(args),
            )
            job = cur.fetchone()
        return job

    # Intel path: default/compress first.
    cur.execute(
        f"""
        SELECT * FROM video_jobs
        WHERE status='queued' {where_sql}
          AND requested_profile IN ('default','compress')
        ORDER BY id ASC
        LIMIT 1
        FOR UPDATE
        """,
        tuple(args),
    )
    job = cur.fetchone()
    # Intel may fall back to HQ/anything if no light work exists.
    if not job:
        cur.execute(
            f"""
            SELECT * FROM video_jobs
            WHERE status='queued' {where_sql}
            ORDER BY id ASC
            LIMIT 1
            FOR UPDATE
            """,
            tuple(args),
        )
        job = cur.fetchone()
    return job


def claim_next_job(db, processor, current_batch_id=None, current_tenant_id=None):
    with db.cursor() as cur:
        cur.execute("START TRANSACTION")
        # 1) Keep this worker on its current batch until the batch is finished.
        if current_batch_id:
            job = _claim_from_query(cur, processor, "AND batch_id = %s", (current_batch_id,))
            if job:
                cur.execute(
                    """
                    UPDATE video_jobs
                    SET status='processing',
                        assigned_processor=%s,
                        started_at=COALESCE(started_at, UTC_TIMESTAMP()),
                        progress_percent=5
                    WHERE id=%s
                    """,
                    (processor, job["id"]),
                )
                db.commit()
                job["assigned_processor"] = processor
                return job, current_batch_id, job["tenant_id"]

        # 2) Count active tenants. If only one tenant is active, let them use all GPUs.
        cur.execute("""
            SELECT COUNT(DISTINCT tenant_id) AS c FROM video_jobs
            WHERE status IN ('queued','processing')
        """)
        row = cur.fetchone()
        active_tenants = row["c"] if isinstance(row, dict) else row[0]

        if active_tenants <= 1:
            job = _claim_from_query(cur, processor)
            if not job:
                db.rollback()
                return None, None, None
            cur.execute(
                """
                UPDATE video_jobs
                SET status='processing',
                    assigned_processor=%s,
                    started_at=COALESCE(started_at, UTC_TIMESTAMP()),
                    progress_percent=5
                WHERE id=%s
                """,
                (processor, job["id"]),
            )
            db.commit()
            job["assigned_processor"] = processor
            return job, job.get("batch_id"), job["tenant_id"]

        # 3) Multiple active tenants:
        #    only allow a tenant without an already-processing job to get a GPU slot.
        cur.execute("""
            SELECT DISTINCT tenant_id FROM video_jobs
            WHERE status='processing'
        """)
        busy_rows = cur.fetchall()
        busy_tenants = {r["tenant_id"] if isinstance(r, dict) else r[0] for r in busy_rows}

        if busy_tenants:
            placeholders = ",".join(["%s"] * len(busy_tenants))
            cur.execute(
                f"""
                SELECT tenant_id FROM video_jobs
                WHERE status='queued'
                  AND tenant_id NOT IN ({placeholders})
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
                """,
                tuple(busy_tenants),
            )
        else:
            cur.execute(
                """
                SELECT tenant_id FROM video_jobs
                WHERE status='queued'
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
                """
            )
        tenant_row = cur.fetchone()
        if not tenant_row:
            db.rollback()
            return None, None, None
        tenant_id = tenant_row["tenant_id"] if isinstance(tenant_row, dict) else tenant_row[0]

        job = _claim_from_query(cur, processor, "AND tenant_id = %s", (tenant_id,))
        if not job:
            db.rollback()
            return None, None, None
        cur.execute(
            """
            UPDATE video_jobs
            SET status='processing',
                assigned_processor=%s,
                started_at=COALESCE(started_at, UTC_TIMESTAMP()),
                progress_percent=5
            WHERE id=%s
            """,
            (processor, job["id"]),
        )
        db.commit()
        job["assigned_processor"] = processor
        return job, job.get("batch_id"), job["tenant_id"]


def worker_loop(app, processor):
    current_batch_id = None
    current_tenant_id = None
    with app.app_context():
        print(f"{processor} worker started", flush=True)
        while True:
            try:
                db = get_db()
                # Clear any transaction left open by a previous iteration.
                try:
                    db.rollback()
                except Exception:
                    pass
                job, new_batch_id, new_tenant_id = claim_next_job(
                    db,
                    processor,
                    current_batch_id=current_batch_id,
                    current_tenant_id=current_tenant_id,
                )
                if job:
                    current_batch_id = new_batch_id
                    current_tenant_id = new_tenant_id
                    print(
                        f"{processor} worker picked job id={job['id']} "
                        f"batch={job.get('batch_id')} tenant={job['tenant_id']} "
                        f"source={job['source_relative_path']}",
                        flush=True,
                    )
                    process_job(db, job, processor)
                else:
                    current_batch_id = None
                    current_tenant_id = None
                    time.sleep(2)
            except Exception as e:
                print(f"{processor} worker loop error: {e}", flush=True)
                time.sleep(2)


def run_worker():
    app = create_app()
    threads = [
        threading.Thread(target=worker_loop, args=(app, "intel"), daemon=True),
        threading.Thread(target=worker_loop, args=(app, "amd"), daemon=True),
    ]
    for t in threads:
        t.start()
    # Keep the main thread alive; the daemon worker threads die with the process.
    while True:
        time.sleep(60)
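
# Entry point (assumption): the source does not show how this module is
# launched, so a conventional guard is sketched here to make the file runnable
# directly. Drop or adjust it if the project starts the workers through a
# separate CLI or process manager instead.
if __name__ == "__main__":
    run_worker()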