Billing frontend for MariaDB, set up as otb_billing for outsidethebox.top accounting. Also involved with outsidethedb.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

2591 lines
80 KiB

from flask import Flask, render_template, request, redirect, send_file, make_response, jsonify
from db import get_db_connection
from utils import generate_client_code, generate_service_code
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from decimal import Decimal, InvalidOperation
from email.message import EmailMessage
from io import BytesIO, StringIO
import csv
import zipfile
import smtplib
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.utils import ImageReader
# Flask application object; templates and static assets are resolved from
# the directories one level above this module.
app = Flask(
    __name__,
    static_folder="../static",
    template_folder="../templates",
)

# Local time zone used for display formatting and report period boundaries.
LOCAL_TZ = ZoneInfo("America/Toronto")
def load_version():
    """Return the stripped contents of the VERSION file.

    Any error while opening or reading the file yields the string
    ``"unknown"`` instead of raising.
    """
    try:
        with open("/home/def/otb_billing/VERSION", "r") as version_file:
            raw_text = version_file.read()
        return raw_text.strip()
    except Exception:
        return "unknown"


# Resolved once at import time and reused for every request.
APP_VERSION = load_version()
@app.context_processor
def inject_version():
    """Make the application version available to templates as ``app_version``."""
    return dict(app_version=APP_VERSION)
@app.context_processor
def inject_app_settings():
    """Make the current application settings available to templates as ``app_settings``."""
    current_settings = get_app_settings()
    return dict(app_settings=current_settings)
def fmt_local(dt_value):
    """Format a timestamp in LOCAL_TZ as ``YYYY-MM-DD HH:MM:SS AM/PM``.

    Falsy input gives an empty string. Strings are parsed with
    ``datetime.fromisoformat``; a string that cannot be parsed is returned
    unchanged. Naive datetimes are treated as UTC before conversion.
    """
    if not dt_value:
        return ""

    moment = dt_value
    if isinstance(moment, str):
        try:
            moment = datetime.fromisoformat(moment)
        except ValueError:
            return str(dt_value)

    if moment.tzinfo is None:
        # No zone attached: interpret the value as UTC.
        moment = moment.replace(tzinfo=timezone.utc)

    localized = moment.astimezone(LOCAL_TZ)
    return localized.strftime("%Y-%m-%d %I:%M:%S %p")
def to_decimal(value):
    """Convert *value* to ``Decimal`` via its string form.

    ``None``, the empty string, and anything ``Decimal`` cannot parse all
    become ``Decimal("0")``.
    """
    zero = Decimal("0")
    is_blank = value is None or value == ""
    if is_blank:
        return zero
    try:
        parsed = Decimal(str(value))
    except (InvalidOperation, ValueError):
        return zero
    return parsed
def fmt_money(value, currency_code="CAD"):
    """Format an amount as a plain number string.

    CAD amounts are rendered with 2 decimal places; every other currency
    code is rendered with 8.
    """
    amount = to_decimal(value)
    if currency_code != "CAD":
        return f"{amount:.8f}"
    return f"{amount:.2f}"
def refresh_overdue_invoices():
    """Mark every pending or partial invoice whose due date has passed as overdue.

    The comparison uses the database's ``UTC_TIMESTAMP()``. The connection
    is always closed, even when the UPDATE fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE invoices
            SET status = 'overdue'
            WHERE due_at IS NOT NULL
            AND due_at < UTC_TIMESTAMP()
            AND status IN ('pending', 'partial')
        """)
        conn.commit()
    finally:
        conn.close()
def recalc_invoice_totals(invoice_id):
    """Recompute ``amount_paid``, ``status`` and ``paid_at`` for one invoice.

    The paid total is the sum of the invoice's confirmed payments.

    * Unknown invoice id: nothing happens.
    * Cancelled invoice: only ``amount_paid`` is refreshed and ``paid_at`` is
      cleared; the status is left as cancelled.
    * Otherwise the status becomes ``paid`` (total covered and total > 0),
      ``partial`` (something paid), ``overdue`` (nothing paid and ``due_at``
      is in the past) or ``pending``.

    The connection is closed on every path, including errors.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, total_amount, due_at, status
            FROM invoices
            WHERE id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
        if not invoice:
            return

        cursor.execute("""
            SELECT COALESCE(SUM(payment_amount), 0) AS total_paid
            FROM payments
            WHERE invoice_id = %s
            AND payment_status = 'confirmed'
        """, (invoice_id,))
        total_paid = to_decimal(cursor.fetchone()["total_paid"])
        total_amount = to_decimal(invoice["total_amount"])

        update_cursor = conn.cursor()

        if invoice["status"] == "cancelled":
            update_cursor.execute("""
                UPDATE invoices
                SET amount_paid = %s,
                paid_at = NULL
                WHERE id = %s
            """, (str(total_paid), invoice_id))
            conn.commit()
            return

        if total_paid >= total_amount and total_amount > 0:
            new_status = "paid"
        elif total_paid > 0:
            new_status = "partial"
        else:
            # due_at is compared as a naive value, so build a naive UTC "now"
            # (datetime.utcnow() is deprecated).
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            if invoice["due_at"] and invoice["due_at"] < now_utc:
                new_status = "overdue"
            else:
                new_status = "pending"

        # Two fixed statements instead of interpolating SQL text into the query.
        if new_status == "paid":
            update_sql = """
                UPDATE invoices
                SET amount_paid = %s,
                status = %s,
                paid_at = UTC_TIMESTAMP()
                WHERE id = %s
            """
        else:
            update_sql = """
                UPDATE invoices
                SET amount_paid = %s,
                status = %s,
                paid_at = NULL
                WHERE id = %s
            """
        update_cursor.execute(update_sql, (str(total_paid), new_status, invoice_id))
        conn.commit()
    finally:
        conn.close()
def get_client_credit_balance(client_id):
    """Return the sum of all ``credit_ledger`` amounts for a client as a Decimal.

    A client with no ledger rows has a balance of 0. The connection is
    closed even if the query fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT COALESCE(SUM(amount), 0) AS balance
            FROM credit_ledger
            WHERE client_id = %s
        """, (client_id,))
        row = cursor.fetchone()
    finally:
        conn.close()
    return to_decimal(row["balance"])
def generate_invoice_number():
    """Return the next invoice number in the ``INV-NNNN`` sequence.

    The most recently inserted invoice (highest id) whose number starts with
    ``INV-`` is read and its numeric part is incremented. ``INV-0001`` is
    returned when there is no such invoice or its number cannot be parsed.
    The connection is closed even if the query fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT invoice_number
            FROM invoices
            WHERE invoice_number IS NOT NULL
            AND invoice_number LIKE 'INV-%'
            ORDER BY id DESC
            LIMIT 1
        """)
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row or not row.get("invoice_number"):
        return "INV-0001"

    invoice_number = str(row["invoice_number"]).strip()
    try:
        number = int(invoice_number.split("-")[1])
    except (IndexError, ValueError):
        return "INV-0001"
    return f"INV-{number + 1:04d}"
# Default value for every application setting. get_app_settings() starts from
# a copy of this dict and overlays the rows stored in app_settings;
# save_app_settings() persists exactly these keys. All values are strings.
APP_SETTINGS_DEFAULTS = {
    # Business identity.
    "business_name": "OTB Billing",
    "business_tagline": "By a contractor, for contractors",
    "business_logo_url": "",
    "business_email": "",
    "business_phone": "",
    "business_address": "",
    "business_website": "",
    # Tax and registration numbers.
    "tax_label": "HST",
    "tax_rate": "13.00",
    "tax_number": "",
    "business_number": "",
    # Currency and reporting.
    "default_currency": "CAD",
    "report_frequency": "monthly",
    # Invoice text.
    "invoice_footer": "",
    "payment_terms": "",
    "local_country": "Canada",
    # Checkbox-style flag: saved as "1" when submitted, otherwise "0".
    "apply_local_tax_only": "1",
    # Outgoing mail (read by send_configured_email).
    "smtp_host": "",
    "smtp_port": "587",
    "smtp_user": "",
    "smtp_pass": "",
    "smtp_from_email": "",
    "smtp_from_name": "",
    # Checkbox-style flags: saved as "1" when submitted, otherwise "0".
    "smtp_use_tls": "1",
    "smtp_use_ssl": "0",
    # Recipient for emailed reports; the email routes fall back to business_email.
    "report_delivery_email": "",
}
def ensure_app_settings_table():
    """Create the ``app_settings`` key/value table if it does not exist yet.

    The connection is closed even if the DDL statement fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS app_settings (
            setting_key VARCHAR(100) NOT NULL PRIMARY KEY,
            setting_value TEXT NULL,
            updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    finally:
        conn.close()
def get_app_settings():
    """Return all application settings as a dict of strings.

    Starts from a copy of ``APP_SETTINGS_DEFAULTS`` and overlays every row
    stored in ``app_settings``; a stored NULL value becomes ``""``. The table
    is created first if needed, and the connection is closed even if the
    query fails.
    """
    ensure_app_settings_table()
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT setting_key, setting_value
            FROM app_settings
        """)
        rows = cursor.fetchall()
    finally:
        conn.close()

    settings = dict(APP_SETTINGS_DEFAULTS)
    settings.update({
        row["setting_key"]: row["setting_value"] if row["setting_value"] is not None else ""
        for row in rows
    })
    return settings
def save_app_settings(form_data):
    """Persist every known setting from a submitted form.

    Only keys present in ``APP_SETTINGS_DEFAULTS`` are written. Checkbox-style
    keys are stored as ``"1"`` when the form carries a truthy value and
    ``"0"`` otherwise; all other values are stripped strings (missing keys
    become ``""``). All rows are committed together, and the connection is
    closed even if a statement fails.

    Args:
        form_data: Mapping with a ``get`` method (e.g. ``request.form``).
    """
    checkbox_keys = {"apply_local_tax_only", "smtp_use_tls", "smtp_use_ssl"}

    ensure_app_settings_table()
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        for key in APP_SETTINGS_DEFAULTS:
            if key in checkbox_keys:
                value = "1" if form_data.get(key) else "0"
            else:
                value = (form_data.get(key) or "").strip()
            cursor.execute("""
                INSERT INTO app_settings (setting_key, setting_value)
                VALUES (%s, %s)
                ON DUPLICATE KEY UPDATE setting_value = VALUES(setting_value)
            """, (key, value))
        conn.commit()
    finally:
        conn.close()
@app.template_filter("localtime")
def localtime_filter(value):
    """Jinja filter ``localtime``: delegate to ``fmt_local``."""
    rendered = fmt_local(value)
    return rendered
@app.template_filter("money")
def money_filter(value, currency_code="CAD"):
    """Jinja filter ``money``: delegate to ``fmt_money``."""
    rendered = fmt_money(value, currency_code)
    return rendered
def get_report_period_bounds(frequency):
    """Return ``(start_utc, end_utc, label)`` for the current reporting period.

    The period starts at local midnight on the first day of the current
    year (``"yearly"``), quarter (``"quarterly"``) or month (anything else)
    and ends now. Both bounds are returned as naive datetimes in UTC.
    """
    now_local = datetime.now(LOCAL_TZ)
    # Shared "first day, midnight" fields for every period start.
    first_midnight = {"day": 1, "hour": 0, "minute": 0, "second": 0, "microsecond": 0}

    if frequency == "yearly":
        label = f"{now_local.year}"
        start_local = now_local.replace(month=1, **first_midnight)
    elif frequency == "quarterly":
        quarter = ((now_local.month - 1) // 3) + 1
        start_month = (quarter - 1) * 3 + 1
        label = f"Q{quarter} {now_local.year}"
        start_local = now_local.replace(month=start_month, **first_midnight)
    else:
        label = now_local.strftime("%B %Y")
        start_local = now_local.replace(**first_midnight)

    def as_naive_utc(moment):
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    return as_naive_utc(start_local), as_naive_utc(now_local), label
def get_revenue_report_data():
    """Build the revenue report for the current reporting period.

    The period comes from the ``report_frequency`` setting (monthly,
    quarterly or yearly; anything else falls back to monthly). Collected
    revenue and issued invoices are restricted to the period; overdue and
    outstanding figures cover all invoices regardless of date.

    Returns:
        dict with ``frequency``, ``period_label``, ``period_start``,
        ``period_end``, ``collected_cad``, ``invoice_count``,
        ``invoiced_total``, ``overdue_count``, ``overdue_balance``,
        ``outstanding_count`` and ``outstanding_balance``. Monetary values
        are strings, counts are ints.

    The connection is closed even if a query fails.
    """
    settings = get_app_settings()
    frequency = (settings.get("report_frequency") or "monthly").strip().lower()
    if frequency not in {"monthly", "quarterly", "yearly"}:
        frequency = "monthly"
    start_utc, end_utc, label = get_report_period_bounds(frequency)

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        cursor.execute("""
            SELECT COALESCE(SUM(cad_value_at_payment), 0) AS collected
            FROM payments
            WHERE payment_status = 'confirmed'
            AND received_at >= %s
            AND received_at <= %s
        """, (start_utc, end_utc))
        collected_row = cursor.fetchone()

        cursor.execute("""
            SELECT COUNT(*) AS invoice_count,
            COALESCE(SUM(total_amount), 0) AS invoiced
            FROM invoices
            WHERE issued_at >= %s
            AND issued_at <= %s
        """, (start_utc, end_utc))
        invoiced_row = cursor.fetchone()

        cursor.execute("""
            SELECT COUNT(*) AS overdue_count,
            COALESCE(SUM(total_amount - amount_paid), 0) AS overdue_balance
            FROM invoices
            WHERE status = 'overdue'
        """)
        overdue_row = cursor.fetchone()

        cursor.execute("""
            SELECT COUNT(*) AS outstanding_count,
            COALESCE(SUM(total_amount - amount_paid), 0) AS outstanding_balance
            FROM invoices
            WHERE status IN ('pending', 'partial', 'overdue')
        """)
        outstanding_row = cursor.fetchone()
    finally:
        conn.close()

    return {
        "frequency": frequency,
        "period_label": label,
        "period_start": start_utc.isoformat(sep=" "),
        "period_end": end_utc.isoformat(sep=" "),
        "collected_cad": str(to_decimal(collected_row["collected"])),
        "invoice_count": int(invoiced_row["invoice_count"] or 0),
        "invoiced_total": str(to_decimal(invoiced_row["invoiced"])),
        "overdue_count": int(overdue_row["overdue_count"] or 0),
        "overdue_balance": str(to_decimal(overdue_row["overdue_balance"])),
        "outstanding_count": int(outstanding_row["outstanding_count"] or 0),
        "outstanding_balance": str(to_decimal(outstanding_row["outstanding_balance"])),
    }
def ensure_email_log_table():
    """Create the ``email_log`` table if it does not exist yet.

    The connection is closed even if the DDL statement fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS email_log (
            id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
            email_type VARCHAR(50) NOT NULL,
            invoice_id INT UNSIGNED NULL,
            recipient_email VARCHAR(255) NOT NULL,
            subject VARCHAR(255) NOT NULL,
            status VARCHAR(20) NOT NULL,
            error_message TEXT NULL,
            sent_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            KEY idx_email_log_invoice_id (invoice_id),
            KEY idx_email_log_type (email_type),
            KEY idx_email_log_sent_at (sent_at)
            )
        """)
        conn.commit()
    finally:
        conn.close()
def log_email_event(email_type, recipient_email, subject, status, invoice_id=None, error_message=None):
    """Insert one row into ``email_log`` describing an email attempt.

    Args:
        email_type: Short category label stored in ``email_type``.
        recipient_email: Address the message was sent to.
        subject: Message subject line.
        status: Outcome label stored in ``status`` (callers in this file use
            ``"sent"`` and ``"failed"``).
        invoice_id: Related invoice id, or None.
        error_message: Failure detail, or None.

    The table is created first if needed, and the connection is closed even
    if the insert fails.
    """
    ensure_email_log_table()
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO email_log
            (email_type, invoice_id, recipient_email, subject, status, error_message)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            email_type,
            invoice_id,
            recipient_email,
            subject,
            status,
            error_message,
        ))
        conn.commit()
    finally:
        conn.close()
def _record_email_event(email_type, to_email, subject, status, invoice_id, error_message):
    """Write an ``email_log`` row on a best-effort basis; never raises."""
    try:
        log_email_event(
            email_type,
            to_email,
            subject,
            status,
            invoice_id=invoice_id,
            error_message=error_message,
        )
    except Exception:
        # The log row is bookkeeping only; losing it must not change the
        # outcome reported for the email itself.
        app.logger.exception("Could not record '%s' email event for %s", status, to_email)


def send_configured_email(to_email, subject, body, attachments=None, email_type="system_email", invoice_id=None):
    """Send a plain-text email through the SMTP server configured in settings.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        attachments: Optional iterable of dicts with ``filename``,
            ``mime_type`` (``"type/subtype"``) and ``data`` (bytes).
        email_type: Category label recorded in ``email_log``.
        invoice_id: Related invoice id recorded in ``email_log``, or None.

    Raises:
        ValueError: SMTP host, from address or recipient is missing, or the
            configured port is not a number. These are raised before any
            send attempt and are not written to ``email_log``.
        Exception: Whatever ``smtplib`` raises while sending; the failure is
            recorded in ``email_log`` and the original error is re-raised.

    A successful send is recorded as ``"sent"``. A problem writing the log
    row is reported through the application logger and never turns a
    successful send into a failure or hides the SMTP error.
    """
    settings = get_app_settings()
    smtp_host = (settings.get("smtp_host") or "").strip()
    raw_port = (settings.get("smtp_port") or "587").strip() or "587"
    try:
        smtp_port = int(raw_port)
    except ValueError:
        raise ValueError(f"SMTP port is not a valid number: {raw_port!r}") from None
    smtp_user = (settings.get("smtp_user") or "").strip()
    smtp_pass = (settings.get("smtp_pass") or "").strip()
    from_email = (settings.get("smtp_from_email") or settings.get("business_email") or "").strip()
    from_name = (settings.get("smtp_from_name") or settings.get("business_name") or "").strip()
    use_tls = (settings.get("smtp_use_tls") or "0") == "1"
    use_ssl = (settings.get("smtp_use_ssl") or "0") == "1"

    if not smtp_host:
        raise ValueError("SMTP host is not configured.")
    if not from_email:
        raise ValueError("From email is not configured.")
    if not to_email:
        raise ValueError("Recipient email is missing.")

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = f"{from_name} <{from_email}>" if from_name else from_email
    msg["To"] = to_email
    msg.set_content(body)

    for attachment in attachments or []:
        maintype, subtype = attachment["mime_type"].split("/", 1)
        msg.add_attachment(
            attachment["data"],
            maintype=maintype,
            subtype=subtype,
            filename=attachment["filename"],
        )

    try:
        if use_ssl:
            # Implicit TLS from the first byte; STARTTLS is not used here.
            with smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=30) as server:
                if smtp_user:
                    server.login(smtp_user, smtp_pass)
                server.send_message(msg)
        else:
            with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
                server.ehlo()
                if use_tls:
                    server.starttls()
                    server.ehlo()
                if smtp_user:
                    server.login(smtp_user, smtp_pass)
                server.send_message(msg)
    except Exception as exc:
        _record_email_event(email_type, to_email, subject, "failed", invoice_id, str(exc))
        raise

    # Logged outside the try block so a logging problem cannot be mistaken
    # for a failed send.
    _record_email_event(email_type, to_email, subject, "sent", invoice_id, None)
@app.route("/settings", methods=["GET", "POST"])
def settings():
    """Settings page: GET renders the form, POST saves it and redirects back."""
    ensure_app_settings_table()
    if request.method != "POST":
        current_values = get_app_settings()
        return render_template("settings.html", settings=current_values)
    save_app_settings(request.form)
    return redirect("/settings")
@app.route("/reports/accounting-package.zip")
def accounting_package_zip():
    """Download the accounting package ZIP built by ``build_accounting_package_bytes``."""
    package_bytes, filename = build_accounting_package_bytes()
    archive_stream = BytesIO(package_bytes)
    return send_file(
        archive_stream,
        download_name=filename,
        as_attachment=True,
        mimetype="application/zip",
    )
@app.route("/reports/revenue")
def revenue_report():
    """Render the revenue report page for the current period."""
    return render_template("reports/revenue.html", report=get_revenue_report_data())
@app.route("/reports/revenue.json")
def revenue_report_json():
    """Return the revenue report for the current period as JSON."""
    return jsonify(get_revenue_report_data())
@app.route("/reports/revenue/print")
def revenue_report_print():
    """Render the print-friendly revenue report for the current period."""
    return render_template("reports/revenue_print.html", report=get_revenue_report_data())
@app.route("/invoices/email/<int:invoice_id>", methods=["POST"])
def email_invoice(invoice_id):
    """Email an invoice PDF to the client's address and redirect to the invoice.

    Returns 404 when the invoice does not exist, 400 when the client has no
    email address, and 500 when the PDF route does not return 200. Otherwise
    redirects to the invoice view with ``email_sent=1`` or ``email_failed=1``.
    The PDF is obtained by calling this application's own
    ``/invoices/pdf/<id>`` route through the test client.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone
            FROM invoices i
            JOIN clients c ON i.client_id = c.id
            WHERE i.id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
    finally:
        conn.close()

    if not invoice:
        return "Invoice not found", 404

    recipient = (invoice.get("email") or "").strip()
    if not recipient:
        return "Client email is missing for this invoice.", 400

    settings = get_app_settings()

    with app.test_client() as client:
        pdf_response = client.get(f"/invoices/pdf/{invoice_id}")
    if pdf_response.status_code != 200:
        return "Could not generate invoice PDF for email.", 500
    pdf_bytes = pdf_response.data

    business_name = settings.get("business_name") or "OTB Billing"
    # The key is always present in the row, so a .get() default would never
    # apply; use `or` so a NULL column still prints a currency.
    currency = invoice.get("currency_code") or "CAD"
    total = to_decimal(invoice.get("total_amount"))
    remaining = total - to_decimal(invoice.get("amount_paid"))

    subject = f"Invoice {invoice['invoice_number']} from {business_name}"
    body = (
        f"Hello {invoice.get('contact_name') or invoice.get('company_name') or ''},\n\n"
        f"Please find attached invoice {invoice['invoice_number']}.\n"
        f"Total: {total:.2f} {currency}\n"
        f"Remaining: {remaining:.2f} {currency}\n"
        f"Due: {fmt_local(invoice.get('due_at'))}\n\n"
        f"Thank you,\n"
        f"{business_name}"
    )

    try:
        send_configured_email(
            recipient,
            subject,
            body,
            email_type="invoice",
            invoice_id=invoice_id,
            attachments=[{
                "filename": f"{invoice['invoice_number']}.pdf",
                "mime_type": "application/pdf",
                "data": pdf_bytes,
            }],
        )
    except Exception:
        # Route boundary: report the failure to the user via the redirect.
        return redirect(f"/invoices/view/{invoice_id}?email_failed=1")
    return redirect(f"/invoices/view/{invoice_id}?email_sent=1")
@app.route("/reports/revenue/email", methods=["POST"])
def email_revenue_report_json():
    """Email the revenue report JSON to the configured report recipient.

    The recipient is ``report_delivery_email``, falling back to
    ``business_email``; 400 is returned when neither is set and 500 when the
    JSON route does not return 200. Otherwise redirects to the report page
    with ``email_sent=1`` or ``email_failed=1``.

    The summary in the message body is read from the same JSON payload that
    is attached, so the two always agree.
    """
    settings = get_app_settings()
    recipient = (settings.get("report_delivery_email") or settings.get("business_email") or "").strip()
    if not recipient:
        return "Report delivery email is not configured.", 400

    with app.test_client() as client:
        json_response = client.get("/reports/revenue.json")
    if json_response.status_code != 200:
        return "Could not generate revenue report JSON.", 500

    # Parse the attachment instead of recomputing the report a second time.
    report = json_response.get_json() or {}

    subject = f"Revenue Report {report.get('period_label', '')} from {settings.get('business_name') or 'OTB Billing'}"
    body = (
        f"Attached is the revenue report JSON for {report.get('period_label', '')}.\n\n"
        f"Frequency: {report.get('frequency', '')}\n"
        f"Collected CAD: {report.get('collected_cad', '')}\n"
        f"Invoices Issued: {report.get('invoice_count', '')}\n"
    )

    try:
        send_configured_email(
            recipient,
            subject,
            body,
            email_type="revenue_report",
            attachments=[{
                "filename": "revenue_report.json",
                "mime_type": "application/json",
                "data": json_response.data,
            }],
        )
    except Exception:
        # Route boundary: report the failure to the user via the redirect.
        return redirect("/reports/revenue?email_failed=1")
    return redirect("/reports/revenue?email_sent=1")
@app.route("/reports/accounting-package/email", methods=["POST"])
def email_accounting_package():
    """Email the accounting package ZIP to the configured report recipient.

    The recipient is ``report_delivery_email``, falling back to
    ``business_email``. Responds 400 when neither is set, 500 when the ZIP
    route does not return 200, and otherwise redirects to the dashboard with
    ``pkg_email=1`` or ``pkg_email_failed=1``.
    """
    settings = get_app_settings()
    configured_address = settings.get("report_delivery_email") or settings.get("business_email") or ""
    recipient = configured_address.strip()
    if recipient == "":
        return "Report delivery email is not configured.", 400

    with app.test_client() as http:
        zip_response = http.get("/reports/accounting-package.zip")
    if zip_response.status_code != 200:
        return "Could not generate accounting package ZIP.", 500

    subject = f"Accounting Package from {settings.get('business_name') or 'OTB Billing'}"
    body = "Attached is the latest accounting package export."
    package_attachment = {
        "filename": "accounting_package.zip",
        "mime_type": "application/zip",
        "data": zip_response.data,
    }

    try:
        send_configured_email(
            recipient,
            subject,
            body,
            email_type="accounting_package",
            attachments=[package_attachment],
        )
    except Exception:
        return redirect("/?pkg_email_failed=1")
    return redirect("/?pkg_email=1")
@app.route("/")
def index():
    """Dashboard: client count, active services, outstanding invoices, revenue.

    Overdue statuses are refreshed first so the outstanding count reflects
    current due dates. The connection is closed even if a query fails.
    """
    refresh_overdue_invoices()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        cursor.execute("SELECT COUNT(*) AS total_clients FROM clients")
        total_clients = cursor.fetchone()["total_clients"]

        cursor.execute("SELECT COUNT(*) AS active_services FROM services WHERE status = 'active'")
        active_services = cursor.fetchone()["active_services"]

        cursor.execute("""
            SELECT COUNT(*) AS outstanding_invoices
            FROM invoices
            WHERE status IN ('pending', 'partial', 'overdue')
        """)
        outstanding_invoices = cursor.fetchone()["outstanding_invoices"]

        cursor.execute("""
            SELECT COALESCE(SUM(cad_value_at_payment), 0) AS revenue_received
            FROM payments
            WHERE payment_status = 'confirmed'
        """)
        revenue_received = cursor.fetchone()["revenue_received"]
    finally:
        conn.close()

    return render_template(
        "dashboard.html",
        total_clients=total_clients,
        active_services=active_services,
        outstanding_invoices=outstanding_invoices,
        revenue_received=revenue_received,
    )
@app.route("/dbtest")
def dbtest():
    """Connectivity check page: show the database's UTC time or the error.

    The server time is read with ``UTC_TIMESTAMP()`` so that it matches the
    "(UTC)" label and the UTC assumption ``fmt_local`` makes for naive
    values. Error text is HTML-escaped before being placed in the page.
    """
    from html import escape

    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT UTC_TIMESTAMP()")
            result = cursor.fetchone()
        finally:
            conn.close()
        return f"""
<h1>OTB Billing v{APP_VERSION}</h1>
<h2>Database OK</h2>
<p><a href="/">Home</a></p>
<p>DB server time (UTC): {result[0]}</p>
<p>Displayed local time: {fmt_local(result[0])}</p>
"""
    except Exception as e:
        # Driver messages can contain arbitrary text; never emit them as raw HTML.
        return f"<h1>Database FAILED</h1><pre>{escape(str(e))}</pre>"
@app.route("/clients/export.csv")
def export_clients_csv():
    """Download all clients, ordered by id, as ``clients.csv``.

    The connection is closed even if the query fails.
    """
    # Single source of truth for the header row and the per-row field order.
    columns = (
        "id",
        "client_code",
        "company_name",
        "contact_name",
        "email",
        "phone",
        "status",
        "created_at",
        "updated_at",
    )

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
            id,
            client_code,
            company_name,
            contact_name,
            email,
            phone,
            status,
            created_at,
            updated_at
            FROM clients
            ORDER BY id ASC
        """)
        rows = cursor.fetchall()
    finally:
        conn.close()

    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(columns)
    writer.writerows([row.get(column, "") for column in columns] for row in rows)

    response = make_response(output.getvalue())
    response.headers["Content-Type"] = "text/csv; charset=utf-8"
    response.headers["Content-Disposition"] = "attachment; filename=clients.csv"
    return response
@app.route("/clients")
def clients():
    """List all clients, newest first, each with its credit balance.

    Balances for every client are loaded with one grouped query instead of
    opening a new connection per client; a client without ledger rows gets a
    balance of 0. The connection is closed even if a query fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM clients ORDER BY id DESC")
        client_rows = cursor.fetchall()

        cursor.execute("""
            SELECT client_id, COALESCE(SUM(amount), 0) AS balance
            FROM credit_ledger
            GROUP BY client_id
        """)
        balances = {
            row["client_id"]: to_decimal(row["balance"])
            for row in cursor.fetchall()
        }
    finally:
        conn.close()

    for client in client_rows:
        client["credit_balance"] = balances.get(client["id"], Decimal("0"))

    return render_template("clients/list.html", clients=client_rows)
@app.route("/clients/new", methods=["GET", "POST"])
def new_client():
    """Create a client: GET renders the form, POST inserts and redirects.

    Submitted values are stripped. A missing or blank company name is
    rejected with 400, matching the requirement enforced by ``edit_client``.
    Blank optional fields are stored as NULL, as ``edit_client`` does. The
    client code is derived from the company name and the current highest
    client id. The connection is closed even if a statement fails.
    """
    if request.method != "POST":
        return render_template("clients/new.html")

    company_name = request.form.get("company_name", "").strip()
    contact_name = request.form.get("contact_name", "").strip()
    email = request.form.get("email", "").strip()
    phone = request.form.get("phone", "").strip()

    if not company_name:
        return "Company name is required.", 400

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT MAX(id) AS last_id FROM clients")
        result = cursor.fetchone()
        last_number = result["last_id"] if result["last_id"] else 0
        client_code = generate_client_code(company_name, last_number)

        insert_cursor = conn.cursor()
        insert_cursor.execute(
            """
            INSERT INTO clients
            (client_code, company_name, contact_name, email, phone)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (client_code, company_name, contact_name or None, email or None, phone or None),
        )
        conn.commit()
    finally:
        conn.close()

    return redirect("/clients")
@app.route("/clients/edit/<int:client_id>", methods=["GET", "POST"])
def edit_client(client_id):
    """Edit a client: GET renders the form, POST validates and saves.

    Returns 404 for an unknown client on both GET and POST. On POST, company
    name and status are required; when validation fails the form is
    re-rendered with the stored client row and the error list. Blank optional
    fields are stored as NULL. The connection is closed on every path.
    """
    errors = []
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        # Load the row first so a missing client is a 404 instead of a crash
        # in the error path or a silent no-op update.
        cursor.execute("SELECT * FROM clients WHERE id = %s", (client_id,))
        client = cursor.fetchone()
        if not client:
            return "Client not found", 404

        if request.method == "POST":
            company_name = request.form.get("company_name", "").strip()
            contact_name = request.form.get("contact_name", "").strip()
            email = request.form.get("email", "").strip()
            phone = request.form.get("phone", "").strip()
            status = request.form.get("status", "").strip()
            notes = request.form.get("notes", "").strip()

            if not company_name:
                errors.append("Company name is required.")
            if not status:
                errors.append("Status is required.")

            if not errors:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE clients
                    SET company_name = %s,
                    contact_name = %s,
                    email = %s,
                    phone = %s,
                    status = %s,
                    notes = %s
                    WHERE id = %s
                """, (
                    company_name,
                    contact_name or None,
                    email or None,
                    phone or None,
                    status,
                    notes or None,
                    client_id,
                ))
                conn.commit()
                return redirect("/clients")
    finally:
        conn.close()

    client["credit_balance"] = get_client_credit_balance(client_id)
    return render_template("clients/edit.html", client=client, errors=errors)
@app.route("/credits/<int:client_id>")
def client_credits(client_id):
    """Show a client's credit ledger, newest entry first, with the balance.

    Returns 404 for an unknown client. The connection is closed even if a
    query fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, client_code, company_name
            FROM clients
            WHERE id = %s
        """, (client_id,))
        client = cursor.fetchone()
        if not client:
            return "Client not found", 404

        cursor.execute("""
            SELECT *
            FROM credit_ledger
            WHERE client_id = %s
            ORDER BY id DESC
        """, (client_id,))
        entries = cursor.fetchall()
    finally:
        conn.close()

    balance = get_client_credit_balance(client_id)
    return render_template(
        "credits/list.html",
        client=client,
        entries=entries,
        balance=balance,
    )
@app.route("/credits/add/<int:client_id>", methods=["GET", "POST"])
def add_credit(client_id):
    """Add a credit ledger entry: GET renders the form, POST validates and inserts.

    Returns 404 for an unknown client. Entry type, amount and currency code
    are required; the amount must be a finite, non-zero decimal number
    (negative values are allowed). Values such as ``NaN`` or ``Infinity`` are
    rejected as invalid numbers. The amount is written in plain fixed-point
    notation. On success redirects to the client's ledger; on validation
    failure re-renders the form with the errors. The connection is closed on
    every path.
    """
    errors = []
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, client_code, company_name
            FROM clients
            WHERE id = %s
        """, (client_id,))
        client = cursor.fetchone()
        if not client:
            return "Client not found", 404

        if request.method == "POST":
            entry_type = request.form.get("entry_type", "").strip()
            amount = request.form.get("amount", "").strip()
            currency_code = request.form.get("currency_code", "").strip()
            notes = request.form.get("notes", "").strip()

            if not entry_type:
                errors.append("Entry type is required.")
            if not amount:
                errors.append("Amount is required.")
            if not currency_code:
                errors.append("Currency code is required.")

            amount_value = None
            if not errors:
                try:
                    amount_value = Decimal(amount)
                except InvalidOperation:
                    errors.append("Amount must be a valid number.")
                else:
                    if not amount_value.is_finite():
                        # Decimal parses "NaN"/"Infinity" without raising.
                        errors.append("Amount must be a valid number.")
                    elif amount_value == 0:
                        errors.append("Amount cannot be zero.")

            if not errors:
                insert_cursor = conn.cursor()
                insert_cursor.execute("""
                    INSERT INTO credit_ledger
                    (
                    client_id,
                    entry_type,
                    amount,
                    currency_code,
                    notes
                    )
                    VALUES (%s, %s, %s, %s, %s)
                """, (
                    client_id,
                    entry_type,
                    # Fixed-point text avoids exponent or underscore forms
                    # that Decimal accepts but the database may not.
                    format(amount_value, "f"),
                    currency_code,
                    notes or None,
                ))
                conn.commit()
                return redirect(f"/credits/{client_id}")
    finally:
        conn.close()

    return render_template("credits/add.html", client=client, errors=errors)
@app.route("/services")
def services():
    """List all services, newest first, with their client's code and name.

    The connection is closed even if the query fails.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT s.*, c.client_code, c.company_name
            FROM services s
            JOIN clients c ON s.client_id = c.id
            ORDER BY s.id DESC
        """)
        service_rows = cursor.fetchall()
    finally:
        conn.close()
    return render_template("services/list.html", services=service_rows)
@app.route("/services/new", methods=["GET", "POST"])
def new_service():
    """Create a service: GET renders the form with the client list, POST inserts.

    All form fields are read by key, so a request that omits one is rejected
    by Flask as a bad request. An empty start date is stored as NULL. The
    service code is derived from the service name and the current highest
    service id. The connection is closed on every path, including a rejected
    request or a failed insert.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        if request.method == "POST":
            form = request.form
            client_id = form["client_id"]
            service_name = form["service_name"]
            service_type = form["service_type"]
            billing_cycle = form["billing_cycle"]
            currency_code = form["currency_code"]
            recurring_amount = form["recurring_amount"]
            status = form["status"]
            start_date = form["start_date"] or None
            description = form["description"]

            cursor.execute("SELECT MAX(id) AS last_id FROM services")
            result = cursor.fetchone()
            last_number = result["last_id"] if result["last_id"] else 0
            service_code = generate_service_code(service_name, last_number)

            insert_cursor = conn.cursor()
            insert_cursor.execute(
                """
                INSERT INTO services
                (
                client_id,
                service_code,
                service_name,
                service_type,
                billing_cycle,
                status,
                currency_code,
                recurring_amount,
                start_date,
                description
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    client_id,
                    service_code,
                    service_name,
                    service_type,
                    billing_cycle,
                    status,
                    currency_code,
                    recurring_amount,
                    start_date,
                    description,
                ),
            )
            conn.commit()
            return redirect("/services")

        cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC")
        client_rows = cursor.fetchall()
    finally:
        conn.close()

    return render_template("services/new.html", clients=client_rows)
@app.route("/services/edit/<int:service_id>", methods=["GET", "POST"])
def edit_service(service_id):
    """Edit a service: GET renders the form, POST validates and saves.

    Returns 404 for an unknown service on both GET and POST. On POST every
    field except start date and description is required, and the recurring
    amount must be a finite, non-negative decimal number (``nan`` and ``inf``
    are rejected). When validation fails the form is re-rendered with the
    stored service row, the client list and the errors. The amount is written
    in plain fixed-point notation; blank start date and description are
    stored as NULL. The connection is closed on every path.
    """
    errors = []
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT s.*, c.company_name
            FROM services s
            LEFT JOIN clients c ON s.client_id = c.id
            WHERE s.id = %s
        """, (service_id,))
        service = cursor.fetchone()
        if not service:
            return "Service not found", 404

        if request.method == "POST":
            client_id = request.form.get("client_id", "").strip()
            service_name = request.form.get("service_name", "").strip()
            service_type = request.form.get("service_type", "").strip()
            billing_cycle = request.form.get("billing_cycle", "").strip()
            currency_code = request.form.get("currency_code", "").strip()
            recurring_amount = request.form.get("recurring_amount", "").strip()
            status = request.form.get("status", "").strip()
            start_date = request.form.get("start_date", "").strip()
            description = request.form.get("description", "").strip()

            required_fields = (
                (client_id, "Client is required."),
                (service_name, "Service name is required."),
                (service_type, "Service type is required."),
                (billing_cycle, "Billing cycle is required."),
                (currency_code, "Currency code is required."),
                (recurring_amount, "Recurring amount is required."),
                (status, "Status is required."),
            )
            errors = [message for value, message in required_fields if not value]

            amount_value = None
            if not errors:
                try:
                    amount_value = Decimal(recurring_amount)
                except InvalidOperation:
                    errors.append("Recurring amount must be a valid number.")
                else:
                    if not amount_value.is_finite():
                        # "nan"/"inf" parse successfully but are not amounts.
                        errors.append("Recurring amount must be a valid number.")
                    elif amount_value < 0:
                        errors.append("Recurring amount cannot be negative.")

            if not errors:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE services
                    SET client_id = %s,
                    service_name = %s,
                    service_type = %s,
                    billing_cycle = %s,
                    status = %s,
                    currency_code = %s,
                    recurring_amount = %s,
                    start_date = %s,
                    description = %s
                    WHERE id = %s
                """, (
                    client_id,
                    service_name,
                    service_type,
                    billing_cycle,
                    status,
                    currency_code,
                    format(amount_value, "f"),
                    start_date or None,
                    description or None,
                    service_id,
                ))
                conn.commit()
                return redirect("/services")

        cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC")
        client_rows = cursor.fetchall()
    finally:
        conn.close()

    return render_template("services/edit.html", service=service, clients=client_rows, errors=errors)
@app.route("/invoices/export.csv")
def export_invoices_csv():
    """Download invoices as CSV, optionally filtered by query-string arguments.

    Supported arguments: ``start_date`` and ``end_date`` (compared against
    the date part of ``issued_at``), ``status``, ``client_id`` and ``limit``
    (applied only when it is a positive integer). Rows are ordered by id.
    The file is named ``invoices_filtered.csv`` when any argument is
    supplied and ``invoices.csv`` otherwise. The connection is closed even
    if the query fails.
    """
    start_date = (request.args.get("start_date") or "").strip()
    end_date = (request.args.get("end_date") or "").strip()
    status = (request.args.get("status") or "").strip()
    client_id = (request.args.get("client_id") or "").strip()
    limit_count = (request.args.get("limit") or "").strip()

    # Single source of truth for the header row and the per-row field order.
    columns = (
        "id",
        "invoice_number",
        "client_id",
        "client_code",
        "company_name",
        "service_id",
        "currency_code",
        "subtotal_amount",
        "tax_amount",
        "total_amount",
        "amount_paid",
        "status",
        "issued_at",
        "due_at",
        "paid_at",
        "notes",
        "created_at",
        "updated_at",
    )

    query = """
        SELECT
        i.id,
        i.invoice_number,
        i.client_id,
        c.client_code,
        c.company_name,
        i.service_id,
        i.currency_code,
        i.subtotal_amount,
        i.tax_amount,
        i.total_amount,
        i.amount_paid,
        i.status,
        i.issued_at,
        i.due_at,
        i.paid_at,
        i.notes,
        i.created_at,
        i.updated_at
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        WHERE 1=1
    """
    params = []

    # Each filter adds a fixed clause; user values only ever travel as parameters.
    filters = (
        (start_date, " AND DATE(i.issued_at) >= %s"),
        (end_date, " AND DATE(i.issued_at) <= %s"),
        (status, " AND i.status = %s"),
        (client_id, " AND i.client_id = %s"),
    )
    for value, clause in filters:
        if value:
            query += clause
            params.append(value)

    query += " ORDER BY i.id ASC"

    if limit_count:
        try:
            limit_int = int(limit_count)
        except ValueError:
            limit_int = 0  # A non-numeric limit is ignored.
        if limit_int > 0:
            query += " LIMIT %s"
            params.append(limit_int)

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(query, tuple(params))
        rows = cursor.fetchall()
    finally:
        conn.close()

    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(columns)
    writer.writerows([row.get(column, "") for column in columns] for row in rows)

    filename = "invoices"
    if start_date or end_date or status or client_id or limit_count:
        filename += "_filtered"
    filename += ".csv"

    response = make_response(output.getvalue())
    response.headers["Content-Type"] = "text/csv; charset=utf-8"
    response.headers["Content-Disposition"] = f"attachment; filename={filename}"
    return response
@app.route("/invoices/export-pdf.zip")
def export_invoices_pdf_zip():
    """Build a ZIP archive containing one PDF per invoice matching the filters.

    Optional query-string filters: ``start_date`` and ``end_date`` (compared
    with ``DATE(i.issued_at)``), ``status``, ``client_id`` and ``limit``
    (applied only when it parses as a positive integer).

    Returns:
        A ``send_file`` attachment response with mimetype ``application/zip``;
        the download name encodes whichever filters were supplied.
    """
    start_date = (request.args.get("start_date") or "").strip()
    end_date = (request.args.get("end_date") or "").strip()
    status = (request.args.get("status") or "").strip()
    client_id = (request.args.get("client_id") or "").strip()
    limit_count = (request.args.get("limit") or "").strip()

    query = """
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone,
            s.service_code,
            s.service_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        LEFT JOIN services s ON i.service_id = s.id
        WHERE 1=1
    """
    params = []
    optional_filters = (
        (" AND DATE(i.issued_at) >= %s", start_date),
        (" AND DATE(i.issued_at) <= %s", end_date),
        (" AND i.status = %s", status),
        (" AND i.client_id = %s", client_id),
    )
    for clause, value in optional_filters:
        if value:
            query += clause
            params.append(value)
    query += " ORDER BY i.id ASC"

    if limit_count:
        # A non-numeric or non-positive limit is ignored rather than rejected.
        try:
            limit_int = int(limit_count)
        except ValueError:
            limit_int = 0
        if limit_int > 0:
            query += " LIMIT %s"
            params.append(limit_int)

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(query, tuple(params))
        invoices = cursor.fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    settings = get_app_settings()

    def build_invoice_pdf_bytes(invoice, settings):
        """Render one invoice row as a single letter-size page; return the PDF bytes."""
        buffer = BytesIO()
        pdf = canvas.Canvas(buffer, pagesize=letter)
        _, height = letter
        left = 50
        right = 560
        y = height - 50
        # dict.get's default only applies to a missing key; "or" also covers
        # a NULL currency so the PDF never prints "None".
        currency = invoice.get("currency_code") or "CAD"

        def money(value):
            return f"{to_decimal(value):.2f} {currency}"

        def draw_wrapped(text, y):
            """Draw text in fixed 90-character slices 13pt apart; return the new y."""
            for start in range(0, len(text), 90):
                pdf.drawString(left, y, text[start:start + 90])
                y -= 13
            return y

        pdf.setTitle(f"Invoice {invoice['invoice_number']}")

        logo_url = (settings.get("business_logo_url") or "").strip()
        if logo_url.startswith("/static/"):
            local_logo_path = "/home/def/otb_billing" + logo_url
            # Best effort: an unreadable logo must not prevent the export.
            try:
                pdf.drawImage(
                    ImageReader(local_logo_path),
                    left,
                    y - 35,
                    width=42,
                    height=42,
                    preserveAspectRatio=True,
                    mask='auto',
                )
            except Exception:
                pass

        # Header: invoice title on the left, business identity on the right.
        pdf.setFont("Helvetica-Bold", 22)
        pdf.drawString(left + 60, y, f"Invoice {invoice['invoice_number']}")
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawRightString(right, y, settings.get("business_name") or "OTB Billing")
        y -= 18
        pdf.setFont("Helvetica", 12)
        pdf.drawRightString(right, y, settings.get("business_tagline") or "")
        y -= 15

        right_lines = [
            settings.get("business_address", ""),
            settings.get("business_email", ""),
            settings.get("business_phone", ""),
            settings.get("business_website", ""),
        ]
        for item in right_lines:
            if item:
                pdf.drawRightString(right, y, item[:80])
                y -= 14
        y -= 10

        pdf.setFont("Helvetica-Bold", 12)
        pdf.drawString(left, y, "Status:")
        pdf.setFont("Helvetica", 12)
        pdf.drawString(left + 45, y, str(invoice["status"]).upper())
        y -= 28

        # Bill-to section.
        pdf.setFont("Helvetica-Bold", 13)
        pdf.drawString(left, y, "Bill To")
        y -= 20
        pdf.setFont("Helvetica-Bold", 12)
        pdf.drawString(left, y, invoice["company_name"] or "")
        y -= 16
        pdf.setFont("Helvetica", 11)
        for key in ("contact_name", "email", "phone"):
            if invoice.get(key):
                pdf.drawString(left, y, str(invoice[key]))
                y -= 15
        pdf.drawString(left, y, f"Client Code: {invoice.get('client_code', '')}")
        y -= 28

        # Invoice details section.
        pdf.setFont("Helvetica-Bold", 13)
        pdf.drawString(left, y, "Invoice Details")
        y -= 20
        pdf.setFont("Helvetica", 11)
        pdf.drawString(left, y, f"Invoice #: {invoice['invoice_number']}")
        y -= 15
        pdf.drawString(left, y, f"Issued: {fmt_local(invoice.get('issued_at'))}")
        y -= 15
        pdf.drawString(left, y, f"Due: {fmt_local(invoice.get('due_at'))}")
        y -= 15
        if invoice.get("paid_at"):
            pdf.drawString(left, y, f"Paid: {fmt_local(invoice.get('paid_at'))}")
            y -= 15
        pdf.drawString(left, y, f"Currency: {currency}")
        y -= 28

        # Single service line with its column headings.
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(left, y, "Service Code")
        pdf.drawString(180, y, "Service")
        pdf.drawString(330, y, "Description")
        pdf.drawRightString(right, y, "Total")
        y -= 14
        pdf.line(left, y, right, y)
        y -= 18
        pdf.setFont("Helvetica", 11)
        pdf.drawString(left, y, str(invoice.get("service_code") or "-"))
        pdf.drawString(180, y, str(invoice.get("service_name") or "-"))
        pdf.drawString(330, y, str(invoice.get("notes") or "-")[:28])
        pdf.drawRightString(right, y, money(invoice.get("total_amount")))
        y -= 28

        # Totals column.
        totals_x_label = 360
        totals_x_value = right
        totals = [
            ("Subtotal", money(invoice.get("subtotal_amount"))),
            ((settings.get("tax_label") or "Tax"), money(invoice.get("tax_amount"))),
            ("Total", money(invoice.get("total_amount"))),
            ("Paid", money(invoice.get("amount_paid"))),
        ]
        remaining = to_decimal(invoice.get("total_amount")) - to_decimal(invoice.get("amount_paid"))
        for label, value in totals:
            pdf.setFont("Helvetica-Bold", 11)
            pdf.drawString(totals_x_label, y, label)
            pdf.setFont("Helvetica", 11)
            pdf.drawRightString(totals_x_value, y, value)
            y -= 18
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(totals_x_label, y, "Remaining")
        pdf.drawRightString(totals_x_value, y, money(remaining))
        y -= 25

        if settings.get("tax_number"):
            pdf.setFont("Helvetica", 10)
            pdf.drawString(
                left,
                y,
                f"{settings.get('tax_label') or 'Tax'} Number: {settings.get('tax_number')}",
            )
            y -= 14
        if settings.get("business_number"):
            pdf.setFont("Helvetica", 10)
            pdf.drawString(left, y, f"Business Number: {settings.get('business_number')}")
            y -= 14

        if settings.get("payment_terms"):
            y -= 8
            pdf.setFont("Helvetica-Bold", 11)
            pdf.drawString(left, y, "Payment Terms")
            y -= 15
            pdf.setFont("Helvetica", 10)
            y = draw_wrapped(settings.get("payment_terms", ""), y)

        if settings.get("invoice_footer"):
            y -= 8
            pdf.setFont("Helvetica-Bold", 11)
            pdf.drawString(left, y, "Footer")
            y -= 15
            pdf.setFont("Helvetica", 10)
            y = draw_wrapped(settings.get("invoice_footer", ""), y)

        pdf.showPage()
        pdf.save()
        return buffer.getvalue()

    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        for invoice in invoices:
            pdf_bytes = build_invoice_pdf_bytes(invoice, settings)
            zipf.writestr(f"{invoice['invoice_number']}.pdf", pdf_bytes)
    zip_buffer.seek(0)

    filename = "invoices_export"
    if start_date:
        filename += f"_{start_date}"
    if end_date:
        filename += f"_to_{end_date}"
    if status:
        filename += f"_{status}"
    if client_id:
        filename += f"_client_{client_id}"
    if limit_count:
        filename += f"_limit_{limit_count}"
    filename += ".zip"

    return send_file(
        zip_buffer,
        mimetype="application/zip",
        as_attachment=True,
        download_name=filename,
    )
@app.route("/invoices/print")
def print_invoices():
    """Render the batch-print page for invoices matching the query-string filters.

    Overdue statuses are refreshed first. Supported filters are
    ``start_date``, ``end_date``, ``status``, ``client_id`` and ``limit``;
    the raw filter values are passed back to the template as ``filters``.
    """
    refresh_overdue_invoices()

    args = request.args
    filters = {
        "start_date": (args.get("start_date") or "").strip(),
        "end_date": (args.get("end_date") or "").strip(),
        "status": (args.get("status") or "").strip(),
        "client_id": (args.get("client_id") or "").strip(),
        "limit": (args.get("limit") or "").strip(),
    }

    connection = get_db_connection()
    row_cursor = connection.cursor(dictionary=True)

    sql_parts = ["""
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone,
            s.service_code,
            s.service_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        LEFT JOIN services s ON i.service_id = s.id
        WHERE 1=1
    """]
    bound_values = []

    # Each supplied filter contributes one clause and one bound parameter.
    clause_for_filter = (
        ("start_date", " AND DATE(i.issued_at) >= %s"),
        ("end_date", " AND DATE(i.issued_at) <= %s"),
        ("status", " AND i.status = %s"),
        ("client_id", " AND i.client_id = %s"),
    )
    for filter_name, clause in clause_for_filter:
        supplied = filters[filter_name]
        if supplied:
            sql_parts.append(clause)
            bound_values.append(supplied)

    sql_parts.append(" ORDER BY i.id ASC")

    # Only a positive integer limit is honoured; anything else is ignored.
    if filters["limit"]:
        try:
            row_limit = int(filters["limit"])
        except ValueError:
            row_limit = 0
        if row_limit > 0:
            sql_parts.append(" LIMIT %s")
            bound_values.append(row_limit)

    row_cursor.execute("".join(sql_parts), tuple(bound_values))
    invoice_rows = row_cursor.fetchall()
    connection.close()

    app_settings = get_app_settings()
    return render_template(
        "invoices/print_batch.html",
        invoices=invoice_rows,
        settings=app_settings,
        filters=filters,
    )
@app.route("/invoices")
def invoices():
    """Render the invoice list, newest first, with optional query-string filters.

    Overdue statuses are refreshed first. Each row carries ``payment_count``,
    the number of confirmed payments for that invoice. The full client list
    is also loaded and passed to the template alongside the raw filter
    values.
    """
    refresh_overdue_invoices()

    args = request.args
    filters = {
        "start_date": (args.get("start_date") or "").strip(),
        "end_date": (args.get("end_date") or "").strip(),
        "status": (args.get("status") or "").strip(),
        "client_id": (args.get("client_id") or "").strip(),
        "limit": (args.get("limit") or "").strip(),
    }

    connection = get_db_connection()
    row_cursor = connection.cursor(dictionary=True)

    sql_parts = ["""
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            COALESCE((SELECT COUNT(*) FROM payments p WHERE p.invoice_id = i.id AND p.payment_status = 'confirmed'), 0) AS payment_count
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        WHERE 1=1
    """]
    bound_values = []

    # Each supplied filter contributes one clause and one bound parameter.
    clause_for_filter = (
        ("start_date", " AND DATE(i.issued_at) >= %s"),
        ("end_date", " AND DATE(i.issued_at) <= %s"),
        ("status", " AND i.status = %s"),
        ("client_id", " AND i.client_id = %s"),
    )
    for filter_name, clause in clause_for_filter:
        supplied = filters[filter_name]
        if supplied:
            sql_parts.append(clause)
            bound_values.append(supplied)

    sql_parts.append(" ORDER BY i.id DESC")

    # Only a positive integer limit is honoured; anything else is ignored.
    if filters["limit"]:
        try:
            row_limit = int(filters["limit"])
        except ValueError:
            row_limit = 0
        if row_limit > 0:
            sql_parts.append(" LIMIT %s")
            bound_values.append(row_limit)

    row_cursor.execute("".join(sql_parts), tuple(bound_values))
    invoice_rows = row_cursor.fetchall()

    row_cursor.execute("""
        SELECT id, client_code, company_name
        FROM clients
        ORDER BY company_name ASC
    """)
    client_rows = row_cursor.fetchall()
    connection.close()

    return render_template(
        "invoices/list.html",
        invoices=invoice_rows,
        filters=filters,
        clients=client_rows,
    )
@app.route("/invoices/new", methods=["GET", "POST"])
def new_invoice():
    """Show the new-invoice form (GET) or validate and create an invoice (POST).

    On POST every field except ``notes`` is required and ``total_amount``
    must be a finite number greater than zero. A valid submission inserts a
    ``pending`` invoice (subtotal equal to total, issued at the current UTC
    timestamp) and redirects to ``/invoices``; otherwise the form is
    re-rendered with the error list and the submitted values.
    """
    field_names = (
        "client_id",
        "service_id",
        "currency_code",
        "total_amount",
        "due_at",
        "notes",
    )
    required_messages = (
        ("client_id", "Client is required."),
        ("service_id", "Service is required."),
        ("currency_code", "Currency is required."),
        ("total_amount", "Total amount is required."),
        ("due_at", "Due date is required."),
    )

    errors = []
    form_data = {}

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        if request.method == "POST":
            form_data = {name: request.form.get(name, "").strip() for name in field_names}
            errors = [message for name, message in required_messages if not form_data[name]]

            if not errors:
                # Decimal plus is_finite() rejects "nan"/"inf", which float()
                # accepted and which then failed at the database instead of
                # producing a validation message.
                try:
                    amount_value = Decimal(form_data["total_amount"])
                except InvalidOperation:
                    errors.append("Total amount must be a valid number.")
                else:
                    if not amount_value.is_finite():
                        errors.append("Total amount must be a valid number.")
                    elif amount_value <= 0:
                        errors.append("Total amount must be greater than zero.")

            if not errors:
                invoice_number = generate_invoice_number()
                insert_cursor = conn.cursor()
                insert_cursor.execute("""
                    INSERT INTO invoices
                    (
                        client_id,
                        service_id,
                        invoice_number,
                        currency_code,
                        total_amount,
                        subtotal_amount,
                        issued_at,
                        due_at,
                        status,
                        notes
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, UTC_TIMESTAMP(), %s, 'pending', %s)
                """, (
                    form_data["client_id"],
                    form_data["service_id"],
                    invoice_number,
                    form_data["currency_code"],
                    form_data["total_amount"],
                    form_data["total_amount"],
                    form_data["due_at"],
                    form_data["notes"],
                ))
                conn.commit()
                return redirect("/invoices")

        # Reached for GET and for a POST that failed validation: the form
        # needs the client and service choices either way.
        cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name")
        clients = cursor.fetchall()
        cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name")
        services = cursor.fetchall()
    finally:
        # Release the connection on every path, including exceptions.
        conn.close()

    return render_template(
        "invoices/new.html",
        clients=clients,
        services=services,
        errors=errors,
        form_data=form_data,
    )
@app.route("/invoices/pdf/<int:invoice_id>")
def invoice_pdf(invoice_id):
    """Return a single-page letter-size PDF for one invoice as a download.

    Args:
        invoice_id: Primary key of the invoice, taken from the URL.

    Returns:
        A ``send_file`` attachment named ``<invoice_number>.pdf``, or the
        tuple ``("Invoice not found", 404)`` when no such invoice exists.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                i.*,
                c.client_code,
                c.company_name,
                c.contact_name,
                c.email,
                c.phone,
                s.service_code,
                s.service_name
            FROM invoices i
            JOIN clients c ON i.client_id = c.id
            LEFT JOIN services s ON i.service_id = s.id
            WHERE i.id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if not invoice:
        return "Invoice not found", 404

    settings = get_app_settings()

    buffer = BytesIO()
    pdf = canvas.Canvas(buffer, pagesize=letter)
    _, height = letter
    left = 50
    right = 560
    y = height - 50
    # dict.get's default only applies to a missing key; "or" also covers a
    # NULL currency so the PDF never prints "None".
    currency = invoice.get("currency_code") or "CAD"

    def money(value):
        return f"{to_decimal(value):.2f} {currency}"

    def draw_wrapped(text, y):
        """Draw text in fixed 90-character slices 13pt apart; return the new y."""
        for start in range(0, len(text), 90):
            pdf.drawString(left, y, text[start:start + 90])
            y -= 13
        return y

    pdf.setTitle(f"Invoice {invoice['invoice_number']}")

    logo_url = (settings.get("business_logo_url") or "").strip()
    if logo_url.startswith("/static/"):
        local_logo_path = "/home/def/otb_billing" + logo_url
        # Best effort: an unreadable logo must not prevent the download.
        try:
            pdf.drawImage(
                ImageReader(local_logo_path),
                left,
                y - 35,
                width=42,
                height=42,
                preserveAspectRatio=True,
                mask='auto',
            )
        except Exception:
            pass

    # Header: invoice title on the left, business identity on the right.
    pdf.setFont("Helvetica-Bold", 22)
    pdf.drawString(left + 60, y, f"Invoice {invoice['invoice_number']}")
    pdf.setFont("Helvetica-Bold", 14)
    pdf.drawRightString(right, y, settings.get("business_name") or "OTB Billing")
    y -= 18
    pdf.setFont("Helvetica", 12)
    pdf.drawRightString(right, y, settings.get("business_tagline") or "")
    y -= 15

    right_lines = [
        settings.get("business_address", ""),
        settings.get("business_email", ""),
        settings.get("business_phone", ""),
        settings.get("business_website", ""),
    ]
    for item in right_lines:
        if item:
            pdf.drawRightString(right, y, item[:80])
            y -= 14
    y -= 10

    pdf.setFont("Helvetica-Bold", 12)
    pdf.drawString(left, y, "Status:")
    pdf.setFont("Helvetica", 12)
    pdf.drawString(left + 45, y, str(invoice["status"]).upper())
    y -= 28

    # Bill-to section.
    pdf.setFont("Helvetica-Bold", 13)
    pdf.drawString(left, y, "Bill To")
    y -= 20
    pdf.setFont("Helvetica-Bold", 12)
    pdf.drawString(left, y, invoice["company_name"] or "")
    y -= 16
    pdf.setFont("Helvetica", 11)
    for key in ("contact_name", "email", "phone"):
        if invoice.get(key):
            pdf.drawString(left, y, str(invoice[key]))
            y -= 15
    pdf.drawString(left, y, f"Client Code: {invoice.get('client_code', '')}")
    y -= 28

    # Invoice details section.
    pdf.setFont("Helvetica-Bold", 13)
    pdf.drawString(left, y, "Invoice Details")
    y -= 20
    pdf.setFont("Helvetica", 11)
    pdf.drawString(left, y, f"Invoice #: {invoice['invoice_number']}")
    y -= 15
    pdf.drawString(left, y, f"Issued: {fmt_local(invoice.get('issued_at'))}")
    y -= 15
    pdf.drawString(left, y, f"Due: {fmt_local(invoice.get('due_at'))}")
    y -= 15
    if invoice.get("paid_at"):
        pdf.drawString(left, y, f"Paid: {fmt_local(invoice.get('paid_at'))}")
        y -= 15
    pdf.drawString(left, y, f"Currency: {currency}")
    y -= 28

    # Single service line with its column headings.
    pdf.setFont("Helvetica-Bold", 11)
    pdf.drawString(left, y, "Service Code")
    pdf.drawString(180, y, "Service")
    pdf.drawString(330, y, "Description")
    pdf.drawRightString(right, y, "Total")
    y -= 14
    pdf.line(left, y, right, y)
    y -= 18
    pdf.setFont("Helvetica", 11)
    pdf.drawString(left, y, str(invoice.get("service_code") or "-"))
    pdf.drawString(180, y, str(invoice.get("service_name") or "-"))
    pdf.drawString(330, y, str(invoice.get("notes") or "-")[:28])
    pdf.drawRightString(right, y, money(invoice.get("total_amount")))
    y -= 28

    # Totals column.
    totals_x_label = 360
    totals_x_value = right
    totals = [
        ("Subtotal", money(invoice.get("subtotal_amount"))),
        ((settings.get("tax_label") or "Tax"), money(invoice.get("tax_amount"))),
        ("Total", money(invoice.get("total_amount"))),
        ("Paid", money(invoice.get("amount_paid"))),
    ]
    remaining = to_decimal(invoice.get("total_amount")) - to_decimal(invoice.get("amount_paid"))
    for label, value in totals:
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(totals_x_label, y, label)
        pdf.setFont("Helvetica", 11)
        pdf.drawRightString(totals_x_value, y, value)
        y -= 18
    pdf.setFont("Helvetica-Bold", 11)
    pdf.drawString(totals_x_label, y, "Remaining")
    pdf.drawRightString(totals_x_value, y, money(remaining))
    y -= 25

    if settings.get("tax_number"):
        pdf.setFont("Helvetica", 10)
        pdf.drawString(
            left,
            y,
            f"{settings.get('tax_label') or 'Tax'} Number: {settings.get('tax_number')}",
        )
        y -= 14
    if settings.get("business_number"):
        pdf.setFont("Helvetica", 10)
        pdf.drawString(left, y, f"Business Number: {settings.get('business_number')}")
        y -= 14

    if settings.get("payment_terms"):
        y -= 8
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(left, y, "Payment Terms")
        y -= 15
        pdf.setFont("Helvetica", 10)
        y = draw_wrapped(settings.get("payment_terms", ""), y)

    if settings.get("invoice_footer"):
        y -= 8
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(left, y, "Footer")
        y -= 15
        pdf.setFont("Helvetica", 10)
        y = draw_wrapped(settings.get("invoice_footer", ""), y)

    pdf.showPage()
    pdf.save()
    buffer.seek(0)

    return send_file(
        buffer,
        mimetype="application/pdf",
        as_attachment=True,
        download_name=f"{invoice['invoice_number']}.pdf",
    )
@app.route("/invoices/view/<int:invoice_id>")
def view_invoice(invoice_id):
    """Render the detail page for one invoice.

    The invoice row is joined with its client and, when present, its
    service. Responds with ``("Invoice not found", 404)`` when the id does
    not match any invoice.
    """
    lookup_sql = """
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone,
            s.service_code,
            s.service_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        LEFT JOIN services s ON i.service_id = s.id
        WHERE i.id = %s
    """
    connection = get_db_connection()
    row_cursor = connection.cursor(dictionary=True)
    row_cursor.execute(lookup_sql, (invoice_id,))
    invoice_row = row_cursor.fetchone()
    # The connection is no longer needed whether or not a row was found.
    connection.close()

    if invoice_row:
        app_settings = get_app_settings()
        return render_template("invoices/view.html", invoice=invoice_row, settings=app_settings)
    return "Invoice not found", 404
@app.route("/invoices/edit/<int:invoice_id>", methods=["GET", "POST"])
def edit_invoice(invoice_id):
    """Show or process the edit form for one invoice.

    An invoice is "locked" once it has any payment row or a positive
    ``amount_paid``; a locked invoice only accepts changes to ``due_at`` and
    ``notes``. An unlocked invoice can change client, service, currency,
    total (also written to subtotal), due date, status and notes, with the
    status restricted to draft, pending or cancelled.

    Returns:
        A redirect to ``/invoices`` after a successful update, the edit
        template otherwise, or ``("Invoice not found", 404)``.
    """

    def int_or(raw, fallback):
        """Return int(raw), or fallback when raw is empty or not an integer."""
        try:
            return int(raw)
        except ValueError:
            return fallback

    errors = []
    clients = []
    services = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT i.*,
                COALESCE((SELECT COUNT(*) FROM payments p WHERE p.invoice_id = i.id), 0) AS payment_count
            FROM invoices i
            WHERE i.id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
        if not invoice:
            return "Invoice not found", 404

        # to_decimal treats a NULL amount_paid as zero; float(None) raised.
        locked = invoice["payment_count"] > 0 or to_decimal(invoice["amount_paid"]) > 0

        if request.method == "POST":
            due_at = request.form.get("due_at", "").strip()
            notes = request.form.get("notes", "").strip()

            if locked:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE invoices
                    SET due_at = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    due_at or None,
                    notes or None,
                    invoice_id,
                ))
                conn.commit()
                return redirect("/invoices")

            client_id = request.form.get("client_id", "").strip()
            service_id = request.form.get("service_id", "").strip()
            currency_code = request.form.get("currency_code", "").strip()
            total_amount = request.form.get("total_amount", "").strip()
            status = request.form.get("status", "").strip()

            if not client_id:
                errors.append("Client is required.")
            if not service_id:
                errors.append("Service is required.")
            if not currency_code:
                errors.append("Currency is required.")
            if not total_amount:
                errors.append("Total amount is required.")
            if not due_at:
                errors.append("Due date is required.")
            if not status:
                errors.append("Status is required.")

            manual_statuses = {"draft", "pending", "cancelled"}
            if status and status not in manual_statuses:
                errors.append("Manual invoice status must be draft, pending, or cancelled.")

            if not errors:
                # Decimal plus is_finite() rejects "nan"/"inf", which float()
                # let through to the UPDATE statement.
                try:
                    amount_value = Decimal(total_amount)
                except InvalidOperation:
                    errors.append("Total amount must be a valid number.")
                else:
                    if not amount_value.is_finite():
                        errors.append("Total amount must be a valid number.")
                    elif amount_value < 0:
                        errors.append("Total amount cannot be negative.")

            if not errors:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE invoices
                    SET client_id = %s,
                        service_id = %s,
                        currency_code = %s,
                        total_amount = %s,
                        subtotal_amount = %s,
                        due_at = %s,
                        status = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    client_id,
                    service_id,
                    currency_code,
                    total_amount,
                    total_amount,
                    due_at,
                    status,
                    notes or None,
                    invoice_id,
                ))
                conn.commit()
                return redirect("/invoices")

            # Echo the submitted values back into the form. A non-numeric id
            # keeps the stored value instead of raising ValueError.
            invoice["client_id"] = int_or(client_id, invoice["client_id"])
            invoice["service_id"] = int_or(service_id, invoice["service_id"])
            invoice["currency_code"] = currency_code or invoice["currency_code"]
            invoice["total_amount"] = total_amount or invoice["total_amount"]
            invoice["due_at"] = due_at or invoice["due_at"]
            invoice["status"] = status or invoice["status"]
            invoice["notes"] = notes

        # Only the unlocked form offers client/service choices; a failed POST
        # always reaches this point unlocked because locked POSTs return above.
        if not locked:
            cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name")
            clients = cursor.fetchall()
            cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name")
            services = cursor.fetchall()
    finally:
        # Release the connection on every path, including exceptions.
        conn.close()

    return render_template(
        "invoices/edit.html",
        invoice=invoice,
        clients=clients,
        services=services,
        errors=errors,
        locked=locked,
    )
@app.route("/payments/export.csv")
def export_payments_csv():
    """Download every payment, oldest first, as ``payments.csv``.

    Each row joins the payment with its invoice number and its client's
    code and company name. The header row uses the column names as-is.
    """
    column_names = (
        "id",
        "invoice_id",
        "invoice_number",
        "client_id",
        "client_code",
        "company_name",
        "payment_method",
        "payment_currency",
        "payment_amount",
        "cad_value_at_payment",
        "reference",
        "sender_name",
        "txid",
        "wallet_address",
        "payment_status",
        "received_at",
        "notes",
    )

    connection = get_db_connection()
    row_cursor = connection.cursor(dictionary=True)
    row_cursor.execute("""
        SELECT
            p.id,
            p.invoice_id,
            i.invoice_number,
            p.client_id,
            c.client_code,
            c.company_name,
            p.payment_method,
            p.payment_currency,
            p.payment_amount,
            p.cad_value_at_payment,
            p.reference,
            p.sender_name,
            p.txid,
            p.wallet_address,
            p.payment_status,
            p.received_at,
            p.notes
        FROM payments p
        JOIN invoices i ON p.invoice_id = i.id
        JOIN clients c ON p.client_id = c.id
        ORDER BY p.id ASC
    """)
    payment_rows = row_cursor.fetchall()
    connection.close()

    csv_text = StringIO()
    csv_writer = csv.writer(csv_text)
    csv_writer.writerow(list(column_names))
    for payment_row in payment_rows:
        csv_writer.writerow([payment_row.get(name, "") for name in column_names])

    response = make_response(csv_text.getvalue())
    response.headers["Content-Type"] = "text/csv; charset=utf-8"
    response.headers["Content-Disposition"] = "attachment; filename=payments.csv"
    return response
@app.route("/payments")
def payments():
    """Render the payment list, newest first.

    Every payment row is joined with its invoice (number, status, total,
    amount paid, currency) and its client (code, company name).
    """
    listing_sql = """
        SELECT
            p.*,
            i.invoice_number,
            i.status AS invoice_status,
            i.total_amount,
            i.amount_paid,
            i.currency_code AS invoice_currency_code,
            c.client_code,
            c.company_name
        FROM payments p
        JOIN invoices i ON p.invoice_id = i.id
        JOIN clients c ON p.client_id = c.id
        ORDER BY p.id DESC
    """
    connection = get_db_connection()
    row_cursor = connection.cursor(dictionary=True)
    row_cursor.execute(listing_sql)
    payment_rows = row_cursor.fetchall()
    connection.close()

    return render_template("payments/list.html", payments=payment_rows)
@app.route("/payments/new", methods=["GET", "POST"])
def new_payment():
    """Show the new-payment form (GET) or validate and record a payment (POST).

    A payment needs an invoice, method, currency, a finite amount greater
    than zero and a finite non-negative CAD value. The invoice must exist,
    be pending, partial or overdue, and have a remaining balance at least
    as large as the payment amount. A valid submission inserts a
    ``confirmed`` payment, recalculates the invoice totals and redirects to
    ``/payments``; otherwise the form is re-rendered with the errors, the
    submitted values and the list of invoices that still have a balance.
    """

    def parse_finite(raw):
        """Return raw as a finite Decimal, or None when it is not one."""
        try:
            value = Decimal(raw)
        except InvalidOperation:
            return None
        return value if value.is_finite() else None

    field_names = (
        "invoice_id",
        "payment_method",
        "payment_currency",
        "payment_amount",
        "cad_value_at_payment",
        "reference",
        "sender_name",
        "txid",
        "wallet_address",
        "notes",
    )
    required_messages = (
        ("invoice_id", "Invoice is required."),
        ("payment_method", "Payment method is required."),
        ("payment_currency", "Payment currency is required."),
        ("payment_amount", "Payment amount is required."),
        ("cad_value_at_payment", "CAD value at payment is required."),
    )

    errors = []
    form_data = {}
    recorded_invoice_id = None

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        if request.method == "POST":
            form_data = {name: request.form.get(name, "").strip() for name in field_names}
            errors = [message for name, message in required_messages if not form_data[name]]

            payment_amount_value = None
            if not errors:
                payment_amount_value = parse_finite(form_data["payment_amount"])
                if payment_amount_value is None:
                    errors.append("Payment amount must be a valid number.")
                elif payment_amount_value <= 0:
                    errors.append("Payment amount must be greater than zero.")

            if not errors:
                # "Infinity" used to pass this check and reach the INSERT.
                cad_value_value = parse_finite(form_data["cad_value_at_payment"])
                if cad_value_value is None:
                    errors.append("CAD value at payment must be a valid number.")
                elif cad_value_value < 0:
                    errors.append("CAD value at payment cannot be negative.")

            invoice_row = None
            if not errors:
                cursor.execute("""
                    SELECT
                        i.id,
                        i.client_id,
                        i.invoice_number,
                        i.currency_code,
                        i.total_amount,
                        i.amount_paid,
                        i.status,
                        c.client_code,
                        c.company_name
                    FROM invoices i
                    JOIN clients c ON i.client_id = c.id
                    WHERE i.id = %s
                """, (form_data["invoice_id"],))
                invoice_row = cursor.fetchone()

                allowed_statuses = {"pending", "partial", "overdue"}
                if not invoice_row:
                    errors.append("Selected invoice was not found.")
                elif invoice_row["status"] not in allowed_statuses:
                    errors.append("Payments can only be recorded against pending, partial, or overdue invoices.")
                else:
                    remaining_balance = to_decimal(invoice_row["total_amount"]) - to_decimal(invoice_row["amount_paid"])
                    if remaining_balance <= Decimal("0"):
                        errors.append("This invoice has no remaining balance.")
                    elif payment_amount_value > remaining_balance:
                        errors.append(
                            f"Payment amount exceeds remaining balance. Remaining balance is {fmt_money(remaining_balance, invoice_row['currency_code'])} {invoice_row['currency_code']}."
                        )

            if not errors:
                insert_cursor = conn.cursor()
                insert_cursor.execute("""
                    INSERT INTO payments
                    (
                        invoice_id,
                        client_id,
                        payment_method,
                        payment_currency,
                        payment_amount,
                        cad_value_at_payment,
                        reference,
                        sender_name,
                        txid,
                        wallet_address,
                        payment_status,
                        received_at,
                        notes
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'confirmed', UTC_TIMESTAMP(), %s)
                """, (
                    form_data["invoice_id"],
                    invoice_row["client_id"],
                    form_data["payment_method"],
                    form_data["payment_currency"],
                    form_data["payment_amount"],
                    form_data["cad_value_at_payment"],
                    form_data["reference"] or None,
                    form_data["sender_name"] or None,
                    form_data["txid"] or None,
                    form_data["wallet_address"] or None,
                    form_data["notes"] or None,
                ))
                conn.commit()
                recorded_invoice_id = form_data["invoice_id"]

        if recorded_invoice_id is None:
            # GET, or a POST that failed validation: the form lists the
            # invoices that can still receive a payment.
            cursor.execute("""
                SELECT
                    i.id,
                    i.invoice_number,
                    i.currency_code,
                    i.total_amount,
                    i.amount_paid,
                    i.status,
                    c.client_code,
                    c.company_name
                FROM invoices i
                JOIN clients c ON i.client_id = c.id
                WHERE i.status IN ('pending', 'partial', 'overdue')
                  AND (i.total_amount - i.amount_paid) > 0
                ORDER BY i.id DESC
            """)
            invoices = cursor.fetchall()
    finally:
        # Release the connection on every path, including exceptions.
        conn.close()

    if recorded_invoice_id is not None:
        # Runs after this connection is closed, as before.
        recalc_invoice_totals(recorded_invoice_id)
        return redirect("/payments")

    return render_template(
        "payments/new.html",
        invoices=invoices,
        errors=errors,
        form_data=form_data,
    )
@app.route("/payments/void/<int:payment_id>", methods=["POST"])
def void_payment(payment_id):
    """Mark a confirmed payment as reversed and recalculate its invoice.

    Args:
        payment_id: Primary key of the payment, taken from the URL.

    Returns:
        A redirect to ``/payments`` (also when the payment is not currently
        ``confirmed``, in which case nothing is changed), or
        ``("Payment not found", 404)``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, invoice_id, payment_status
            FROM payments
            WHERE id = %s
        """, (payment_id,))
        payment = cursor.fetchone()

        if not payment:
            return "Payment not found", 404
        if payment["payment_status"] != "confirmed":
            return redirect("/payments")

        update_cursor = conn.cursor()
        update_cursor.execute("""
            UPDATE payments
            SET payment_status = 'reversed'
            WHERE id = %s
        """, (payment_id,))
        conn.commit()
    finally:
        # Release the connection on every path, including exceptions.
        conn.close()

    # Runs after this connection is closed, as before. The original repeated
    # these two statements after the return, where they could never execute.
    recalc_invoice_totals(payment["invoice_id"])
    return redirect("/payments")
@app.route("/payments/edit/<int:payment_id>", methods=["GET", "POST"])
def edit_payment(payment_id):
    """Show or process the edit form for one payment.

    On POST the method, currency, amount and CAD value are required; the
    amount must be a finite number greater than zero and the CAD value a
    finite non-negative number. A valid submission updates the payment,
    recalculates the invoice totals and redirects to ``/payments``;
    otherwise the form is re-rendered with the errors and submitted values.

    Returns:
        A redirect, the edit template, or ``("Payment not found", 404)``.
    """

    def parse_finite(raw):
        """Return raw as a finite Decimal, or None when it is not one."""
        try:
            value = Decimal(raw)
        except InvalidOperation:
            return None
        return value if value.is_finite() else None

    errors = []
    saved = False

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                p.*,
                i.invoice_number,
                c.client_code,
                c.company_name
            FROM payments p
            JOIN invoices i ON p.invoice_id = i.id
            JOIN clients c ON p.client_id = c.id
            WHERE p.id = %s
        """, (payment_id,))
        payment = cursor.fetchone()
        if not payment:
            return "Payment not found", 404

        if request.method == "POST":
            payment_method = request.form.get("payment_method", "").strip()
            payment_currency = request.form.get("payment_currency", "").strip()
            payment_amount = request.form.get("payment_amount", "").strip()
            cad_value_at_payment = request.form.get("cad_value_at_payment", "").strip()
            reference = request.form.get("reference", "").strip()
            sender_name = request.form.get("sender_name", "").strip()
            txid = request.form.get("txid", "").strip()
            wallet_address = request.form.get("wallet_address", "").strip()
            notes = request.form.get("notes", "").strip()

            if not payment_method:
                errors.append("Payment method is required.")
            if not payment_currency:
                errors.append("Payment currency is required.")
            if not payment_amount:
                errors.append("Payment amount is required.")
            if not cad_value_at_payment:
                errors.append("CAD value at payment is required.")

            if not errors:
                # Decimal plus is_finite() rejects "nan"/"inf", which the
                # previous float() checks let through to the UPDATE.
                amount_value = parse_finite(payment_amount)
                if amount_value is None:
                    errors.append("Payment amount must be a valid number.")
                elif amount_value <= 0:
                    errors.append("Payment amount must be greater than zero.")

                cad_value = parse_finite(cad_value_at_payment)
                if cad_value is None:
                    errors.append("CAD value at payment must be a valid number.")
                elif cad_value < 0:
                    errors.append("CAD value at payment cannot be negative.")

            if errors:
                # Echo the submitted values back into the form.
                payment["payment_method"] = payment_method or payment["payment_method"]
                payment["payment_currency"] = payment_currency or payment["payment_currency"]
                payment["payment_amount"] = payment_amount or payment["payment_amount"]
                payment["cad_value_at_payment"] = cad_value_at_payment or payment["cad_value_at_payment"]
                payment["reference"] = reference
                payment["sender_name"] = sender_name
                payment["txid"] = txid
                payment["wallet_address"] = wallet_address
                payment["notes"] = notes
            else:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE payments
                    SET payment_method = %s,
                        payment_currency = %s,
                        payment_amount = %s,
                        cad_value_at_payment = %s,
                        reference = %s,
                        sender_name = %s,
                        txid = %s,
                        wallet_address = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    payment_method,
                    payment_currency,
                    payment_amount,
                    cad_value_at_payment,
                    reference or None,
                    sender_name or None,
                    txid or None,
                    wallet_address or None,
                    notes or None,
                    payment_id,
                ))
                conn.commit()
                saved = True
    finally:
        # Release the connection on every path, including exceptions.
        conn.close()

    if saved:
        # Runs after this connection is closed, as before.
        recalc_invoice_totals(payment["invoice_id"])
        return redirect("/payments")

    return render_template("payments/edit.html", payment=payment, errors=errors)
if __name__ == "__main__":
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python, so it must not be on by default while binding to
    # every interface. Opt in explicitly with FLASK_DEBUG=1 for local work.
    import os

    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5050, debug=debug_enabled)