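"""
Dual-GPU video transcoding worker.

Pulls queued rows from the video_jobs table, transcodes sources with
ffmpeg (h264_vaapi on the Intel and AMD render nodes, libx264 as the
software fallback), reports per-job progress, and records per-tenant
usage metrics.
"""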
import time
import subprocess
import json
import threading
import re
from datetime import datetime
from pathlib import Path
from app import create_app
from app.db import get_db
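
# VAAPI render nodes for the two GPUs. Which /dev/dri/renderD12x node maps
# to which card is host-specific, so these assignments may need adjusting
# on other machines.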
INTEL_DEV = "/dev/dri/renderD129"
AMD_DEV = "/dev/dri/renderD128"

def run_ffprobe_json(src):
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=duration:stream_tags=rotate",
        "-of", "json",
        src,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        return {}
    try:
        return json.loads(result.stdout or "{}")
    except Exception:
        return {}

def get_rotation_degrees(src):
    data = run_ffprobe_json(src)
    streams = data.get("streams", [])
    if not streams:
        return 0
    tags = streams[0].get("tags", {}) or {}
    rotate_tag = tags.get("rotate")
    if rotate_tag is not None:
        try:
            return int(rotate_tag) % 360
        except Exception:
            pass
    return 0

def get_duration_seconds(src):
    data = run_ffprobe_json(src)
    streams = data.get("streams", [])
    if not streams:
        return None
    duration = streams[0].get("duration")
    try:
        return float(duration)
    except Exception:
        return None

def build_absolute_source_path(db, job):
    with db.cursor() as cur:
        cur.execute(
            "SELECT storage_root FROM tenants WHERE id = %s",
            (job["tenant_id"],),
        )
        tenant_row = cur.fetchone()
    if not tenant_row:
        raise RuntimeError(f"Tenant id {job['tenant_id']} not found")
    return str(Path(tenant_row["storage_root"]) / job["source_relative_path"])

def build_absolute_output_path(db, job, src):
    with db.cursor() as cur:
        cur.execute(
            "SELECT storage_root FROM tenants WHERE id = %s",
            (job["tenant_id"],),
        )
        tenant_row = cur.fetchone()
        cur.execute(
            "SELECT relative_path FROM devices WHERE id = %s",
            (job["device_id"],),
        )
        device_row = cur.fetchone()
    if not tenant_row:
        raise RuntimeError(f"Tenant id {job['tenant_id']} not found")
    if not device_row:
        raise RuntimeError(f"Device id {job['device_id']} not found")
    storage_root = Path(tenant_row["storage_root"])
    device_relative_path = Path(device_row["relative_path"])
    out_dir = storage_root / device_relative_path / "video"
    out_dir.mkdir(parents=True, exist_ok=True)
    profile = (job.get("requested_profile") or "default").lower()
    out_name = Path(src).stem + f"_{profile}_processed.mp4"
    return str(out_dir / out_name)

def to_relative_output_path(db, job, absolute_output):
    with db.cursor() as cur:
        cur.execute(
            "SELECT storage_root FROM tenants WHERE id = %s",
            (job["tenant_id"],),
        )
        tenant_row = cur.fetchone()
    if not tenant_row:
        return absolute_output
    try:
        return str(Path(absolute_output).relative_to(Path(tenant_row["storage_root"])))
    except Exception:
        return absolute_output
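
# Per-profile encode settings. The va_* values drive the VAAPI (hardware)
# command line; "crf" is only consumed by the libx264 software fallback.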
def build_profile_settings(profile):
    profile = (profile or "default").lower()
    if profile == "compress":
        return {"width": 720, "height": 1280, "va_bitrate": "1200k",
                "va_maxrate": "1400k", "va_bufsize": "2400k", "crf": "30"}
    elif profile == "hq":
        return {"width": 1080, "height": 1920, "va_bitrate": "4500k",
                "va_maxrate": "5000k", "va_bufsize": "9000k", "crf": "18"}
    else:
        return {"width": 900, "height": 1600, "va_bitrate": "2500k",
                "va_maxrate": "3000k", "va_bufsize": "5000k", "crf": "23"}
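
# Rotation is baked into the pixels with transpose/flip filters; process_job
# then encodes with -noautorotate and a rotate=0 stream tag so players do
# not rotate the output a second time.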
def build_filter_chain(src, settings, rotation_override=None, use_vaapi=True):
    rotation = rotation_override if rotation_override in (90, 180, 270) else get_rotation_degrees(src)
    filters = []
    if rotation == 90:
        filters.append("transpose=1")
    elif rotation == 270:
        filters.append("transpose=2")
    elif rotation == 180:
        filters.append("hflip,vflip")
    if use_vaapi:
        filters.append("format=nv12")
        filters.append("hwupload")
        filters.append(f"scale_vaapi=w={settings['width']}:h={settings['height']}:force_original_aspect_ratio=decrease")
    else:
        filters.append(f"scale={settings['width']}:{settings['height']}:force_original_aspect_ratio=decrease")
    return ",".join(filters)

def ensure_metrics_row(db, tenant_id):
    with db.cursor() as cur:
        cur.execute(
            """
            INSERT INTO tenant_usage_metrics (tenant_id)
            VALUES (%s)
            ON DUPLICATE KEY UPDATE tenant_id = tenant_id
            """,
            (tenant_id,),
        )
    db.commit()

def bump_metrics(db, tenant_id, complete=False, failed=False, gpu_seconds=0):
    ensure_metrics_row(db, tenant_id)
    with db.cursor() as cur:
        cur.execute(
            """
            UPDATE tenant_usage_metrics
            SET video_jobs_total = video_jobs_total + 1,
                video_jobs_complete = video_jobs_complete + %s,
                video_jobs_failed = video_jobs_failed + %s,
                gpu_seconds_total = gpu_seconds_total + %s
            WHERE tenant_id = %s
            """,
            (
                1 if complete else 0,
                1 if failed else 0,
                int(gpu_seconds or 0),
                tenant_id,
            ),
        )
    db.commit()

def update_progress(db, job_id, percent):
    # Clamp to 5..99 so a running job never reads 0% or 100% before the
    # row is actually marked complete.
    percent = max(5, min(99, int(percent)))
    with db.cursor() as cur:
        cur.execute(
            "UPDATE video_jobs SET progress_percent = %s WHERE id = %s",
            (percent, job_id),
        )
    db.commit()
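
# ffmpeg prints periodic status lines containing "time=HH:MM:SS.ss"; parse
# them against the probed source duration to derive a progress percentage.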
def run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds):
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )
    log_lines = []
    time_re = re.compile(r"time=(\d{2}):(\d{2}):(\d{2}(?:\.\d+)?)")
    for line in process.stdout:
        log_lines.append(line.rstrip())
        if len(log_lines) > 200:
            log_lines = log_lines[-200:]
        if duration_seconds and duration_seconds > 0:
            m = time_re.search(line)
            if m:
                hh = int(m.group(1))
                mm = int(m.group(2))
                ss = float(m.group(3))
                current = hh * 3600 + mm * 60 + ss
                percent = (current / duration_seconds) * 100.0
                update_progress(db, job_id, percent)
    process.wait()
    return process.returncode, "\n".join(log_lines[-120:])
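
# Transcode one claimed job end to end. "intel" and "amd" use h264_vaapi on
# the matching render node; any other processor value falls back to libx264.
# The job row and tenant metrics are then updated with the outcome.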
def process_job(db, job, processor):
    job_id = job["id"]
    src = build_absolute_source_path(db, job)
    profile = job["requested_profile"]
    rotation_override = job.get("requested_rotation_degrees")
    device = INTEL_DEV if processor == "intel" else AMD_DEV
    output = build_absolute_output_path(db, job, src)
    settings = build_profile_settings(profile)
    duration_seconds = get_duration_seconds(src)
    if processor in ("intel", "amd"):
        vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=True)
        cmd = [
            "ffmpeg", "-hide_banner", "-y",
            "-noautorotate",
            "-fflags", "+genpts",
            "-vaapi_device", device,
            "-i", src,
            "-vf", vf,
            "-c:v", "h264_vaapi",
            "-b:v", settings["va_bitrate"],
            "-maxrate", settings["va_maxrate"],
            "-bufsize", settings["va_bufsize"],
            "-c:a", "aac",
            "-b:a", "160k",
            "-ac", "2",
            "-ar", "48000",
            "-movflags", "+faststart",
            "-metadata:s:v:0", "rotate=0",
            output,
        ]
    else:
        vf = build_filter_chain(src, settings, rotation_override=rotation_override, use_vaapi=False)
        cmd = [
            "ffmpeg", "-hide_banner", "-y",
            "-noautorotate",
            "-fflags", "+genpts",
            "-i", src,
            "-vf", vf,
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", settings["crf"],
            "-c:a", "aac",
            "-b:a", "160k",
            "-ac", "2",
            "-ar", "48000",
            "-movflags", "+faststart",
            "-metadata:s:v:0", "rotate=0",
            output,
        ]
    start = datetime.utcnow()
    try:
        returncode, log_excerpt = run_ffmpeg_with_progress(db, job_id, cmd, duration_seconds)
        end = datetime.utcnow()
        # Wall-clock encode time, recorded as "GPU seconds" for the metrics.
        gpu_seconds = max(0, int((end - start).total_seconds()))
        with db.cursor() as cur:
            if returncode == 0:
                rel_output = to_relative_output_path(db, job, output)
                cur.execute(
                    """
                    UPDATE video_jobs
                    SET status='complete',
                        assigned_processor=%s,
                        output_relative_path=%s,
                        progress_percent=100,
                        gpu_seconds=%s,
                        started_at=COALESCE(started_at, %s),
                        completed_at=%s,
                        log_excerpt=%s,
                        error_message=NULL
                    WHERE id=%s
                    """,
                    (
                        processor,
                        rel_output,
                        gpu_seconds,
                        start,
                        end,
                        log_excerpt[:4000],
                        job_id,
                    ),
                )
                db.commit()
                bump_metrics(db, job["tenant_id"], complete=True, failed=False, gpu_seconds=gpu_seconds)
            else:
                cur.execute(
                    """
                    UPDATE video_jobs
                    SET status='failed',
                        assigned_processor=%s,
                        gpu_seconds=%s,
                        error_message=%s,
                        log_excerpt=%s,
                        completed_at=%s
                    WHERE id=%s
                    """,
                    (
                        processor,
                        gpu_seconds,
                        "ffmpeg failed",
                        log_excerpt[:4000],
                        end,
                        job_id,
                    ),
                )
                db.commit()
                bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=gpu_seconds)
    except Exception as e:
        with db.cursor() as cur:
            cur.execute(
                """
                UPDATE video_jobs
                SET status='failed',
                    assigned_processor=%s,
                    error_message=%s,
                    completed_at=UTC_TIMESTAMP()
                WHERE id=%s
                """,
                (processor, str(e)[:1000], job_id),
            )
        db.commit()
        bump_metrics(db, job["tenant_id"], complete=False, failed=True, gpu_seconds=0)

def _claim_from_query(cur, processor, where_sql="", args=()):
    """
    Scheduling policy:
    - Intel prefers default/compress and can fall back to any queued job.
    - AMD prefers HQ.
    - AMD may help with default/compress only when there is a light-job
      backlog or Intel is already processing a job. This avoids AMD stealing
      single default/compress jobs when Intel is free.
    """
    job = None
    if processor == "amd":
        # 1) AMD always prefers HQ first.
        cur.execute(
            f"""
            SELECT *
            FROM video_jobs
            WHERE status='queued' {where_sql} AND requested_profile = %s
            ORDER BY id ASC
            LIMIT 1
            FOR UPDATE
            """,
            tuple(args) + ("hq",),
        )
        job = cur.fetchone()
        if job:
            return job
        # 2) Only let AMD help with light jobs if there is backlog
        #    or Intel is already busy.
        cur.execute(
            f"""
            SELECT COUNT(*) AS c
            FROM video_jobs
            WHERE status='queued' {where_sql}
              AND requested_profile IN ('default','compress')
            """,
            tuple(args),
        )
        row = cur.fetchone()
        light_backlog = row["c"] if isinstance(row, dict) else row[0]
        cur.execute(
            """
            SELECT COUNT(*) AS c
            FROM video_jobs
            WHERE status='processing'
              AND assigned_processor = 'intel'
            """
        )
        row = cur.fetchone()
        intel_busy = (row["c"] if isinstance(row, dict) else row[0]) > 0
        if light_backlog >= 2 or intel_busy:
            cur.execute(
                f"""
                SELECT *
                FROM video_jobs
                WHERE status='queued' {where_sql}
                  AND requested_profile IN ('default','compress')
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
                """,
                tuple(args),
            )
            job = cur.fetchone()
        return job
    # Intel path: default/compress first.
    cur.execute(
        f"""
        SELECT *
        FROM video_jobs
        WHERE status='queued' {where_sql} AND requested_profile IN ('default','compress')
        ORDER BY id ASC
        LIMIT 1
        FOR UPDATE
        """,
        tuple(args),
    )
    job = cur.fetchone()
    # Intel may fall back to HQ/anything if no light work exists.
    if not job:
        cur.execute(
            f"""
            SELECT *
            FROM video_jobs
            WHERE status='queued' {where_sql}
            ORDER BY id ASC
            LIMIT 1
            FOR UPDATE
            """,
            tuple(args),
        )
        job = cur.fetchone()
    return job
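
# Claim the next job inside one transaction (SELECT ... FOR UPDATE) so the
# two workers never grab the same row. Fairness rules: stick with the current
# batch until it drains; a lone active tenant may use both GPUs; with several
# active tenants, prefer one that has no job already processing.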
def claim_next_job(db, processor, current_batch_id=None, current_tenant_id=None):
    with db.cursor() as cur:
        cur.execute("START TRANSACTION")
        # 1) Keep this worker on its current batch until the batch is finished.
        if current_batch_id:
            job = _claim_from_query(cur, processor, "AND batch_id = %s", (current_batch_id,))
            if job:
                cur.execute(
                    """
                    UPDATE video_jobs
                    SET status='processing',
                        assigned_processor=%s,
                        started_at=COALESCE(started_at, UTC_TIMESTAMP()),
                        progress_percent=5
                    WHERE id=%s
                    """,
                    (processor, job["id"]),
                )
                db.commit()
                job["assigned_processor"] = processor
                return job, current_batch_id, job["tenant_id"]
        # 2) Count active tenants. If only one tenant is active, let them use all GPUs.
        cur.execute("""
            SELECT COUNT(DISTINCT tenant_id) AS c
            FROM video_jobs
            WHERE status IN ('queued','processing')
        """)
        row = cur.fetchone()
        active_tenants = row["c"] if isinstance(row, dict) else row[0]
        if active_tenants <= 1:
            job = _claim_from_query(cur, processor)
            if not job:
                db.rollback()
                return None, None, None
            cur.execute(
                """
                UPDATE video_jobs
                SET status='processing',
                    assigned_processor=%s,
                    started_at=COALESCE(started_at, UTC_TIMESTAMP()),
                    progress_percent=5
                WHERE id=%s
                """,
                (processor, job["id"]),
            )
            db.commit()
            job["assigned_processor"] = processor
            return job, job.get("batch_id"), job["tenant_id"]
        # 3) Multiple active tenants:
        #    only allow a tenant without an already-processing job to get a GPU slot.
        cur.execute("""
            SELECT DISTINCT tenant_id
            FROM video_jobs
            WHERE status='processing'
        """)
        busy_rows = cur.fetchall()
        busy_tenants = {r["tenant_id"] if isinstance(r, dict) else r[0] for r in busy_rows}
        if busy_tenants:
            placeholders = ",".join(["%s"] * len(busy_tenants))
            cur.execute(
                f"""
                SELECT tenant_id
                FROM video_jobs
                WHERE status='queued'
                  AND tenant_id NOT IN ({placeholders})
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
                """,
                tuple(busy_tenants),
            )
        else:
            cur.execute(
                """
                SELECT tenant_id
                FROM video_jobs
                WHERE status='queued'
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
                """
            )
        tenant_row = cur.fetchone()
        if not tenant_row:
            db.rollback()
            return None, None, None
        tenant_id = tenant_row["tenant_id"] if isinstance(tenant_row, dict) else tenant_row[0]
        job = _claim_from_query(cur, processor, "AND tenant_id = %s", (tenant_id,))
        if not job:
            db.rollback()
            return None, None, None
        cur.execute(
            """
            UPDATE video_jobs
            SET status='processing',
                assigned_processor=%s,
                started_at=COALESCE(started_at, UTC_TIMESTAMP()),
                progress_percent=5
            WHERE id=%s
            """,
            (processor, job["id"]),
        )
        db.commit()
        job["assigned_processor"] = processor
        return job, job.get("batch_id"), job["tenant_id"]
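
# One polling loop per GPU: claim a job, process it to completion, and sleep
# briefly when the queue is empty or the loop hits an unexpected error.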
def worker_loop(app, processor):
    current_batch_id = None
    current_tenant_id = None
    with app.app_context():
        print(f"{processor} worker started", flush=True)
        while True:
            try:
                db = get_db()
                # Discard any transaction state left over from a previous
                # iteration before claiming the next job.
                try:
                    db.rollback()
                except Exception:
                    pass
                job, new_batch_id, new_tenant_id = claim_next_job(
                    db,
                    processor,
                    current_batch_id=current_batch_id,
                    current_tenant_id=current_tenant_id,
                )
                if job:
                    current_batch_id = new_batch_id
                    current_tenant_id = new_tenant_id
                    print(f"{processor} worker picked job id={job['id']} batch={job.get('batch_id')} tenant={job['tenant_id']} source={job['source_relative_path']}", flush=True)
                    process_job(db, job, processor)
                else:
                    current_batch_id = None
                    current_tenant_id = None
                    time.sleep(2)
            except Exception as e:
                print(f"{processor} worker loop error: {e}", flush=True)
                time.sleep(2)
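
# Entry point: start one daemon worker thread per GPU against a shared Flask
# app, then keep the main thread alive.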
def run_worker():
    app = create_app()
    threads = [
        threading.Thread(target=worker_loop, args=(app, "intel"), daemon=True),
        threading.Thread(target=worker_loop, args=(app, "amd"), daemon=True),
    ]
    for t in threads:
        t.start()
    while True:
        time.sleep(60)