import time import subprocess from datetime import datetime from pathlib import Path from app import create_app from app.db import get_db from app.services.gpu_select import select_processor, release INTEL_DEV = "/dev/dri/renderD129" AMD_DEV = "/dev/dri/renderD128" def run_ffmpeg(cmd): return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) def build_absolute_source_path(db, job): with db.cursor() as cur: cur.execute( "SELECT storage_root FROM tenants WHERE id = %s", (job["tenant_id"],) ) tenant_row = cur.fetchone() if not tenant_row: raise RuntimeError(f"Tenant id {job['tenant_id']} not found") storage_root = tenant_row["storage_root"] return str(Path(storage_root) / job["source_relative_path"]) def process_job(db, job): job_id = job["id"] src = build_absolute_source_path(db, job) profile = job["requested_profile"] processor = select_processor() device = INTEL_DEV if processor == "intel" else AMD_DEV output = str(Path(src).with_name(Path(src).stem + "_processed.mp4")) if profile == "portrait_web": vf = "format=nv12,hwupload,scale_vaapi=w=720:h=1280:force_original_aspect_ratio=decrease" else: vf = "format=nv12,hwupload,scale_vaapi=w=1280:h=720:force_original_aspect_ratio=decrease" if processor in ("intel", "amd"): cmd = [ "ffmpeg", "-hide_banner", "-y", "-vaapi_device", device, "-i", src, "-vf", vf, "-c:v", "h264_vaapi", "-b:v", "3M", "-maxrate", "3M", "-bufsize", "6M", "-c:a", "aac", "-b:a", "128k", output ] else: cmd = [ "ffmpeg", "-hide_banner", "-y", "-i", src, "-c:v", "libx264", "-preset", "medium", "-crf", "23", "-c:a", "aac", "-b:a", "128k", output ] start = datetime.utcnow() try: result = run_ffmpeg(cmd) end = datetime.utcnow() with db.cursor() as cur: if result.returncode == 0: rel_output = None try: with db.cursor() as cur2: cur2.execute( "SELECT storage_root FROM tenants WHERE id = %s", (job["tenant_id"],) ) tenant_row = cur2.fetchone() if tenant_row: rel_output = str(Path(output).relative_to(Path(tenant_row["storage_root"]))) except Exception: rel_output = output cur.execute( """ UPDATE video_jobs SET status='complete', assigned_processor=%s, output_relative_path=%s, progress_percent=100, started_at=COALESCE(started_at, %s), completed_at=%s, log_excerpt=%s, error_message=NULL WHERE id=%s """, ( processor, rel_output or output, start, end, (result.stderr or "")[:1000], job_id, ), ) else: cur.execute( """ UPDATE video_jobs SET status='failed', error_message=%s, log_excerpt=%s, completed_at=%s WHERE id=%s """, ( "ffmpeg failed", (result.stderr or "")[:4000], end, job_id, ), ) db.commit() except Exception as e: with db.cursor() as cur: cur.execute( """ UPDATE video_jobs SET status='failed', error_message=%s, completed_at=UTC_TIMESTAMP() WHERE id=%s """, (str(e)[:1000], job_id), ) db.commit() finally: if processor in ("intel", "amd"): release(processor) def run_worker(): app = create_app() with app.app_context(): print("video worker started", flush=True) while True: try: db = get_db() try: db.rollback() except Exception: pass with db.cursor() as cur: cur.execute( """ SELECT * FROM video_jobs WHERE status='queued' ORDER BY id ASC LIMIT 1 """ ) job = cur.fetchone() if job: print( f"worker picked job id={job['id']} source={job['source_relative_path']}", flush=True ) cur.execute( """ UPDATE video_jobs SET status='processing', started_at=COALESCE(started_at, UTC_TIMESTAMP()), progress_percent=5 WHERE id=%s """, (job["id"],), ) db.commit() process_job(db, job) except Exception as e: print(f"worker loop error: {e}", flush=True) time.sleep(5)