You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
59 lines
2.2 KiB
59 lines
2.2 KiB
from flask import Blueprint, current_app, redirect, render_template, request, session, url_for |
|
|
|
from app.db import get_db |
|
from .utils import ensure_user_tenant_and_devices, is_valid_signature, is_valid_timestamp |
|
|
|
bp = Blueprint("auth", __name__, url_prefix="/auth") |
|
|
|
@bp.route("/login-required") |
|
def login_required_notice(): |
|
return render_template("auth/login_required.html") |
|
|
|
@bp.route("/handoff") |
|
def handoff(): |
|
portal_user_id = request.args.get("uid", "").strip() |
|
email = request.args.get("email", "").strip().lower() |
|
ts = request.args.get("ts", "").strip() |
|
sig = request.args.get("sig", "").strip() |
|
|
|
if not portal_user_id or not email or not ts or not sig: |
|
return render_template("auth/handoff_error.html", message="Missing handoff parameters."), 400 |
|
|
|
if not is_valid_timestamp(ts): |
|
return render_template("auth/handoff_error.html", message="Handoff timestamp is invalid or expired."), 403 |
|
|
|
if not is_valid_signature(email=email, ts=ts, portal_user_id=portal_user_id, sig=sig): |
|
return render_template("auth/handoff_error.html", message="Invalid handoff signature."), 403 |
|
|
|
identity = ensure_user_tenant_and_devices(email=email, portal_user_id=int(portal_user_id)) |
|
|
|
session.clear() |
|
session["otb_user_id"] = identity["user_id"] |
|
session["otb_tenant_id"] = identity["tenant_id"] |
|
session["otb_tenant_slug"] = identity["tenant_slug"] |
|
session["otb_email"] = identity["email"] |
|
|
|
db = get_db() |
|
with db.cursor() as cur: |
|
cur.execute( |
|
""" |
|
INSERT INTO audit_logs ( |
|
tenant_id, user_id, actor_type, event_type, ip_address, user_agent, event_detail |
|
) VALUES (%s, %s, 'user', 'handoff_login_success', %s, %s, %s) |
|
""", |
|
( |
|
identity["tenant_id"], |
|
identity["user_id"], |
|
request.headers.get("X-Forwarded-For", request.remote_addr), |
|
request.headers.get("User-Agent", ""), |
|
f"Portal handoff accepted for {email}", |
|
), |
|
) |
|
db.commit() |
|
|
|
return redirect(url_for("main.dashboard")) |
|
|
|
@bp.route("/logout") |
|
def logout(): |
|
session.clear() |
|
return redirect(url_for("auth.login_required_notice"))
|
|
|