You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2342 lines
71 KiB
2342 lines
71 KiB
from flask import Flask, render_template, request, redirect, send_file, make_response, jsonify |
|
from db import get_db_connection |
|
from utils import generate_client_code, generate_service_code |
|
from datetime import datetime, timezone |
|
from zoneinfo import ZoneInfo |
|
from decimal import Decimal, InvalidOperation |
|
|
|
from io import BytesIO, StringIO |
|
import csv |
|
import zipfile |
|
from reportlab.lib.pagesizes import letter |
|
from reportlab.pdfgen import canvas |
|
from reportlab.lib.utils import ImageReader |
|
|
|
app = Flask( |
|
__name__, |
|
template_folder="../templates", |
|
static_folder="../static", |
|
) |
|
|
|
LOCAL_TZ = ZoneInfo("America/Toronto") |
|
|
|
def load_version(): |
|
try: |
|
with open("/home/def/otb_billing/VERSION", "r") as f: |
|
return f.read().strip() |
|
except Exception: |
|
return "unknown" |
|
|
|
APP_VERSION = load_version() |
|
|
|
@app.context_processor |
|
def inject_version(): |
|
return {"app_version": APP_VERSION} |
|
|
|
@app.context_processor |
|
def inject_app_settings(): |
|
return {"app_settings": get_app_settings()} |
|
|
|
def fmt_local(dt_value): |
|
if not dt_value: |
|
return "" |
|
if isinstance(dt_value, str): |
|
try: |
|
dt_value = datetime.fromisoformat(dt_value) |
|
except ValueError: |
|
return str(dt_value) |
|
if dt_value.tzinfo is None: |
|
dt_value = dt_value.replace(tzinfo=timezone.utc) |
|
return dt_value.astimezone(LOCAL_TZ).strftime("%Y-%m-%d %I:%M:%S %p") |
|
|
|
def to_decimal(value): |
|
if value is None or value == "": |
|
return Decimal("0") |
|
try: |
|
return Decimal(str(value)) |
|
except (InvalidOperation, ValueError): |
|
return Decimal("0") |
|
|
|
def fmt_money(value, currency_code="CAD"): |
|
amount = to_decimal(value) |
|
if currency_code == "CAD": |
|
return f"{amount:.2f}" |
|
return f"{amount:.8f}" |
|
|
|
def refresh_overdue_invoices(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor() |
|
cursor.execute(""" |
|
UPDATE invoices |
|
SET status = 'overdue' |
|
WHERE due_at IS NOT NULL |
|
AND due_at < UTC_TIMESTAMP() |
|
AND status IN ('pending', 'partial') |
|
""") |
|
conn.commit() |
|
conn.close() |
|
|
|
def recalc_invoice_totals(invoice_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT id, total_amount, due_at, status |
|
FROM invoices |
|
WHERE id = %s |
|
""", (invoice_id,)) |
|
invoice = cursor.fetchone() |
|
|
|
if not invoice: |
|
conn.close() |
|
return |
|
|
|
cursor.execute(""" |
|
SELECT COALESCE(SUM(payment_amount), 0) AS total_paid |
|
FROM payments |
|
WHERE invoice_id = %s |
|
AND payment_status = 'confirmed' |
|
""", (invoice_id,)) |
|
row = cursor.fetchone() |
|
|
|
total_paid = to_decimal(row["total_paid"]) |
|
total_amount = to_decimal(invoice["total_amount"]) |
|
|
|
if invoice["status"] == "cancelled": |
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE invoices |
|
SET amount_paid = %s, |
|
paid_at = NULL |
|
WHERE id = %s |
|
""", ( |
|
str(total_paid), |
|
invoice_id |
|
)) |
|
conn.commit() |
|
conn.close() |
|
return |
|
|
|
if total_paid >= total_amount and total_amount > 0: |
|
new_status = "paid" |
|
paid_at_value = "UTC_TIMESTAMP()" |
|
elif total_paid > 0: |
|
new_status = "partial" |
|
paid_at_value = "NULL" |
|
else: |
|
if invoice["due_at"] and invoice["due_at"] < datetime.utcnow(): |
|
new_status = "overdue" |
|
else: |
|
new_status = "pending" |
|
paid_at_value = "NULL" |
|
|
|
update_cursor = conn.cursor() |
|
update_cursor.execute(f""" |
|
UPDATE invoices |
|
SET amount_paid = %s, |
|
status = %s, |
|
paid_at = {paid_at_value} |
|
WHERE id = %s |
|
""", ( |
|
str(total_paid), |
|
new_status, |
|
invoice_id |
|
)) |
|
|
|
conn.commit() |
|
conn.close() |
|
|
|
def get_client_credit_balance(client_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute(""" |
|
SELECT COALESCE(SUM(amount), 0) AS balance |
|
FROM credit_ledger |
|
WHERE client_id = %s |
|
""", (client_id,)) |
|
row = cursor.fetchone() |
|
conn.close() |
|
return to_decimal(row["balance"]) |
|
|
|
|
|
def generate_invoice_number(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT invoice_number |
|
FROM invoices |
|
WHERE invoice_number IS NOT NULL |
|
AND invoice_number LIKE 'INV-%' |
|
ORDER BY id DESC |
|
LIMIT 1 |
|
""") |
|
row = cursor.fetchone() |
|
conn.close() |
|
|
|
if not row or not row.get("invoice_number"): |
|
return "INV-0001" |
|
|
|
invoice_number = str(row["invoice_number"]).strip() |
|
|
|
try: |
|
number = int(invoice_number.split("-")[1]) |
|
except (IndexError, ValueError): |
|
return "INV-0001" |
|
|
|
return f"INV-{number + 1:04d}" |
|
|
|
|
|
APP_SETTINGS_DEFAULTS = { |
|
"business_name": "OTB Billing", |
|
"business_tagline": "By a contractor, for contractors", |
|
"business_logo_url": "", |
|
"business_email": "", |
|
"business_phone": "", |
|
"business_address": "", |
|
"business_website": "", |
|
"tax_label": "HST", |
|
"tax_rate": "13.00", |
|
"tax_number": "", |
|
"business_number": "", |
|
"default_currency": "CAD", |
|
"report_frequency": "monthly", |
|
"invoice_footer": "", |
|
"payment_terms": "", |
|
"local_country": "Canada", |
|
"apply_local_tax_only": "1", |
|
"smtp_host": "", |
|
"smtp_port": "587", |
|
"smtp_user": "", |
|
"smtp_pass": "", |
|
"smtp_from_email": "", |
|
"smtp_from_name": "", |
|
"smtp_use_tls": "1", |
|
"smtp_use_ssl": "0", |
|
} |
|
|
|
def ensure_app_settings_table(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor() |
|
cursor.execute(""" |
|
CREATE TABLE IF NOT EXISTS app_settings ( |
|
setting_key VARCHAR(100) NOT NULL PRIMARY KEY, |
|
setting_value TEXT NULL, |
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP |
|
) |
|
""") |
|
conn.commit() |
|
conn.close() |
|
|
|
def get_app_settings(): |
|
ensure_app_settings_table() |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute(""" |
|
SELECT setting_key, setting_value |
|
FROM app_settings |
|
""") |
|
rows = cursor.fetchall() |
|
conn.close() |
|
|
|
settings = dict(APP_SETTINGS_DEFAULTS) |
|
for row in rows: |
|
settings[row["setting_key"]] = row["setting_value"] if row["setting_value"] is not None else "" |
|
|
|
return settings |
|
|
|
def save_app_settings(form_data): |
|
ensure_app_settings_table() |
|
conn = get_db_connection() |
|
cursor = conn.cursor() |
|
|
|
for key in APP_SETTINGS_DEFAULTS.keys(): |
|
if key in {"apply_local_tax_only", "smtp_use_tls", "smtp_use_ssl"}: |
|
value = "1" if form_data.get(key) else "0" |
|
else: |
|
value = (form_data.get(key) or "").strip() |
|
|
|
cursor.execute(""" |
|
INSERT INTO app_settings (setting_key, setting_value) |
|
VALUES (%s, %s) |
|
ON DUPLICATE KEY UPDATE setting_value = VALUES(setting_value) |
|
""", (key, value)) |
|
|
|
conn.commit() |
|
conn.close() |
|
|
|
|
|
@app.template_filter("localtime") |
|
def localtime_filter(value): |
|
return fmt_local(value) |
|
|
|
@app.template_filter("money") |
|
def money_filter(value, currency_code="CAD"): |
|
return fmt_money(value, currency_code) |
|
|
|
|
|
|
|
|
|
def get_report_period_bounds(frequency): |
|
now_local = datetime.now(LOCAL_TZ) |
|
|
|
if frequency == "yearly": |
|
start_local = now_local.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) |
|
label = f"{now_local.year}" |
|
elif frequency == "quarterly": |
|
quarter = ((now_local.month - 1) // 3) + 1 |
|
start_month = (quarter - 1) * 3 + 1 |
|
start_local = now_local.replace(month=start_month, day=1, hour=0, minute=0, second=0, microsecond=0) |
|
label = f"Q{quarter} {now_local.year}" |
|
else: |
|
start_local = now_local.replace(day=1, hour=0, minute=0, second=0, microsecond=0) |
|
label = now_local.strftime("%B %Y") |
|
|
|
start_utc = start_local.astimezone(timezone.utc).replace(tzinfo=None) |
|
end_utc = now_local.astimezone(timezone.utc).replace(tzinfo=None) |
|
|
|
return start_utc, end_utc, label |
|
|
|
def get_revenue_report_data(): |
|
settings = get_app_settings() |
|
frequency = (settings.get("report_frequency") or "monthly").strip().lower() |
|
if frequency not in {"monthly", "quarterly", "yearly"}: |
|
frequency = "monthly" |
|
|
|
start_utc, end_utc, label = get_report_period_bounds(frequency) |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT COALESCE(SUM(cad_value_at_payment), 0) AS collected |
|
FROM payments |
|
WHERE payment_status = 'confirmed' |
|
AND received_at >= %s |
|
AND received_at <= %s |
|
""", (start_utc, end_utc)) |
|
collected_row = cursor.fetchone() |
|
|
|
cursor.execute(""" |
|
SELECT COUNT(*) AS invoice_count, |
|
COALESCE(SUM(total_amount), 0) AS invoiced |
|
FROM invoices |
|
WHERE issued_at >= %s |
|
AND issued_at <= %s |
|
""", (start_utc, end_utc)) |
|
invoiced_row = cursor.fetchone() |
|
|
|
cursor.execute(""" |
|
SELECT COUNT(*) AS overdue_count, |
|
COALESCE(SUM(total_amount - amount_paid), 0) AS overdue_balance |
|
FROM invoices |
|
WHERE status = 'overdue' |
|
""") |
|
overdue_row = cursor.fetchone() |
|
|
|
cursor.execute(""" |
|
SELECT COUNT(*) AS outstanding_count, |
|
COALESCE(SUM(total_amount - amount_paid), 0) AS outstanding_balance |
|
FROM invoices |
|
WHERE status IN ('pending', 'partial', 'overdue') |
|
""") |
|
outstanding_row = cursor.fetchone() |
|
|
|
conn.close() |
|
|
|
return { |
|
"frequency": frequency, |
|
"period_label": label, |
|
"period_start": start_utc.isoformat(sep=" "), |
|
"period_end": end_utc.isoformat(sep=" "), |
|
"collected_cad": str(to_decimal(collected_row["collected"])), |
|
"invoice_count": int(invoiced_row["invoice_count"] or 0), |
|
"invoiced_total": str(to_decimal(invoiced_row["invoiced"])), |
|
"overdue_count": int(overdue_row["overdue_count"] or 0), |
|
"overdue_balance": str(to_decimal(overdue_row["overdue_balance"])), |
|
"outstanding_count": int(outstanding_row["outstanding_count"] or 0), |
|
"outstanding_balance": str(to_decimal(outstanding_row["outstanding_balance"])), |
|
} |
|
|
|
@app.route("/settings", methods=["GET", "POST"]) |
|
def settings(): |
|
ensure_app_settings_table() |
|
|
|
if request.method == "POST": |
|
save_app_settings(request.form) |
|
return redirect("/settings") |
|
|
|
settings = get_app_settings() |
|
return render_template("settings.html", settings=settings) |
|
|
|
|
|
@app.route("/reports/revenue") |
|
def revenue_report(): |
|
report = get_revenue_report_data() |
|
return render_template("reports/revenue.html", report=report) |
|
|
|
@app.route("/reports/revenue.json") |
|
def revenue_report_json(): |
|
report = get_revenue_report_data() |
|
return jsonify(report) |
|
|
|
@app.route("/reports/revenue/print") |
|
def revenue_report_print(): |
|
report = get_revenue_report_data() |
|
return render_template("reports/revenue_print.html", report=report) |
|
|
|
@app.route("/") |
|
def index(): |
|
refresh_overdue_invoices() |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute("SELECT COUNT(*) AS total_clients FROM clients") |
|
total_clients = cursor.fetchone()["total_clients"] |
|
|
|
cursor.execute("SELECT COUNT(*) AS active_services FROM services WHERE status = 'active'") |
|
active_services = cursor.fetchone()["active_services"] |
|
|
|
cursor.execute(""" |
|
SELECT COUNT(*) AS outstanding_invoices |
|
FROM invoices |
|
WHERE status IN ('pending', 'partial', 'overdue') |
|
""") |
|
outstanding_invoices = cursor.fetchone()["outstanding_invoices"] |
|
|
|
cursor.execute(""" |
|
SELECT COALESCE(SUM(cad_value_at_payment), 0) AS revenue_received |
|
FROM payments |
|
WHERE payment_status = 'confirmed' |
|
""") |
|
revenue_received = cursor.fetchone()["revenue_received"] |
|
|
|
conn.close() |
|
|
|
return render_template( |
|
"dashboard.html", |
|
total_clients=total_clients, |
|
active_services=active_services, |
|
outstanding_invoices=outstanding_invoices, |
|
revenue_received=revenue_received, |
|
) |
|
|
|
@app.route("/dbtest") |
|
def dbtest(): |
|
try: |
|
conn = get_db_connection() |
|
cursor = conn.cursor() |
|
cursor.execute("SELECT NOW()") |
|
result = cursor.fetchone() |
|
conn.close() |
|
return f""" |
|
<h1>OTB Billing v{APP_VERSION}</h1> |
|
<h2>Database OK</h2> |
|
<p><a href="/">Home</a></p> |
|
<p>DB server time (UTC): {result[0]}</p> |
|
<p>Displayed local time: {fmt_local(result[0])}</p> |
|
""" |
|
except Exception as e: |
|
return f"<h1>Database FAILED</h1><pre>{e}</pre>" |
|
|
|
|
|
|
|
@app.route("/clients/export.csv") |
|
def export_clients_csv(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute(""" |
|
SELECT |
|
id, |
|
client_code, |
|
company_name, |
|
contact_name, |
|
email, |
|
phone, |
|
status, |
|
created_at, |
|
updated_at |
|
FROM clients |
|
ORDER BY id ASC |
|
""") |
|
rows = cursor.fetchall() |
|
conn.close() |
|
|
|
output = StringIO() |
|
writer = csv.writer(output) |
|
writer.writerow([ |
|
"id", |
|
"client_code", |
|
"company_name", |
|
"contact_name", |
|
"email", |
|
"phone", |
|
"status", |
|
"created_at", |
|
"updated_at", |
|
]) |
|
|
|
for r in rows: |
|
writer.writerow([ |
|
r.get("id", ""), |
|
r.get("client_code", ""), |
|
r.get("company_name", ""), |
|
r.get("contact_name", ""), |
|
r.get("email", ""), |
|
r.get("phone", ""), |
|
r.get("status", ""), |
|
r.get("created_at", ""), |
|
r.get("updated_at", ""), |
|
]) |
|
|
|
response = make_response(output.getvalue()) |
|
response.headers["Content-Type"] = "text/csv; charset=utf-8" |
|
response.headers["Content-Disposition"] = "attachment; filename=clients.csv" |
|
return response |
|
|
|
@app.route("/clients") |
|
def clients(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute("SELECT * FROM clients ORDER BY id DESC") |
|
clients = cursor.fetchall() |
|
conn.close() |
|
|
|
for client in clients: |
|
client["credit_balance"] = get_client_credit_balance(client["id"]) |
|
|
|
return render_template("clients/list.html", clients=clients) |
|
|
|
@app.route("/clients/new", methods=["GET", "POST"]) |
|
def new_client(): |
|
if request.method == "POST": |
|
company_name = request.form["company_name"] |
|
contact_name = request.form["contact_name"] |
|
email = request.form["email"] |
|
phone = request.form["phone"] |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute("SELECT MAX(id) AS last_id FROM clients") |
|
result = cursor.fetchone() |
|
last_number = result["last_id"] if result["last_id"] else 0 |
|
|
|
client_code = generate_client_code(company_name, last_number) |
|
|
|
insert_cursor = conn.cursor() |
|
insert_cursor.execute( |
|
""" |
|
INSERT INTO clients |
|
(client_code, company_name, contact_name, email, phone) |
|
VALUES (%s, %s, %s, %s, %s) |
|
""", |
|
(client_code, company_name, contact_name, email, phone) |
|
) |
|
conn.commit() |
|
conn.close() |
|
|
|
return redirect("/clients") |
|
|
|
return render_template("clients/new.html") |
|
|
|
@app.route("/clients/edit/<int:client_id>", methods=["GET", "POST"]) |
|
def edit_client(client_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
if request.method == "POST": |
|
company_name = request.form.get("company_name", "").strip() |
|
contact_name = request.form.get("contact_name", "").strip() |
|
email = request.form.get("email", "").strip() |
|
phone = request.form.get("phone", "").strip() |
|
status = request.form.get("status", "").strip() |
|
notes = request.form.get("notes", "").strip() |
|
|
|
errors = [] |
|
|
|
if not company_name: |
|
errors.append("Company name is required.") |
|
if not status: |
|
errors.append("Status is required.") |
|
|
|
if errors: |
|
cursor.execute("SELECT * FROM clients WHERE id = %s", (client_id,)) |
|
client = cursor.fetchone() |
|
client["credit_balance"] = get_client_credit_balance(client_id) |
|
conn.close() |
|
return render_template("clients/edit.html", client=client, errors=errors) |
|
|
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE clients |
|
SET company_name = %s, |
|
contact_name = %s, |
|
email = %s, |
|
phone = %s, |
|
status = %s, |
|
notes = %s |
|
WHERE id = %s |
|
""", ( |
|
company_name, |
|
contact_name or None, |
|
email or None, |
|
phone or None, |
|
status, |
|
notes or None, |
|
client_id |
|
)) |
|
conn.commit() |
|
conn.close() |
|
return redirect("/clients") |
|
|
|
cursor.execute("SELECT * FROM clients WHERE id = %s", (client_id,)) |
|
client = cursor.fetchone() |
|
conn.close() |
|
|
|
if not client: |
|
return "Client not found", 404 |
|
|
|
client["credit_balance"] = get_client_credit_balance(client_id) |
|
|
|
return render_template("clients/edit.html", client=client, errors=[]) |
|
|
|
@app.route("/credits/<int:client_id>") |
|
def client_credits(client_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT id, client_code, company_name |
|
FROM clients |
|
WHERE id = %s |
|
""", (client_id,)) |
|
client = cursor.fetchone() |
|
|
|
if not client: |
|
conn.close() |
|
return "Client not found", 404 |
|
|
|
cursor.execute(""" |
|
SELECT * |
|
FROM credit_ledger |
|
WHERE client_id = %s |
|
ORDER BY id DESC |
|
""", (client_id,)) |
|
entries = cursor.fetchall() |
|
|
|
conn.close() |
|
|
|
balance = get_client_credit_balance(client_id) |
|
|
|
return render_template( |
|
"credits/list.html", |
|
client=client, |
|
entries=entries, |
|
balance=balance, |
|
) |
|
|
|
@app.route("/credits/add/<int:client_id>", methods=["GET", "POST"]) |
|
def add_credit(client_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT id, client_code, company_name |
|
FROM clients |
|
WHERE id = %s |
|
""", (client_id,)) |
|
client = cursor.fetchone() |
|
|
|
if not client: |
|
conn.close() |
|
return "Client not found", 404 |
|
|
|
if request.method == "POST": |
|
entry_type = request.form.get("entry_type", "").strip() |
|
amount = request.form.get("amount", "").strip() |
|
currency_code = request.form.get("currency_code", "").strip() |
|
notes = request.form.get("notes", "").strip() |
|
|
|
errors = [] |
|
|
|
if not entry_type: |
|
errors.append("Entry type is required.") |
|
if not amount: |
|
errors.append("Amount is required.") |
|
if not currency_code: |
|
errors.append("Currency code is required.") |
|
|
|
if not errors: |
|
try: |
|
amount_value = Decimal(str(amount)) |
|
if amount_value == 0: |
|
errors.append("Amount cannot be zero.") |
|
except Exception: |
|
errors.append("Amount must be a valid number.") |
|
|
|
if errors: |
|
conn.close() |
|
return render_template("credits/add.html", client=client, errors=errors) |
|
|
|
insert_cursor = conn.cursor() |
|
insert_cursor.execute(""" |
|
INSERT INTO credit_ledger |
|
( |
|
client_id, |
|
entry_type, |
|
amount, |
|
currency_code, |
|
notes |
|
) |
|
VALUES (%s, %s, %s, %s, %s) |
|
""", ( |
|
client_id, |
|
entry_type, |
|
amount, |
|
currency_code, |
|
notes or None |
|
)) |
|
conn.commit() |
|
conn.close() |
|
|
|
return redirect(f"/credits/{client_id}") |
|
|
|
conn.close() |
|
return render_template("credits/add.html", client=client, errors=[]) |
|
|
|
@app.route("/services") |
|
def services(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute(""" |
|
SELECT s.*, c.client_code, c.company_name |
|
FROM services s |
|
JOIN clients c ON s.client_id = c.id |
|
ORDER BY s.id DESC |
|
""") |
|
services = cursor.fetchall() |
|
conn.close() |
|
return render_template("services/list.html", services=services) |
|
|
|
@app.route("/services/new", methods=["GET", "POST"]) |
|
def new_service(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
if request.method == "POST": |
|
client_id = request.form["client_id"] |
|
service_name = request.form["service_name"] |
|
service_type = request.form["service_type"] |
|
billing_cycle = request.form["billing_cycle"] |
|
currency_code = request.form["currency_code"] |
|
recurring_amount = request.form["recurring_amount"] |
|
status = request.form["status"] |
|
start_date = request.form["start_date"] or None |
|
description = request.form["description"] |
|
|
|
cursor.execute("SELECT MAX(id) AS last_id FROM services") |
|
result = cursor.fetchone() |
|
last_number = result["last_id"] if result["last_id"] else 0 |
|
service_code = generate_service_code(service_name, last_number) |
|
|
|
insert_cursor = conn.cursor() |
|
insert_cursor.execute( |
|
""" |
|
INSERT INTO services |
|
( |
|
client_id, |
|
service_code, |
|
service_name, |
|
service_type, |
|
billing_cycle, |
|
status, |
|
currency_code, |
|
recurring_amount, |
|
start_date, |
|
description |
|
) |
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
|
""", |
|
( |
|
client_id, |
|
service_code, |
|
service_name, |
|
service_type, |
|
billing_cycle, |
|
status, |
|
currency_code, |
|
recurring_amount, |
|
start_date, |
|
description |
|
) |
|
) |
|
conn.commit() |
|
conn.close() |
|
|
|
return redirect("/services") |
|
|
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC") |
|
clients = cursor.fetchall() |
|
conn.close() |
|
return render_template("services/new.html", clients=clients) |
|
|
|
@app.route("/services/edit/<int:service_id>", methods=["GET", "POST"]) |
|
def edit_service(service_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
if request.method == "POST": |
|
client_id = request.form.get("client_id", "").strip() |
|
service_name = request.form.get("service_name", "").strip() |
|
service_type = request.form.get("service_type", "").strip() |
|
billing_cycle = request.form.get("billing_cycle", "").strip() |
|
currency_code = request.form.get("currency_code", "").strip() |
|
recurring_amount = request.form.get("recurring_amount", "").strip() |
|
status = request.form.get("status", "").strip() |
|
start_date = request.form.get("start_date", "").strip() |
|
description = request.form.get("description", "").strip() |
|
|
|
errors = [] |
|
|
|
if not client_id: |
|
errors.append("Client is required.") |
|
if not service_name: |
|
errors.append("Service name is required.") |
|
if not service_type: |
|
errors.append("Service type is required.") |
|
if not billing_cycle: |
|
errors.append("Billing cycle is required.") |
|
if not currency_code: |
|
errors.append("Currency code is required.") |
|
if not recurring_amount: |
|
errors.append("Recurring amount is required.") |
|
if not status: |
|
errors.append("Status is required.") |
|
|
|
if not errors: |
|
try: |
|
recurring_amount_value = float(recurring_amount) |
|
if recurring_amount_value < 0: |
|
errors.append("Recurring amount cannot be negative.") |
|
except ValueError: |
|
errors.append("Recurring amount must be a valid number.") |
|
|
|
if errors: |
|
cursor.execute(""" |
|
SELECT s.*, c.company_name |
|
FROM services s |
|
LEFT JOIN clients c ON s.client_id = c.id |
|
WHERE s.id = %s |
|
""", (service_id,)) |
|
service = cursor.fetchone() |
|
|
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC") |
|
clients = cursor.fetchall() |
|
|
|
conn.close() |
|
return render_template("services/edit.html", service=service, clients=clients, errors=errors) |
|
|
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE services |
|
SET client_id = %s, |
|
service_name = %s, |
|
service_type = %s, |
|
billing_cycle = %s, |
|
status = %s, |
|
currency_code = %s, |
|
recurring_amount = %s, |
|
start_date = %s, |
|
description = %s |
|
WHERE id = %s |
|
""", ( |
|
client_id, |
|
service_name, |
|
service_type, |
|
billing_cycle, |
|
status, |
|
currency_code, |
|
recurring_amount, |
|
start_date or None, |
|
description or None, |
|
service_id |
|
)) |
|
conn.commit() |
|
conn.close() |
|
return redirect("/services") |
|
|
|
cursor.execute(""" |
|
SELECT s.*, c.company_name |
|
FROM services s |
|
LEFT JOIN clients c ON s.client_id = c.id |
|
WHERE s.id = %s |
|
""", (service_id,)) |
|
service = cursor.fetchone() |
|
|
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC") |
|
clients = cursor.fetchall() |
|
conn.close() |
|
|
|
if not service: |
|
return "Service not found", 404 |
|
|
|
return render_template("services/edit.html", service=service, clients=clients, errors=[]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/invoices/export.csv") |
|
def export_invoices_csv(): |
|
start_date = (request.args.get("start_date") or "").strip() |
|
end_date = (request.args.get("end_date") or "").strip() |
|
status = (request.args.get("status") or "").strip() |
|
client_id = (request.args.get("client_id") or "").strip() |
|
limit_count = (request.args.get("limit") or "").strip() |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
query = """ |
|
SELECT |
|
i.id, |
|
i.invoice_number, |
|
i.client_id, |
|
c.client_code, |
|
c.company_name, |
|
i.service_id, |
|
i.currency_code, |
|
i.subtotal_amount, |
|
i.tax_amount, |
|
i.total_amount, |
|
i.amount_paid, |
|
i.status, |
|
i.issued_at, |
|
i.due_at, |
|
i.paid_at, |
|
i.notes, |
|
i.created_at, |
|
i.updated_at |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
WHERE 1=1 |
|
""" |
|
params = [] |
|
|
|
if start_date: |
|
query += " AND DATE(i.issued_at) >= %s" |
|
params.append(start_date) |
|
|
|
if end_date: |
|
query += " AND DATE(i.issued_at) <= %s" |
|
params.append(end_date) |
|
|
|
if status: |
|
query += " AND i.status = %s" |
|
params.append(status) |
|
|
|
if client_id: |
|
query += " AND i.client_id = %s" |
|
params.append(client_id) |
|
|
|
query += " ORDER BY i.id ASC" |
|
|
|
if limit_count: |
|
try: |
|
limit_int = int(limit_count) |
|
if limit_int > 0: |
|
query += " LIMIT %s" |
|
params.append(limit_int) |
|
except ValueError: |
|
pass |
|
|
|
cursor.execute(query, tuple(params)) |
|
rows = cursor.fetchall() |
|
conn.close() |
|
|
|
output = StringIO() |
|
writer = csv.writer(output) |
|
writer.writerow([ |
|
"id", |
|
"invoice_number", |
|
"client_id", |
|
"client_code", |
|
"company_name", |
|
"service_id", |
|
"currency_code", |
|
"subtotal_amount", |
|
"tax_amount", |
|
"total_amount", |
|
"amount_paid", |
|
"status", |
|
"issued_at", |
|
"due_at", |
|
"paid_at", |
|
"notes", |
|
"created_at", |
|
"updated_at", |
|
]) |
|
|
|
for r in rows: |
|
writer.writerow([ |
|
r.get("id", ""), |
|
r.get("invoice_number", ""), |
|
r.get("client_id", ""), |
|
r.get("client_code", ""), |
|
r.get("company_name", ""), |
|
r.get("service_id", ""), |
|
r.get("currency_code", ""), |
|
r.get("subtotal_amount", ""), |
|
r.get("tax_amount", ""), |
|
r.get("total_amount", ""), |
|
r.get("amount_paid", ""), |
|
r.get("status", ""), |
|
r.get("issued_at", ""), |
|
r.get("due_at", ""), |
|
r.get("paid_at", ""), |
|
r.get("notes", ""), |
|
r.get("created_at", ""), |
|
r.get("updated_at", ""), |
|
]) |
|
|
|
filename = "invoices" |
|
if start_date or end_date or status or client_id or limit_count: |
|
filename += "_filtered" |
|
filename += ".csv" |
|
|
|
response = make_response(output.getvalue()) |
|
response.headers["Content-Type"] = "text/csv; charset=utf-8" |
|
response.headers["Content-Disposition"] = f"attachment; filename={filename}" |
|
return response |
|
|
|
|
|
@app.route("/invoices/export-pdf.zip") |
|
def export_invoices_pdf_zip(): |
|
start_date = (request.args.get("start_date") or "").strip() |
|
end_date = (request.args.get("end_date") or "").strip() |
|
status = (request.args.get("status") or "").strip() |
|
client_id = (request.args.get("client_id") or "").strip() |
|
limit_count = (request.args.get("limit") or "").strip() |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
query = """ |
|
SELECT |
|
i.*, |
|
c.client_code, |
|
c.company_name, |
|
c.contact_name, |
|
c.email, |
|
c.phone, |
|
s.service_code, |
|
s.service_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
LEFT JOIN services s ON i.service_id = s.id |
|
WHERE 1=1 |
|
""" |
|
params = [] |
|
|
|
if start_date: |
|
query += " AND DATE(i.issued_at) >= %s" |
|
params.append(start_date) |
|
|
|
if end_date: |
|
query += " AND DATE(i.issued_at) <= %s" |
|
params.append(end_date) |
|
|
|
if status: |
|
query += " AND i.status = %s" |
|
params.append(status) |
|
|
|
if client_id: |
|
query += " AND i.client_id = %s" |
|
params.append(client_id) |
|
|
|
query += " ORDER BY i.id ASC" |
|
|
|
if limit_count: |
|
try: |
|
limit_int = int(limit_count) |
|
if limit_int > 0: |
|
query += " LIMIT %s" |
|
params.append(limit_int) |
|
except ValueError: |
|
pass |
|
|
|
cursor.execute(query, tuple(params)) |
|
invoices = cursor.fetchall() |
|
conn.close() |
|
|
|
settings = get_app_settings() |
|
|
|
def build_invoice_pdf_bytes(invoice, settings): |
|
buffer = BytesIO() |
|
pdf = canvas.Canvas(buffer, pagesize=letter) |
|
width, height = letter |
|
|
|
left = 50 |
|
right = 560 |
|
y = height - 50 |
|
|
|
def money(value, currency="CAD"): |
|
return f"{to_decimal(value):.2f} {currency}" |
|
|
|
pdf.setTitle(f"Invoice {invoice['invoice_number']}") |
|
|
|
logo_url = (settings.get("business_logo_url") or "").strip() |
|
if logo_url.startswith("/static/"): |
|
local_logo_path = "/home/def/otb_billing" + logo_url |
|
try: |
|
pdf.drawImage(ImageReader(local_logo_path), left, y - 35, width=42, height=42, preserveAspectRatio=True, mask='auto') |
|
except Exception: |
|
pass |
|
|
|
pdf.setFont("Helvetica-Bold", 22) |
|
pdf.drawString(left + 60, y, f"Invoice {invoice['invoice_number']}") |
|
|
|
pdf.setFont("Helvetica-Bold", 14) |
|
pdf.drawRightString(right, y, settings.get("business_name") or "OTB Billing") |
|
y -= 18 |
|
pdf.setFont("Helvetica", 12) |
|
pdf.drawRightString(right, y, settings.get("business_tagline") or "") |
|
y -= 15 |
|
|
|
right_lines = [ |
|
settings.get("business_address", ""), |
|
settings.get("business_email", ""), |
|
settings.get("business_phone", ""), |
|
settings.get("business_website", ""), |
|
] |
|
for item in right_lines: |
|
if item: |
|
pdf.drawRightString(right, y, item[:80]) |
|
y -= 14 |
|
|
|
y -= 10 |
|
|
|
pdf.setFont("Helvetica-Bold", 12) |
|
pdf.drawString(left, y, "Status:") |
|
pdf.setFont("Helvetica", 12) |
|
pdf.drawString(left + 45, y, str(invoice["status"]).upper()) |
|
y -= 28 |
|
|
|
pdf.setFont("Helvetica-Bold", 13) |
|
pdf.drawString(left, y, "Bill To") |
|
y -= 20 |
|
pdf.setFont("Helvetica-Bold", 12) |
|
pdf.drawString(left, y, invoice["company_name"] or "") |
|
y -= 16 |
|
pdf.setFont("Helvetica", 11) |
|
if invoice.get("contact_name"): |
|
pdf.drawString(left, y, str(invoice["contact_name"])) |
|
y -= 15 |
|
if invoice.get("email"): |
|
pdf.drawString(left, y, str(invoice["email"])) |
|
y -= 15 |
|
if invoice.get("phone"): |
|
pdf.drawString(left, y, str(invoice["phone"])) |
|
y -= 15 |
|
pdf.drawString(left, y, f"Client Code: {invoice.get('client_code', '')}") |
|
y -= 28 |
|
|
|
pdf.setFont("Helvetica-Bold", 13) |
|
pdf.drawString(left, y, "Invoice Details") |
|
y -= 20 |
|
pdf.setFont("Helvetica", 11) |
|
pdf.drawString(left, y, f"Invoice #: {invoice['invoice_number']}") |
|
y -= 15 |
|
pdf.drawString(left, y, f"Issued: {fmt_local(invoice.get('issued_at'))}") |
|
y -= 15 |
|
pdf.drawString(left, y, f"Due: {fmt_local(invoice.get('due_at'))}") |
|
y -= 15 |
|
if invoice.get("paid_at"): |
|
pdf.drawString(left, y, f"Paid: {fmt_local(invoice.get('paid_at'))}") |
|
y -= 15 |
|
pdf.drawString(left, y, f"Currency: {invoice.get('currency_code', 'CAD')}") |
|
y -= 28 |
|
|
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(left, y, "Service Code") |
|
pdf.drawString(180, y, "Service") |
|
pdf.drawString(330, y, "Description") |
|
pdf.drawRightString(right, y, "Total") |
|
y -= 14 |
|
pdf.line(left, y, right, y) |
|
y -= 18 |
|
|
|
pdf.setFont("Helvetica", 11) |
|
pdf.drawString(left, y, str(invoice.get("service_code") or "-")) |
|
pdf.drawString(180, y, str(invoice.get("service_name") or "-")) |
|
pdf.drawString(330, y, str(invoice.get("notes") or "-")[:28]) |
|
pdf.drawRightString(right, y, money(invoice.get("total_amount"), invoice.get("currency_code", "CAD"))) |
|
y -= 28 |
|
|
|
totals_x_label = 360 |
|
totals_x_value = right |
|
|
|
totals = [ |
|
("Subtotal", money(invoice.get("subtotal_amount"), invoice.get("currency_code", "CAD"))), |
|
((settings.get("tax_label") or "Tax"), money(invoice.get("tax_amount"), invoice.get("currency_code", "CAD"))), |
|
("Total", money(invoice.get("total_amount"), invoice.get("currency_code", "CAD"))), |
|
("Paid", money(invoice.get("amount_paid"), invoice.get("currency_code", "CAD"))), |
|
] |
|
|
|
remaining = to_decimal(invoice.get("total_amount")) - to_decimal(invoice.get("amount_paid")) |
|
|
|
for label, value in totals: |
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(totals_x_label, y, label) |
|
pdf.setFont("Helvetica", 11) |
|
pdf.drawRightString(totals_x_value, y, value) |
|
y -= 18 |
|
|
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(totals_x_label, y, "Remaining") |
|
pdf.drawRightString(totals_x_value, y, f"{remaining:.2f} {invoice.get('currency_code', 'CAD')}") |
|
y -= 25 |
|
|
|
if settings.get("tax_number"): |
|
pdf.setFont("Helvetica", 10) |
|
pdf.drawString(left, y, f"{settings.get('tax_label') or 'Tax'} Number: {settings.get('tax_number')}") |
|
y -= 14 |
|
|
|
if settings.get("business_number"): |
|
pdf.setFont("Helvetica", 10) |
|
pdf.drawString(left, y, f"Business Number: {settings.get('business_number')}") |
|
y -= 14 |
|
|
|
if settings.get("payment_terms"): |
|
y -= 8 |
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(left, y, "Payment Terms") |
|
y -= 15 |
|
pdf.setFont("Helvetica", 10) |
|
terms = settings.get("payment_terms", "") |
|
for chunk_start in range(0, len(terms), 90): |
|
line_text = terms[chunk_start:chunk_start+90] |
|
pdf.drawString(left, y, line_text) |
|
y -= 13 |
|
|
|
if settings.get("invoice_footer"): |
|
y -= 8 |
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(left, y, "Footer") |
|
y -= 15 |
|
pdf.setFont("Helvetica", 10) |
|
footer = settings.get("invoice_footer", "") |
|
for chunk_start in range(0, len(footer), 90): |
|
line_text = footer[chunk_start:chunk_start+90] |
|
pdf.drawString(left, y, line_text) |
|
y -= 13 |
|
|
|
pdf.showPage() |
|
pdf.save() |
|
buffer.seek(0) |
|
return buffer.getvalue() |
|
|
|
zip_buffer = BytesIO() |
|
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf: |
|
for invoice in invoices: |
|
pdf_bytes = build_invoice_pdf_bytes(invoice, settings) |
|
zipf.writestr(f"{invoice['invoice_number']}.pdf", pdf_bytes) |
|
|
|
zip_buffer.seek(0) |
|
|
|
filename = "invoices_export" |
|
if start_date: |
|
filename += f"_{start_date}" |
|
if end_date: |
|
filename += f"_to_{end_date}" |
|
if status: |
|
filename += f"_{status}" |
|
if client_id: |
|
filename += f"_client_{client_id}" |
|
if limit_count: |
|
filename += f"_limit_{limit_count}" |
|
filename += ".zip" |
|
|
|
return send_file( |
|
zip_buffer, |
|
mimetype="application/zip", |
|
as_attachment=True, |
|
download_name=filename |
|
) |
|
|
|
|
|
@app.route("/invoices/print") |
|
def print_invoices(): |
|
refresh_overdue_invoices() |
|
|
|
start_date = (request.args.get("start_date") or "").strip() |
|
end_date = (request.args.get("end_date") or "").strip() |
|
status = (request.args.get("status") or "").strip() |
|
client_id = (request.args.get("client_id") or "").strip() |
|
limit_count = (request.args.get("limit") or "").strip() |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
query = """ |
|
SELECT |
|
i.*, |
|
c.client_code, |
|
c.company_name, |
|
c.contact_name, |
|
c.email, |
|
c.phone, |
|
s.service_code, |
|
s.service_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
LEFT JOIN services s ON i.service_id = s.id |
|
WHERE 1=1 |
|
""" |
|
params = [] |
|
|
|
if start_date: |
|
query += " AND DATE(i.issued_at) >= %s" |
|
params.append(start_date) |
|
|
|
if end_date: |
|
query += " AND DATE(i.issued_at) <= %s" |
|
params.append(end_date) |
|
|
|
if status: |
|
query += " AND i.status = %s" |
|
params.append(status) |
|
|
|
if client_id: |
|
query += " AND i.client_id = %s" |
|
params.append(client_id) |
|
|
|
query += " ORDER BY i.id ASC" |
|
|
|
if limit_count: |
|
try: |
|
limit_int = int(limit_count) |
|
if limit_int > 0: |
|
query += " LIMIT %s" |
|
params.append(limit_int) |
|
except ValueError: |
|
pass |
|
|
|
cursor.execute(query, tuple(params)) |
|
invoices = cursor.fetchall() |
|
conn.close() |
|
|
|
settings = get_app_settings() |
|
|
|
filters = { |
|
"start_date": start_date, |
|
"end_date": end_date, |
|
"status": status, |
|
"client_id": client_id, |
|
"limit": limit_count, |
|
} |
|
|
|
return render_template("invoices/print_batch.html", invoices=invoices, settings=settings, filters=filters) |
|
|
|
@app.route("/invoices") |
|
def invoices(): |
|
refresh_overdue_invoices() |
|
|
|
start_date = (request.args.get("start_date") or "").strip() |
|
end_date = (request.args.get("end_date") or "").strip() |
|
status = (request.args.get("status") or "").strip() |
|
client_id = (request.args.get("client_id") or "").strip() |
|
limit_count = (request.args.get("limit") or "").strip() |
|
|
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
query = """ |
|
SELECT |
|
i.*, |
|
c.client_code, |
|
c.company_name, |
|
COALESCE((SELECT COUNT(*) FROM payments p WHERE p.invoice_id = i.id AND p.payment_status = 'confirmed'), 0) AS payment_count |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
WHERE 1=1 |
|
""" |
|
params = [] |
|
|
|
if start_date: |
|
query += " AND DATE(i.issued_at) >= %s" |
|
params.append(start_date) |
|
|
|
if end_date: |
|
query += " AND DATE(i.issued_at) <= %s" |
|
params.append(end_date) |
|
|
|
if status: |
|
query += " AND i.status = %s" |
|
params.append(status) |
|
|
|
if client_id: |
|
query += " AND i.client_id = %s" |
|
params.append(client_id) |
|
|
|
query += " ORDER BY i.id DESC" |
|
|
|
if limit_count: |
|
try: |
|
limit_int = int(limit_count) |
|
if limit_int > 0: |
|
query += " LIMIT %s" |
|
params.append(limit_int) |
|
except ValueError: |
|
pass |
|
|
|
cursor.execute(query, tuple(params)) |
|
invoices = cursor.fetchall() |
|
|
|
cursor.execute(""" |
|
SELECT id, client_code, company_name |
|
FROM clients |
|
ORDER BY company_name ASC |
|
""") |
|
clients = cursor.fetchall() |
|
|
|
conn.close() |
|
|
|
filters = { |
|
"start_date": start_date, |
|
"end_date": end_date, |
|
"status": status, |
|
"client_id": client_id, |
|
"limit": limit_count, |
|
} |
|
|
|
return render_template("invoices/list.html", invoices=invoices, filters=filters, clients=clients) |
|
|
|
@app.route("/invoices/new", methods=["GET", "POST"]) |
|
def new_invoice(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
if request.method == "POST": |
|
client_id = request.form.get("client_id", "").strip() |
|
service_id = request.form.get("service_id", "").strip() |
|
currency_code = request.form.get("currency_code", "").strip() |
|
total_amount = request.form.get("total_amount", "").strip() |
|
due_at = request.form.get("due_at", "").strip() |
|
notes = request.form.get("notes", "").strip() |
|
|
|
errors = [] |
|
|
|
if not client_id: |
|
errors.append("Client is required.") |
|
if not service_id: |
|
errors.append("Service is required.") |
|
if not currency_code: |
|
errors.append("Currency is required.") |
|
if not total_amount: |
|
errors.append("Total amount is required.") |
|
if not due_at: |
|
errors.append("Due date is required.") |
|
|
|
if not errors: |
|
try: |
|
amount_value = float(total_amount) |
|
if amount_value <= 0: |
|
errors.append("Total amount must be greater than zero.") |
|
except ValueError: |
|
errors.append("Total amount must be a valid number.") |
|
|
|
if errors: |
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name") |
|
clients = cursor.fetchall() |
|
|
|
cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name") |
|
services = cursor.fetchall() |
|
|
|
conn.close() |
|
|
|
form_data = { |
|
"client_id": client_id, |
|
"service_id": service_id, |
|
"currency_code": currency_code, |
|
"total_amount": total_amount, |
|
"due_at": due_at, |
|
"notes": notes, |
|
} |
|
|
|
return render_template( |
|
"invoices/new.html", |
|
clients=clients, |
|
services=services, |
|
errors=errors, |
|
form_data=form_data, |
|
) |
|
|
|
invoice_number = generate_invoice_number() |
|
|
|
insert_cursor = conn.cursor() |
|
insert_cursor.execute(""" |
|
INSERT INTO invoices |
|
( |
|
client_id, |
|
service_id, |
|
invoice_number, |
|
currency_code, |
|
total_amount, |
|
subtotal_amount, |
|
issued_at, |
|
due_at, |
|
status, |
|
notes |
|
) |
|
VALUES (%s, %s, %s, %s, %s, %s, UTC_TIMESTAMP(), %s, 'pending', %s) |
|
""", ( |
|
client_id, |
|
service_id, |
|
invoice_number, |
|
currency_code, |
|
total_amount, |
|
total_amount, |
|
due_at, |
|
notes |
|
)) |
|
|
|
conn.commit() |
|
conn.close() |
|
|
|
return redirect("/invoices") |
|
|
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name") |
|
clients = cursor.fetchall() |
|
|
|
cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name") |
|
services = cursor.fetchall() |
|
|
|
conn.close() |
|
|
|
return render_template( |
|
"invoices/new.html", |
|
clients=clients, |
|
services=services, |
|
errors=[], |
|
form_data={}, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
@app.route("/invoices/pdf/<int:invoice_id>") |
|
def invoice_pdf(invoice_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT |
|
i.*, |
|
c.client_code, |
|
c.company_name, |
|
c.contact_name, |
|
c.email, |
|
c.phone, |
|
s.service_code, |
|
s.service_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
LEFT JOIN services s ON i.service_id = s.id |
|
WHERE i.id = %s |
|
""", (invoice_id,)) |
|
invoice = cursor.fetchone() |
|
|
|
if not invoice: |
|
conn.close() |
|
return "Invoice not found", 404 |
|
|
|
conn.close() |
|
|
|
settings = get_app_settings() |
|
|
|
buffer = BytesIO() |
|
pdf = canvas.Canvas(buffer, pagesize=letter) |
|
width, height = letter |
|
|
|
left = 50 |
|
right = 560 |
|
y = height - 50 |
|
|
|
def draw_line(txt, x=left, font="Helvetica", size=11): |
|
nonlocal y |
|
pdf.setFont(font, size) |
|
pdf.drawString(x, y, str(txt) if txt is not None else "") |
|
y -= 16 |
|
|
|
def money(value, currency="CAD"): |
|
return f"{to_decimal(value):.2f} {currency}" |
|
|
|
pdf.setTitle(f"Invoice {invoice['invoice_number']}") |
|
|
|
logo_url = (settings.get("business_logo_url") or "").strip() |
|
if logo_url.startswith("/static/"): |
|
local_logo_path = "/home/def/otb_billing" + logo_url |
|
try: |
|
pdf.drawImage(ImageReader(local_logo_path), left, y - 35, width=42, height=42, preserveAspectRatio=True, mask='auto') |
|
except Exception: |
|
pass |
|
|
|
pdf.setFont("Helvetica-Bold", 22) |
|
pdf.drawString(left + 60, y, f"Invoice {invoice['invoice_number']}") |
|
|
|
pdf.setFont("Helvetica-Bold", 14) |
|
pdf.drawRightString(right, y, settings.get("business_name") or "OTB Billing") |
|
y -= 18 |
|
pdf.setFont("Helvetica", 12) |
|
pdf.drawRightString(right, y, settings.get("business_tagline") or "") |
|
y -= 15 |
|
|
|
right_lines = [ |
|
settings.get("business_address", ""), |
|
settings.get("business_email", ""), |
|
settings.get("business_phone", ""), |
|
settings.get("business_website", ""), |
|
] |
|
for item in right_lines: |
|
if item: |
|
pdf.drawRightString(right, y, item[:80]) |
|
y -= 14 |
|
|
|
y -= 10 |
|
|
|
pdf.setFont("Helvetica-Bold", 12) |
|
pdf.drawString(left, y, "Status:") |
|
pdf.setFont("Helvetica", 12) |
|
pdf.drawString(left + 45, y, str(invoice["status"]).upper()) |
|
y -= 28 |
|
|
|
pdf.setFont("Helvetica-Bold", 13) |
|
pdf.drawString(left, y, "Bill To") |
|
y -= 20 |
|
pdf.setFont("Helvetica-Bold", 12) |
|
pdf.drawString(left, y, invoice["company_name"] or "") |
|
y -= 16 |
|
pdf.setFont("Helvetica", 11) |
|
if invoice.get("contact_name"): |
|
pdf.drawString(left, y, str(invoice["contact_name"])) |
|
y -= 15 |
|
if invoice.get("email"): |
|
pdf.drawString(left, y, str(invoice["email"])) |
|
y -= 15 |
|
if invoice.get("phone"): |
|
pdf.drawString(left, y, str(invoice["phone"])) |
|
y -= 15 |
|
pdf.drawString(left, y, f"Client Code: {invoice.get('client_code', '')}") |
|
y -= 28 |
|
|
|
pdf.setFont("Helvetica-Bold", 13) |
|
pdf.drawString(left, y, "Invoice Details") |
|
y -= 20 |
|
pdf.setFont("Helvetica", 11) |
|
pdf.drawString(left, y, f"Invoice #: {invoice['invoice_number']}") |
|
y -= 15 |
|
pdf.drawString(left, y, f"Issued: {fmt_local(invoice.get('issued_at'))}") |
|
y -= 15 |
|
pdf.drawString(left, y, f"Due: {fmt_local(invoice.get('due_at'))}") |
|
y -= 15 |
|
if invoice.get("paid_at"): |
|
pdf.drawString(left, y, f"Paid: {fmt_local(invoice.get('paid_at'))}") |
|
y -= 15 |
|
pdf.drawString(left, y, f"Currency: {invoice.get('currency_code', 'CAD')}") |
|
y -= 28 |
|
|
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(left, y, "Service Code") |
|
pdf.drawString(180, y, "Service") |
|
pdf.drawString(330, y, "Description") |
|
pdf.drawRightString(right, y, "Total") |
|
y -= 14 |
|
pdf.line(left, y, right, y) |
|
y -= 18 |
|
|
|
pdf.setFont("Helvetica", 11) |
|
pdf.drawString(left, y, str(invoice.get("service_code") or "-")) |
|
pdf.drawString(180, y, str(invoice.get("service_name") or "-")) |
|
pdf.drawString(330, y, str(invoice.get("notes") or "-")[:28]) |
|
pdf.drawRightString(right, y, money(invoice.get("total_amount"), invoice.get("currency_code", "CAD"))) |
|
y -= 28 |
|
|
|
totals_x_label = 360 |
|
totals_x_value = right |
|
|
|
totals = [ |
|
("Subtotal", money(invoice.get("subtotal_amount"), invoice.get("currency_code", "CAD"))), |
|
((settings.get("tax_label") or "Tax"), money(invoice.get("tax_amount"), invoice.get("currency_code", "CAD"))), |
|
("Total", money(invoice.get("total_amount"), invoice.get("currency_code", "CAD"))), |
|
("Paid", money(invoice.get("amount_paid"), invoice.get("currency_code", "CAD"))), |
|
] |
|
|
|
remaining = to_decimal(invoice.get("total_amount")) - to_decimal(invoice.get("amount_paid")) |
|
|
|
for label, value in totals: |
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(totals_x_label, y, label) |
|
pdf.setFont("Helvetica", 11) |
|
pdf.drawRightString(totals_x_value, y, value) |
|
y -= 18 |
|
|
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(totals_x_label, y, "Remaining") |
|
pdf.drawRightString(totals_x_value, y, f"{remaining:.2f} {invoice.get('currency_code', 'CAD')}") |
|
y -= 25 |
|
|
|
if settings.get("tax_number"): |
|
pdf.setFont("Helvetica", 10) |
|
pdf.drawString(left, y, f"{settings.get('tax_label') or 'Tax'} Number: {settings.get('tax_number')}") |
|
y -= 14 |
|
|
|
if settings.get("business_number"): |
|
pdf.setFont("Helvetica", 10) |
|
pdf.drawString(left, y, f"Business Number: {settings.get('business_number')}") |
|
y -= 14 |
|
|
|
if settings.get("payment_terms"): |
|
y -= 8 |
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(left, y, "Payment Terms") |
|
y -= 15 |
|
pdf.setFont("Helvetica", 10) |
|
for chunk_start in range(0, len(settings.get("payment_terms", "")), 90): |
|
line_text = settings.get("payment_terms", "")[chunk_start:chunk_start+90] |
|
pdf.drawString(left, y, line_text) |
|
y -= 13 |
|
|
|
if settings.get("invoice_footer"): |
|
y -= 8 |
|
pdf.setFont("Helvetica-Bold", 11) |
|
pdf.drawString(left, y, "Footer") |
|
y -= 15 |
|
pdf.setFont("Helvetica", 10) |
|
for chunk_start in range(0, len(settings.get("invoice_footer", "")), 90): |
|
line_text = settings.get("invoice_footer", "")[chunk_start:chunk_start+90] |
|
pdf.drawString(left, y, line_text) |
|
y -= 13 |
|
|
|
pdf.showPage() |
|
pdf.save() |
|
buffer.seek(0) |
|
|
|
return send_file( |
|
buffer, |
|
mimetype="application/pdf", |
|
as_attachment=True, |
|
download_name=f"{invoice['invoice_number']}.pdf" |
|
) |
|
|
|
|
|
@app.route("/invoices/view/<int:invoice_id>") |
|
def view_invoice(invoice_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT |
|
i.*, |
|
c.client_code, |
|
c.company_name, |
|
c.contact_name, |
|
c.email, |
|
c.phone, |
|
s.service_code, |
|
s.service_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
LEFT JOIN services s ON i.service_id = s.id |
|
WHERE i.id = %s |
|
""", (invoice_id,)) |
|
invoice = cursor.fetchone() |
|
|
|
if not invoice: |
|
conn.close() |
|
return "Invoice not found", 404 |
|
|
|
conn.close() |
|
settings = get_app_settings() |
|
return render_template("invoices/view.html", invoice=invoice, settings=settings) |
|
|
|
|
|
@app.route("/invoices/edit/<int:invoice_id>", methods=["GET", "POST"]) |
|
def edit_invoice(invoice_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT i.*, |
|
COALESCE((SELECT COUNT(*) FROM payments p WHERE p.invoice_id = i.id), 0) AS payment_count |
|
FROM invoices i |
|
WHERE i.id = %s |
|
""", (invoice_id,)) |
|
invoice = cursor.fetchone() |
|
|
|
if not invoice: |
|
conn.close() |
|
return "Invoice not found", 404 |
|
|
|
locked = invoice["payment_count"] > 0 or float(invoice["amount_paid"]) > 0 |
|
|
|
if request.method == "POST": |
|
due_at = request.form.get("due_at", "").strip() |
|
notes = request.form.get("notes", "").strip() |
|
|
|
if locked: |
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE invoices |
|
SET due_at = %s, |
|
notes = %s |
|
WHERE id = %s |
|
""", ( |
|
due_at or None, |
|
notes or None, |
|
invoice_id |
|
)) |
|
conn.commit() |
|
conn.close() |
|
return redirect("/invoices") |
|
|
|
client_id = request.form.get("client_id", "").strip() |
|
service_id = request.form.get("service_id", "").strip() |
|
currency_code = request.form.get("currency_code", "").strip() |
|
total_amount = request.form.get("total_amount", "").strip() |
|
status = request.form.get("status", "").strip() |
|
|
|
errors = [] |
|
|
|
if not client_id: |
|
errors.append("Client is required.") |
|
if not service_id: |
|
errors.append("Service is required.") |
|
if not currency_code: |
|
errors.append("Currency is required.") |
|
if not total_amount: |
|
errors.append("Total amount is required.") |
|
if not due_at: |
|
errors.append("Due date is required.") |
|
if not status: |
|
errors.append("Status is required.") |
|
|
|
manual_statuses = {"draft", "pending", "cancelled"} |
|
if status and status not in manual_statuses: |
|
errors.append("Manual invoice status must be draft, pending, or cancelled.") |
|
|
|
if not errors: |
|
try: |
|
amount_value = float(total_amount) |
|
if amount_value < 0: |
|
errors.append("Total amount cannot be negative.") |
|
except ValueError: |
|
errors.append("Total amount must be a valid number.") |
|
|
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name") |
|
clients = cursor.fetchall() |
|
|
|
cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name") |
|
services = cursor.fetchall() |
|
|
|
if errors: |
|
invoice["client_id"] = int(client_id) if client_id else invoice["client_id"] |
|
invoice["service_id"] = int(service_id) if service_id else invoice["service_id"] |
|
invoice["currency_code"] = currency_code or invoice["currency_code"] |
|
invoice["total_amount"] = total_amount or invoice["total_amount"] |
|
invoice["due_at"] = due_at or invoice["due_at"] |
|
invoice["status"] = status or invoice["status"] |
|
invoice["notes"] = notes |
|
conn.close() |
|
return render_template("invoices/edit.html", invoice=invoice, clients=clients, services=services, errors=errors, locked=locked) |
|
|
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE invoices |
|
SET client_id = %s, |
|
service_id = %s, |
|
currency_code = %s, |
|
total_amount = %s, |
|
subtotal_amount = %s, |
|
due_at = %s, |
|
status = %s, |
|
notes = %s |
|
WHERE id = %s |
|
""", ( |
|
client_id, |
|
service_id, |
|
currency_code, |
|
total_amount, |
|
total_amount, |
|
due_at, |
|
status, |
|
notes or None, |
|
invoice_id |
|
)) |
|
conn.commit() |
|
conn.close() |
|
return redirect("/invoices") |
|
|
|
clients = [] |
|
services = [] |
|
|
|
if not locked: |
|
cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name") |
|
clients = cursor.fetchall() |
|
|
|
cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name") |
|
services = cursor.fetchall() |
|
|
|
conn.close() |
|
return render_template("invoices/edit.html", invoice=invoice, clients=clients, services=services, errors=[], locked=locked) |
|
|
|
|
|
|
|
@app.route("/payments/export.csv") |
|
def export_payments_csv(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
cursor.execute(""" |
|
SELECT |
|
p.id, |
|
p.invoice_id, |
|
i.invoice_number, |
|
p.client_id, |
|
c.client_code, |
|
c.company_name, |
|
p.payment_method, |
|
p.payment_currency, |
|
p.payment_amount, |
|
p.cad_value_at_payment, |
|
p.reference, |
|
p.sender_name, |
|
p.txid, |
|
p.wallet_address, |
|
p.payment_status, |
|
p.received_at, |
|
p.notes |
|
FROM payments p |
|
JOIN invoices i ON p.invoice_id = i.id |
|
JOIN clients c ON p.client_id = c.id |
|
ORDER BY p.id ASC |
|
""") |
|
rows = cursor.fetchall() |
|
conn.close() |
|
|
|
output = StringIO() |
|
writer = csv.writer(output) |
|
writer.writerow([ |
|
"id", |
|
"invoice_id", |
|
"invoice_number", |
|
"client_id", |
|
"client_code", |
|
"company_name", |
|
"payment_method", |
|
"payment_currency", |
|
"payment_amount", |
|
"cad_value_at_payment", |
|
"reference", |
|
"sender_name", |
|
"txid", |
|
"wallet_address", |
|
"payment_status", |
|
"received_at", |
|
"notes", |
|
]) |
|
|
|
for r in rows: |
|
writer.writerow([ |
|
r.get("id", ""), |
|
r.get("invoice_id", ""), |
|
r.get("invoice_number", ""), |
|
r.get("client_id", ""), |
|
r.get("client_code", ""), |
|
r.get("company_name", ""), |
|
r.get("payment_method", ""), |
|
r.get("payment_currency", ""), |
|
r.get("payment_amount", ""), |
|
r.get("cad_value_at_payment", ""), |
|
r.get("reference", ""), |
|
r.get("sender_name", ""), |
|
r.get("txid", ""), |
|
r.get("wallet_address", ""), |
|
r.get("payment_status", ""), |
|
r.get("received_at", ""), |
|
r.get("notes", ""), |
|
]) |
|
|
|
response = make_response(output.getvalue()) |
|
response.headers["Content-Type"] = "text/csv; charset=utf-8" |
|
response.headers["Content-Disposition"] = "attachment; filename=payments.csv" |
|
return response |
|
|
|
@app.route("/payments") |
|
def payments(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT |
|
p.*, |
|
i.invoice_number, |
|
i.status AS invoice_status, |
|
i.total_amount, |
|
i.amount_paid, |
|
i.currency_code AS invoice_currency_code, |
|
c.client_code, |
|
c.company_name |
|
FROM payments p |
|
JOIN invoices i ON p.invoice_id = i.id |
|
JOIN clients c ON p.client_id = c.id |
|
ORDER BY p.id DESC |
|
""") |
|
payments = cursor.fetchall() |
|
|
|
conn.close() |
|
return render_template("payments/list.html", payments=payments) |
|
|
|
@app.route("/payments/new", methods=["GET", "POST"]) |
|
def new_payment(): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
if request.method == "POST": |
|
invoice_id = request.form.get("invoice_id", "").strip() |
|
payment_method = request.form.get("payment_method", "").strip() |
|
payment_currency = request.form.get("payment_currency", "").strip() |
|
payment_amount = request.form.get("payment_amount", "").strip() |
|
cad_value_at_payment = request.form.get("cad_value_at_payment", "").strip() |
|
reference = request.form.get("reference", "").strip() |
|
sender_name = request.form.get("sender_name", "").strip() |
|
txid = request.form.get("txid", "").strip() |
|
wallet_address = request.form.get("wallet_address", "").strip() |
|
notes = request.form.get("notes", "").strip() |
|
|
|
errors = [] |
|
|
|
if not invoice_id: |
|
errors.append("Invoice is required.") |
|
if not payment_method: |
|
errors.append("Payment method is required.") |
|
if not payment_currency: |
|
errors.append("Payment currency is required.") |
|
if not payment_amount: |
|
errors.append("Payment amount is required.") |
|
if not cad_value_at_payment: |
|
errors.append("CAD value at payment is required.") |
|
|
|
if not errors: |
|
try: |
|
payment_amount_value = Decimal(str(payment_amount)) |
|
if payment_amount_value <= Decimal("0"): |
|
errors.append("Payment amount must be greater than zero.") |
|
except Exception: |
|
errors.append("Payment amount must be a valid number.") |
|
|
|
if not errors: |
|
try: |
|
cad_value_value = Decimal(str(cad_value_at_payment)) |
|
if cad_value_value < Decimal("0"): |
|
errors.append("CAD value at payment cannot be negative.") |
|
except Exception: |
|
errors.append("CAD value at payment must be a valid number.") |
|
|
|
invoice_row = None |
|
|
|
if not errors: |
|
cursor.execute(""" |
|
SELECT |
|
i.id, |
|
i.client_id, |
|
i.invoice_number, |
|
i.currency_code, |
|
i.total_amount, |
|
i.amount_paid, |
|
i.status, |
|
c.client_code, |
|
c.company_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
WHERE i.id = %s |
|
""", (invoice_id,)) |
|
invoice_row = cursor.fetchone() |
|
|
|
if not invoice_row: |
|
errors.append("Selected invoice was not found.") |
|
else: |
|
allowed_statuses = {"pending", "partial", "overdue"} |
|
if invoice_row["status"] not in allowed_statuses: |
|
errors.append("Payments can only be recorded against pending, partial, or overdue invoices.") |
|
else: |
|
remaining_balance = to_decimal(invoice_row["total_amount"]) - to_decimal(invoice_row["amount_paid"]) |
|
entered_amount = to_decimal(payment_amount) |
|
|
|
if remaining_balance <= Decimal("0"): |
|
errors.append("This invoice has no remaining balance.") |
|
elif entered_amount > remaining_balance: |
|
errors.append( |
|
f"Payment amount exceeds remaining balance. Remaining balance is {fmt_money(remaining_balance, invoice_row['currency_code'])} {invoice_row['currency_code']}." |
|
) |
|
|
|
if errors: |
|
cursor.execute(""" |
|
SELECT |
|
i.id, |
|
i.invoice_number, |
|
i.currency_code, |
|
i.total_amount, |
|
i.amount_paid, |
|
i.status, |
|
c.client_code, |
|
c.company_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
WHERE i.status IN ('pending', 'partial', 'overdue') |
|
AND (i.total_amount - i.amount_paid) > 0 |
|
ORDER BY i.id DESC |
|
""") |
|
invoices = cursor.fetchall() |
|
conn.close() |
|
|
|
form_data = { |
|
"invoice_id": invoice_id, |
|
"payment_method": payment_method, |
|
"payment_currency": payment_currency, |
|
"payment_amount": payment_amount, |
|
"cad_value_at_payment": cad_value_at_payment, |
|
"reference": reference, |
|
"sender_name": sender_name, |
|
"txid": txid, |
|
"wallet_address": wallet_address, |
|
"notes": notes, |
|
} |
|
|
|
return render_template( |
|
"payments/new.html", |
|
invoices=invoices, |
|
errors=errors, |
|
form_data=form_data, |
|
) |
|
|
|
client_id = invoice_row["client_id"] |
|
|
|
insert_cursor = conn.cursor() |
|
insert_cursor.execute(""" |
|
INSERT INTO payments |
|
( |
|
invoice_id, |
|
client_id, |
|
payment_method, |
|
payment_currency, |
|
payment_amount, |
|
cad_value_at_payment, |
|
reference, |
|
sender_name, |
|
txid, |
|
wallet_address, |
|
payment_status, |
|
received_at, |
|
notes |
|
) |
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'confirmed', UTC_TIMESTAMP(), %s) |
|
""", ( |
|
invoice_id, |
|
client_id, |
|
payment_method, |
|
payment_currency, |
|
payment_amount, |
|
cad_value_at_payment, |
|
reference or None, |
|
sender_name or None, |
|
txid or None, |
|
wallet_address or None, |
|
notes or None |
|
)) |
|
|
|
conn.commit() |
|
conn.close() |
|
|
|
recalc_invoice_totals(invoice_id) |
|
|
|
return redirect("/payments") |
|
|
|
cursor.execute(""" |
|
SELECT |
|
i.id, |
|
i.invoice_number, |
|
i.currency_code, |
|
i.total_amount, |
|
i.amount_paid, |
|
i.status, |
|
c.client_code, |
|
c.company_name |
|
FROM invoices i |
|
JOIN clients c ON i.client_id = c.id |
|
WHERE i.status IN ('pending', 'partial', 'overdue') |
|
AND (i.total_amount - i.amount_paid) > 0 |
|
ORDER BY i.id DESC |
|
""") |
|
invoices = cursor.fetchall() |
|
conn.close() |
|
|
|
return render_template( |
|
"payments/new.html", |
|
invoices=invoices, |
|
errors=[], |
|
form_data={}, |
|
) |
|
|
|
|
|
|
|
@app.route("/payments/void/<int:payment_id>", methods=["POST"]) |
|
def void_payment(payment_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT id, invoice_id, payment_status |
|
FROM payments |
|
WHERE id = %s |
|
""", (payment_id,)) |
|
payment = cursor.fetchone() |
|
|
|
if not payment: |
|
conn.close() |
|
return "Payment not found", 404 |
|
|
|
if payment["payment_status"] != "confirmed": |
|
conn.close() |
|
return redirect("/payments") |
|
|
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE payments |
|
SET payment_status = 'reversed' |
|
WHERE id = %s |
|
""", (payment_id,)) |
|
|
|
conn.commit() |
|
conn.close() |
|
|
|
recalc_invoice_totals(payment["invoice_id"]) |
|
|
|
return redirect("/payments") |
|
|
|
recalc_invoice_totals(payment["invoice_id"]) |
|
|
|
return redirect("/payments") |
|
|
|
@app.route("/payments/edit/<int:payment_id>", methods=["GET", "POST"]) |
|
def edit_payment(payment_id): |
|
conn = get_db_connection() |
|
cursor = conn.cursor(dictionary=True) |
|
|
|
cursor.execute(""" |
|
SELECT |
|
p.*, |
|
i.invoice_number, |
|
c.client_code, |
|
c.company_name |
|
FROM payments p |
|
JOIN invoices i ON p.invoice_id = i.id |
|
JOIN clients c ON p.client_id = c.id |
|
WHERE p.id = %s |
|
""", (payment_id,)) |
|
payment = cursor.fetchone() |
|
|
|
if not payment: |
|
conn.close() |
|
return "Payment not found", 404 |
|
|
|
if request.method == "POST": |
|
payment_method = request.form.get("payment_method", "").strip() |
|
payment_currency = request.form.get("payment_currency", "").strip() |
|
payment_amount = request.form.get("payment_amount", "").strip() |
|
cad_value_at_payment = request.form.get("cad_value_at_payment", "").strip() |
|
reference = request.form.get("reference", "").strip() |
|
sender_name = request.form.get("sender_name", "").strip() |
|
txid = request.form.get("txid", "").strip() |
|
wallet_address = request.form.get("wallet_address", "").strip() |
|
notes = request.form.get("notes", "").strip() |
|
|
|
errors = [] |
|
|
|
if not payment_method: |
|
errors.append("Payment method is required.") |
|
if not payment_currency: |
|
errors.append("Payment currency is required.") |
|
if not payment_amount: |
|
errors.append("Payment amount is required.") |
|
if not cad_value_at_payment: |
|
errors.append("CAD value at payment is required.") |
|
|
|
if not errors: |
|
try: |
|
amount_value = float(payment_amount) |
|
if amount_value <= 0: |
|
errors.append("Payment amount must be greater than zero.") |
|
except ValueError: |
|
errors.append("Payment amount must be a valid number.") |
|
|
|
try: |
|
cad_value = float(cad_value_at_payment) |
|
if cad_value < 0: |
|
errors.append("CAD value at payment cannot be negative.") |
|
except ValueError: |
|
errors.append("CAD value at payment must be a valid number.") |
|
|
|
if errors: |
|
payment["payment_method"] = payment_method or payment["payment_method"] |
|
payment["payment_currency"] = payment_currency or payment["payment_currency"] |
|
payment["payment_amount"] = payment_amount or payment["payment_amount"] |
|
payment["cad_value_at_payment"] = cad_value_at_payment or payment["cad_value_at_payment"] |
|
payment["reference"] = reference |
|
payment["sender_name"] = sender_name |
|
payment["txid"] = txid |
|
payment["wallet_address"] = wallet_address |
|
payment["notes"] = notes |
|
conn.close() |
|
return render_template("payments/edit.html", payment=payment, errors=errors) |
|
|
|
update_cursor = conn.cursor() |
|
update_cursor.execute(""" |
|
UPDATE payments |
|
SET payment_method = %s, |
|
payment_currency = %s, |
|
payment_amount = %s, |
|
cad_value_at_payment = %s, |
|
reference = %s, |
|
sender_name = %s, |
|
txid = %s, |
|
wallet_address = %s, |
|
notes = %s |
|
WHERE id = %s |
|
""", ( |
|
payment_method, |
|
payment_currency, |
|
payment_amount, |
|
cad_value_at_payment, |
|
reference or None, |
|
sender_name or None, |
|
txid or None, |
|
wallet_address or None, |
|
notes or None, |
|
payment_id |
|
)) |
|
conn.commit() |
|
invoice_id = payment["invoice_id"] |
|
conn.close() |
|
|
|
recalc_invoice_totals(invoice_id) |
|
|
|
return redirect("/payments") |
|
|
|
conn.close() |
|
return render_template("payments/edit.html", payment=payment, errors=[]) |
|
|
|
if __name__ == "__main__": |
|
app.run(host="0.0.0.0", port=5050, debug=True)
|
|
|