You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2974 lines
91 KiB
2974 lines
91 KiB
from flask import Flask, render_template, request, redirect, send_file, make_response, jsonify |
|
from db import get_db_connection |
|
from utils import generate_client_code, generate_service_code |
|
from datetime import datetime, timezone, date, timedelta |
|
from zoneinfo import ZoneInfo |
|
from decimal import Decimal, InvalidOperation |
|
from pathlib import Path |
|
from email.message import EmailMessage |
|
from dateutil.relativedelta import relativedelta |
|
|
|
from io import BytesIO, StringIO |
|
import csv |
|
import zipfile |
|
import smtplib |
|
from reportlab.lib.pagesizes import letter |
|
from reportlab.pdfgen import canvas |
|
from reportlab.lib.utils import ImageReader |
|
|
|
# Flask application object. The template and static folders are given as
# paths one level above the application's root.
app = Flask(__name__, template_folder="../templates", static_folder="../static")

# Time zone used when formatting timestamps and computing report periods.
LOCAL_TZ = ZoneInfo("America/Toronto")

# Directory two levels above this file (parent of this file's directory).
BASE_DIR = Path(__file__).resolve().parents[1]
|
|
|
|
|
def load_version():
    """Return the application version read from ``BASE_DIR / "VERSION"``.

    Returns:
        The file contents with surrounding whitespace stripped, or
        ``"unknown"`` when the file cannot be read or decoded.
    """
    try:
        # Explicit encoding so the result does not depend on the host locale.
        return (BASE_DIR / "VERSION").read_text(encoding="utf-8").strip()
    except (OSError, UnicodeError):
        # This runs at import time (APP_VERSION); a missing or unreadable
        # file must not prevent the application from starting.
        return "unknown"
|
|
|
# Resolved once at import time and reused for every template render.
APP_VERSION = load_version()


@app.context_processor
def inject_version():
    """Expose the application version to templates as ``app_version``."""
    return dict(app_version=APP_VERSION)
|
|
|
@app.context_processor
def inject_app_settings():
    """Expose the current application settings to templates as ``app_settings``."""
    current_settings = get_app_settings()
    return {"app_settings": current_settings}
|
|
|
def fmt_local(dt_value):
    """Format a timestamp in ``LOCAL_TZ`` as ``YYYY-MM-DD HH:MM:SS AM/PM``.

    Args:
        dt_value: A ``datetime``, an ISO-format string, or a falsy value.

    Returns:
        ``""`` for falsy input; the input text unchanged when a string is not
        accepted by ``datetime.fromisoformat``; otherwise the formatted local
        time. Naive datetimes are interpreted as UTC.
    """
    if not dt_value:
        return ""

    moment = dt_value
    if isinstance(moment, str):
        try:
            moment = datetime.fromisoformat(moment)
        except ValueError:
            # Not ISO formatted: hand the text back untouched.
            return str(dt_value)

    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)

    localized = moment.astimezone(LOCAL_TZ)
    return localized.strftime("%Y-%m-%d %I:%M:%S %p")
|
|
|
def to_decimal(value):
    """Coerce *value* to a finite ``Decimal``, falling back to zero.

    Args:
        value: ``None``, ``""``, or anything whose ``str()`` form may be a
            decimal number (``Decimal``, ``int``, ``float``, ``str``).

    Returns:
        The parsed ``Decimal``, or ``Decimal("0")`` when the value is
        ``None``, empty, unparsable, or not finite (NaN / Infinity).
    """
    zero = Decimal("0")
    if value is None or value == "":
        return zero
    try:
        parsed = Decimal(str(value))
    except (InvalidOperation, ValueError):
        return zero
    # "NaN" and "Infinity" parse without error, but NaN raises
    # InvalidOperation in ordering comparisons and neither is a usable
    # amount, so they get the same fallback as unparsable input.
    return parsed if parsed.is_finite() else zero
|
|
|
def fmt_money(value, currency_code="CAD"):
    """Render an amount as a fixed-point string.

    ``CAD`` amounts use two decimal places; any other currency code uses
    eight. The value is first normalised with ``to_decimal``.
    """
    places = 2 if currency_code == "CAD" else 8
    return f"{to_decimal(value):.{places}f}"
|
|
|
def refresh_overdue_invoices():
    """Mark past-due invoices as ``overdue``.

    Updates every invoice whose ``due_at`` is earlier than the database's
    ``UTC_TIMESTAMP()`` and whose status is ``pending`` or ``partial``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE invoices
            SET status = 'overdue'
            WHERE due_at IS NOT NULL
              AND due_at < UTC_TIMESTAMP()
              AND status IN ('pending', 'partial')
        """)
        conn.commit()
    finally:
        # Release the connection even when the statement fails.
        conn.close()
|
|
|
def recalc_invoice_totals(invoice_id):
    """Recompute ``amount_paid``, ``status`` and ``paid_at`` for one invoice.

    The paid amount is the sum of the invoice's payments whose
    ``payment_status`` is ``confirmed``.

    * A ``cancelled`` invoice keeps its status; only ``amount_paid`` is
      refreshed and ``paid_at`` is cleared.
    * Otherwise the status becomes ``paid`` (fully covered and total > 0),
      ``partial`` (something paid), ``overdue`` (nothing paid and ``due_at``
      is in the past) or ``pending``.

    Args:
        invoice_id: Primary key of the invoice. Nothing happens when no such
            invoice exists.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        cursor.execute("""
            SELECT id, total_amount, due_at, status
            FROM invoices
            WHERE id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
        if not invoice:
            return

        cursor.execute("""
            SELECT COALESCE(SUM(payment_amount), 0) AS total_paid
            FROM payments
            WHERE invoice_id = %s
              AND payment_status = 'confirmed'
        """, (invoice_id,))
        total_paid = to_decimal(cursor.fetchone()["total_paid"])
        total_amount = to_decimal(invoice["total_amount"])

        update_cursor = conn.cursor()

        if invoice["status"] == "cancelled":
            update_cursor.execute("""
                UPDATE invoices
                SET amount_paid = %s,
                    paid_at = NULL
                WHERE id = %s
            """, (str(total_paid), invoice_id))
            conn.commit()
            return

        if total_paid >= total_amount and total_amount > 0:
            new_status = "paid"
        elif total_paid > 0:
            new_status = "partial"
        else:
            # Naive UTC "now", matching what datetime.utcnow() returned
            # without using the deprecated call.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            due_at = invoice["due_at"]
            new_status = "overdue" if due_at and due_at < now_utc else "pending"

        # Only these two fixed SQL fragments are ever interpolated below;
        # all data values go through placeholders.
        # NOTE(review): a "paid" invoice gets a fresh paid_at on every
        # recalculation — confirm whether the first paid time should be kept.
        paid_at_sql = "UTC_TIMESTAMP()" if new_status == "paid" else "NULL"

        update_cursor.execute(f"""
            UPDATE invoices
            SET amount_paid = %s,
                status = %s,
                paid_at = {paid_at_sql}
            WHERE id = %s
        """, (str(total_paid), new_status, invoice_id))
        conn.commit()
    finally:
        # Runs on every exit path, including the early returns above.
        conn.close()
|
|
|
def get_client_credit_balance(client_id):
    """Return the sum of ``credit_ledger.amount`` for one client.

    Args:
        client_id: Client primary key used to filter the ledger.

    Returns:
        The balance as a ``Decimal`` (zero when the client has no rows).
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT COALESCE(SUM(amount), 0) AS balance
            FROM credit_ledger
            WHERE client_id = %s
        """, (client_id,))
        row = cursor.fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    return to_decimal(row["balance"])
|
|
|
|
|
def generate_invoice_number():
    """Return the next ``INV-NNNN`` invoice number.

    Reads the invoice with the highest ``id`` whose number starts with
    ``INV-`` and adds one to the integer found after the first dash.

    Returns:
        ``"INV-0001"`` when no such invoice exists or its number cannot be
        parsed; otherwise the incremented number, zero-padded to at least
        four digits.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        # Executed without parameters, so the literal '%' in the LIKE
        # pattern is passed through as written.
        cursor.execute("""
            SELECT invoice_number
            FROM invoices
            WHERE invoice_number IS NOT NULL
              AND invoice_number LIKE 'INV-%'
            ORDER BY id DESC
            LIMIT 1
        """)
        row = cursor.fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    if not row or not row.get("invoice_number"):
        return "INV-0001"

    invoice_number = str(row["invoice_number"]).strip()

    try:
        number = int(invoice_number.split("-")[1])
    except (IndexError, ValueError):
        # NOTE(review): restarting at 0001 may collide with existing
        # invoices when the latest number is merely malformed.
        return "INV-0001"

    return f"INV-{number + 1:04d}"
|
|
|
|
|
def ensure_subscriptions_table():
    """Create the ``subscriptions`` table when it does not exist yet."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS subscriptions (
                id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                client_id INT UNSIGNED NOT NULL,
                service_id INT UNSIGNED NULL,
                subscription_name VARCHAR(255) NOT NULL,
                billing_interval ENUM('monthly','quarterly','yearly') NOT NULL DEFAULT 'monthly',
                price DECIMAL(18,8) NOT NULL DEFAULT 0.00000000,
                currency_code VARCHAR(16) NOT NULL DEFAULT 'CAD',
                start_date DATE NOT NULL,
                next_invoice_date DATE NOT NULL,
                status ENUM('active','paused','cancelled') NOT NULL DEFAULT 'active',
                notes TEXT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                KEY idx_subscriptions_client_id (client_id),
                KEY idx_subscriptions_service_id (service_id),
                KEY idx_subscriptions_status (status),
                KEY idx_subscriptions_next_invoice_date (next_invoice_date)
            )
        """)
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
|
|
|
|
|
def get_next_subscription_date(current_date, billing_interval):
    """Advance a billing date by one billing interval.

    Args:
        current_date: A date-like object, or a ``YYYY-MM-DD`` string (which
            is parsed to a ``date``).
        billing_interval: ``"yearly"`` adds one year, ``"quarterly"`` adds
            three months; any other value adds one month.

    Returns:
        ``current_date`` shifted by the matching ``relativedelta``.
    """
    if isinstance(current_date, str):
        current_date = datetime.strptime(current_date, "%Y-%m-%d").date()

    step_by_interval = {
        "yearly": relativedelta(years=1),
        "quarterly": relativedelta(months=3),
    }
    step = step_by_interval.get(billing_interval, relativedelta(months=1))
    return current_date + step
|
|
|
|
|
def generate_due_subscription_invoices(run_date=None):
    """Create one invoice for every active subscription that is due.

    A subscription is due when its status is ``active`` and its
    ``next_invoice_date`` is on or before the run date. For each one a
    ``pending`` invoice is inserted (due 14 days after the run date, zero
    tax) and the subscription's ``next_invoice_date`` is advanced by one
    billing interval.

    Args:
        run_date: Date to evaluate against; defaults to ``date.today()``.

    Returns:
        A dict with ``created_count``, ``invoice_numbers`` (in creation
        order) and ``run_date`` (as a string).
    """
    ensure_subscriptions_table()

    today = run_date or date.today()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                s.*,
                c.client_code,
                c.company_name,
                srv.service_code,
                srv.service_name
            FROM subscriptions s
            JOIN clients c ON s.client_id = c.id
            LEFT JOIN services srv ON s.service_id = srv.id
            WHERE s.status = 'active'
              AND s.next_invoice_date <= %s
            ORDER BY s.next_invoice_date ASC, s.id ASC
        """, (today,))
        due_subscriptions = cursor.fetchall()

        # The due date depends only on the run date, so compute it once.
        due_dt = datetime.combine(today + timedelta(days=14), datetime.min.time())

        created_invoice_numbers = []
        write_cursor = conn.cursor()

        for sub in due_subscriptions:
            invoice_number = generate_invoice_number()

            note_parts = [f"Recurring subscription: {sub['subscription_name']}"]
            if sub.get("service_code"):
                note_parts.append(f"Service: {sub['service_code']}")
            if sub.get("service_name"):
                note_parts.append(f"({sub['service_name']})")
            if sub.get("notes"):
                note_parts.append(f"Notes: {sub['notes']}")
            note_text = " ".join(note_parts)

            write_cursor.execute("""
                INSERT INTO invoices
                (
                    client_id,
                    service_id,
                    invoice_number,
                    currency_code,
                    total_amount,
                    subtotal_amount,
                    tax_amount,
                    issued_at,
                    due_at,
                    status,
                    notes
                )
                VALUES (%s, %s, %s, %s, %s, %s, 0, UTC_TIMESTAMP(), %s, 'pending', %s)
            """, (
                sub["client_id"],
                sub["service_id"],
                invoice_number,
                sub["currency_code"],
                str(sub["price"]),
                str(sub["price"]),
                due_dt,
                note_text,
            ))

            next_date = get_next_subscription_date(
                sub["next_invoice_date"], sub["billing_interval"]
            )
            write_cursor.execute("""
                UPDATE subscriptions
                SET next_invoice_date = %s
                WHERE id = %s
            """, (next_date, sub["id"]))

            # Commit each invoice together with its subscription update.
            # generate_invoice_number() reads through its own connection, so
            # it can only see this invoice once it is committed; committing
            # once after the loop gave every invoice of a run the same
            # number. (Presumably autocommit is off, as the explicit commits
            # throughout this file suggest — verify in get_db_connection.)
            conn.commit()

            created_invoice_numbers.append(invoice_number)
    finally:
        # Closing without a commit discards a half-written invoice/update
        # pair if something failed mid-iteration.
        conn.close()

    return {
        "created_count": len(created_invoice_numbers),
        "invoice_numbers": created_invoice_numbers,
        "run_date": str(today),
    }
|
|
|
|
|
# Default value for every known application setting. Stored rows from the
# app_settings table override these; the keys also define which form fields
# save_app_settings() persists. All values are strings.
APP_SETTINGS_DEFAULTS = {
    # Business identity
    "business_name": "OTB Billing",
    "business_tagline": "By a contractor, for contractors",
    "business_logo_url": "",
    "business_email": "",
    "business_phone": "",
    "business_address": "",
    "business_website": "",
    # Tax and registration
    "tax_label": "HST",
    "tax_rate": "13.00",
    "tax_number": "",
    "business_number": "",
    # Billing and reporting
    "default_currency": "CAD",
    "report_frequency": "monthly",
    "invoice_footer": "",
    "payment_terms": "",
    "local_country": "Canada",
    "apply_local_tax_only": "1",
    # Outgoing mail (SMTP); flags are "1" / "0"
    "smtp_host": "",
    "smtp_port": "587",
    "smtp_user": "",
    "smtp_pass": "",
    "smtp_from_email": "",
    "smtp_from_name": "",
    "smtp_use_tls": "1",
    "smtp_use_ssl": "0",
    "report_delivery_email": "",
}
|
|
|
def ensure_app_settings_table():
    """Create the key/value ``app_settings`` table when it does not exist."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS app_settings (
                setting_key VARCHAR(100) NOT NULL PRIMARY KEY,
                setting_value TEXT NULL,
                updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
|
|
|
def get_app_settings():
    """Return all application settings as a ``dict`` of strings.

    Starts from a copy of ``APP_SETTINGS_DEFAULTS`` and overlays every row
    stored in ``app_settings``. A stored ``NULL`` becomes ``""``.
    """
    ensure_app_settings_table()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT setting_key, setting_value
            FROM app_settings
        """)
        rows = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    settings = dict(APP_SETTINGS_DEFAULTS)
    for row in rows:
        stored = row["setting_value"]
        settings[row["setting_key"]] = "" if stored is None else stored

    return settings
|
|
|
def save_app_settings(form_data):
    """Persist every known setting from a submitted form.

    Only keys present in ``APP_SETTINGS_DEFAULTS`` are written. The three
    checkbox settings are stored as ``"1"`` when the form carries a truthy
    value for them and ``"0"`` otherwise; all other values are stripped
    text (missing fields are stored as ``""``).

    Args:
        form_data: Mapping with a ``.get`` method, e.g. ``request.form``.
    """
    checkbox_keys = {"apply_local_tax_only", "smtp_use_tls", "smtp_use_ssl"}

    ensure_app_settings_table()

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        for key in APP_SETTINGS_DEFAULTS:
            if key in checkbox_keys:
                value = "1" if form_data.get(key) else "0"
            else:
                value = (form_data.get(key) or "").strip()

            cursor.execute("""
                INSERT INTO app_settings (setting_key, setting_value)
                VALUES (%s, %s)
                ON DUPLICATE KEY UPDATE setting_value = VALUES(setting_value)
            """, (key, value))

        # One commit for the whole form, so a failure saves nothing.
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
@app.template_filter("localtime")
def localtime_filter(value):
    """Template filter ``localtime``: delegate to ``fmt_local``."""
    formatted = fmt_local(value)
    return formatted
|
|
|
@app.template_filter("money")
def money_filter(value, currency_code="CAD"):
    """Template filter ``money``: delegate to ``fmt_money``."""
    formatted = fmt_money(value, currency_code)
    return formatted
|
|
|
|
|
|
|
|
|
def get_report_period_bounds(frequency):
    """Return the bounds and label of the current reporting period.

    The period starts at local midnight on the first day of the current
    year (``"yearly"``), quarter (``"quarterly"``) or month (anything else)
    in ``LOCAL_TZ`` and ends now.

    Returns:
        ``(start_utc, end_utc, label)`` where both datetimes are naive UTC
        values and ``label`` is e.g. ``"2025"``, ``"Q2 2025"`` or
        ``"March 2025"``.
    """
    def _as_naive_utc(moment):
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    now_local = datetime.now(LOCAL_TZ)
    # Local midnight on the first day of the current month.
    month_start = now_local.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    if frequency == "yearly":
        start_local = month_start.replace(month=1)
        label = f"{now_local.year}"
    elif frequency == "quarterly":
        quarter = (now_local.month - 1) // 3 + 1
        # First month of quarter q is 3q - 2 (1, 4, 7, 10).
        start_local = month_start.replace(month=3 * quarter - 2)
        label = f"Q{quarter} {now_local.year}"
    else:
        start_local = month_start
        label = now_local.strftime("%B %Y")

    return _as_naive_utc(start_local), _as_naive_utc(now_local), label
|
|
|
|
|
|
|
def build_accounting_package_bytes():
    """Build the accounting package ZIP in memory.

    The archive contains ``revenue_report.json`` (the current revenue
    report) and ``invoices.json`` (every invoice joined with its client's
    company and contact name, newest first; non-JSON values are serialised
    with ``str``).

    Returns:
        ``(zip_bytes, filename)`` where the filename embeds the report's
        period label.
    """
    # Imported here because json is not among the imports visible at the
    # top of this file.
    import json

    report = get_revenue_report_data()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                i.id,
                i.invoice_number,
                i.status,
                i.total_amount,
                i.amount_paid,
                i.created_at,
                c.company_name,
                c.contact_name
            FROM invoices i
            JOIN clients c ON i.client_id = c.id
            ORDER BY i.created_at DESC
        """)
        invoices = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("revenue_report.json", json.dumps(report, indent=2))
        archive.writestr("invoices.json", json.dumps(invoices, indent=2, default=str))

    filename = f"accounting_package_{report.get('period_label','report')}.zip"

    return zip_buffer.getvalue(), filename
|
|
|
|
|
|
|
def get_revenue_report_data():
    """Collect the revenue report for the current reporting period.

    The period comes from the ``report_frequency`` setting (``monthly``,
    ``quarterly`` or ``yearly``; anything else is treated as ``monthly``).
    Collected and invoiced figures are limited to the period; overdue and
    outstanding figures cover all invoices with the matching status.

    Returns:
        A dict of strings and ints: ``frequency``, ``period_label``,
        ``period_start``, ``period_end``, ``collected_cad``,
        ``invoice_count``, ``invoiced_total``, ``overdue_count``,
        ``overdue_balance``, ``outstanding_count``, ``outstanding_balance``.
    """
    settings = get_app_settings()
    frequency = (settings.get("report_frequency") or "monthly").strip().lower()
    if frequency not in {"monthly", "quarterly", "yearly"}:
        frequency = "monthly"

    start_utc, end_utc, label = get_report_period_bounds(frequency)

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        cursor.execute("""
            SELECT COALESCE(SUM(cad_value_at_payment), 0) AS collected
            FROM payments
            WHERE payment_status = 'confirmed'
              AND received_at >= %s
              AND received_at <= %s
        """, (start_utc, end_utc))
        collected_row = cursor.fetchone()

        cursor.execute("""
            SELECT COUNT(*) AS invoice_count,
                   COALESCE(SUM(total_amount), 0) AS invoiced
            FROM invoices
            WHERE issued_at >= %s
              AND issued_at <= %s
        """, (start_utc, end_utc))
        invoiced_row = cursor.fetchone()

        cursor.execute("""
            SELECT COUNT(*) AS overdue_count,
                   COALESCE(SUM(total_amount - amount_paid), 0) AS overdue_balance
            FROM invoices
            WHERE status = 'overdue'
        """)
        overdue_row = cursor.fetchone()

        cursor.execute("""
            SELECT COUNT(*) AS outstanding_count,
                   COALESCE(SUM(total_amount - amount_paid), 0) AS outstanding_balance
            FROM invoices
            WHERE status IN ('pending', 'partial', 'overdue')
        """)
        outstanding_row = cursor.fetchone()
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()

    return {
        "frequency": frequency,
        "period_label": label,
        "period_start": start_utc.isoformat(sep=" "),
        "period_end": end_utc.isoformat(sep=" "),
        "collected_cad": str(to_decimal(collected_row["collected"])),
        "invoice_count": int(invoiced_row["invoice_count"] or 0),
        "invoiced_total": str(to_decimal(invoiced_row["invoiced"])),
        "overdue_count": int(overdue_row["overdue_count"] or 0),
        "overdue_balance": str(to_decimal(overdue_row["overdue_balance"])),
        "outstanding_count": int(outstanding_row["outstanding_count"] or 0),
        "outstanding_balance": str(to_decimal(outstanding_row["outstanding_balance"])),
    }
|
|
|
|
|
def ensure_email_log_table():
    """Create the ``email_log`` table when it does not exist yet."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS email_log (
                id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                email_type VARCHAR(50) NOT NULL,
                invoice_id INT UNSIGNED NULL,
                recipient_email VARCHAR(255) NOT NULL,
                subject VARCHAR(255) NOT NULL,
                status VARCHAR(20) NOT NULL,
                error_message TEXT NULL,
                sent_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                KEY idx_email_log_invoice_id (invoice_id),
                KEY idx_email_log_type (email_type),
                KEY idx_email_log_sent_at (sent_at)
            )
        """)
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
|
|
|
|
|
def log_email_event(email_type, recipient_email, subject, status, invoice_id=None, error_message=None):
    """Insert one row into ``email_log``.

    Args:
        email_type: Category label for the message.
        recipient_email: Address the message was sent to.
        subject: Message subject; stored truncated to 255 characters.
        status: Outcome label stored in the ``status`` column.
        invoice_id: Related invoice id, if any.
        error_message: Failure detail, if any.
    """
    ensure_email_log_table()

    # The subject column is VARCHAR(255); truncate so an over-long subject
    # cannot make the log insert itself fail.
    stored_subject = (subject or "")[:255]

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO email_log
            (email_type, invoice_id, recipient_email, subject, status, error_message)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            email_type,
            invoice_id,
            recipient_email,
            stored_subject,
            status,
            error_message,
        ))
        conn.commit()
    finally:
        # Release the connection even when the insert fails.
        conn.close()
|
|
|
|
|
|
|
def _log_email_event_safely(email_type, to_email, subject, status, invoice_id, error_message):
    """Record an email event; a logging failure is reported but never raised."""
    try:
        log_email_event(
            email_type,
            to_email,
            subject,
            status,
            invoice_id=invoice_id,
            error_message=error_message,
        )
    except Exception:
        app.logger.exception("Could not record '%s' email event for %s", status, to_email)


def send_configured_email(to_email, subject, body, attachments=None, email_type="system_email", invoice_id=None):
    """Send a plain-text email through the SMTP server from the app settings.

    Args:
        to_email: Recipient address.
        subject: Message subject.
        body: Plain-text message body.
        attachments: Optional iterable of dicts with ``filename``,
            ``mime_type`` (``"type/subtype"``) and ``data`` keys.
        email_type: Label written to the email log.
        invoice_id: Related invoice id written to the email log, if any.

    Raises:
        ValueError: When the SMTP host, SMTP port, from address or recipient
            is missing or invalid (nothing is logged in that case).
        Exception: Any error raised while connecting or sending is logged
            as ``failed`` and re-raised unchanged.
    """
    settings = get_app_settings()

    smtp_host = (settings.get("smtp_host") or "").strip()
    try:
        smtp_port = int((settings.get("smtp_port") or "587").strip() or "587")
    except ValueError:
        # Same exception type as before, but with a message that names the
        # misconfigured setting.
        raise ValueError("SMTP port is not a valid number.") from None
    smtp_user = (settings.get("smtp_user") or "").strip()
    smtp_pass = (settings.get("smtp_pass") or "").strip()
    from_email = (settings.get("smtp_from_email") or settings.get("business_email") or "").strip()
    from_name = (settings.get("smtp_from_name") or settings.get("business_name") or "").strip()
    use_tls = (settings.get("smtp_use_tls") or "0") == "1"
    use_ssl = (settings.get("smtp_use_ssl") or "0") == "1"

    if not smtp_host:
        raise ValueError("SMTP host is not configured.")
    if not from_email:
        raise ValueError("From email is not configured.")
    if not to_email:
        raise ValueError("Recipient email is missing.")

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = f"{from_name} <{from_email}>" if from_name else from_email
    msg["To"] = to_email
    msg.set_content(body)

    for attachment in attachments or []:
        maintype, subtype = attachment["mime_type"].split("/", 1)
        msg.add_attachment(
            attachment["data"],
            maintype=maintype,
            subtype=subtype,
            filename=attachment["filename"],
        )

    try:
        if use_ssl:
            with smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=30) as server:
                if smtp_user:
                    server.login(smtp_user, smtp_pass)
                server.send_message(msg)
        else:
            with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
                server.ehlo()
                if use_tls:
                    server.starttls()
                    # Re-identify after the TLS upgrade.
                    server.ehlo()
                if smtp_user:
                    server.login(smtp_user, smtp_pass)
                server.send_message(msg)
    except Exception as exc:
        # Best-effort log so a database problem cannot mask the SMTP error.
        _log_email_event_safely(email_type, to_email, subject, "failed", invoice_id, str(exc))
        raise

    # Logged outside the try block: a failure to write the log row must not
    # turn a delivered message into a reported send failure.
    _log_email_event_safely(email_type, to_email, subject, "sent", invoice_id, None)
|
|
|
@app.route("/settings", methods=["GET", "POST"])
def settings():
    """Show the settings page, or save the submitted form and reload it."""
    ensure_app_settings_table()

    if request.method != "POST":
        return render_template("settings.html", settings=get_app_settings())

    # Redirect after saving so a browser refresh does not resubmit the form.
    save_app_settings(request.form)
    return redirect("/settings")
|
|
|
|
|
|
|
|
|
@app.route("/reports/accounting-package.zip")
def accounting_package_zip():
    """Serve the accounting package ZIP as a file download."""
    payload, download_name = build_accounting_package_bytes()
    stream = BytesIO(payload)
    return send_file(
        stream,
        mimetype="application/zip",
        as_attachment=True,
        download_name=download_name,
    )
|
|
|
@app.route("/reports/revenue")
def revenue_report():
    """Render the revenue report page for the current period."""
    return render_template("reports/revenue.html", report=get_revenue_report_data())
|
|
|
@app.route("/reports/revenue.json")
def revenue_report_json():
    """Return the revenue report for the current period as JSON."""
    return jsonify(get_revenue_report_data())
|
|
|
@app.route("/reports/revenue/print")
def revenue_report_print():
    """Render the print layout of the revenue report."""
    return render_template("reports/revenue_print.html", report=get_revenue_report_data())
|
|
|
|
|
|
|
@app.route("/invoices/email/<int:invoice_id>", methods=["POST"])
def email_invoice(invoice_id):
    """Email an invoice PDF to the invoice's client.

    Responds 404 when the invoice does not exist, 400 when the client has
    no email address and 500 when the PDF cannot be generated. Otherwise
    redirects to the invoice view with ``email_sent=1`` or
    ``email_failed=1``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                i.*,
                c.client_code,
                c.company_name,
                c.contact_name,
                c.email,
                c.phone
            FROM invoices i
            JOIN clients c ON i.client_id = c.id
            WHERE i.id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    if not invoice:
        return "Invoice not found", 404

    recipient = (invoice.get("email") or "").strip()
    if not recipient:
        return "Client email is missing for this invoice.", 400

    settings = get_app_settings()

    # The PDF is produced by the /invoices/pdf/<id> route, which is fetched
    # through an in-process request.
    with app.test_client() as client:
        pdf_response = client.get(f"/invoices/pdf/{invoice_id}")
        if pdf_response.status_code != 200:
            return "Could not generate invoice PDF for email.", 500

        pdf_bytes = pdf_response.data

    currency = invoice.get("currency_code") or "CAD"
    total = to_decimal(invoice.get("total_amount"))
    remaining = total - to_decimal(invoice.get("amount_paid"))
    business_name = settings.get("business_name") or "OTB Billing"

    subject = f"Invoice {invoice['invoice_number']} from {business_name}"
    # Amounts go through fmt_money so non-CAD invoices keep their eight
    # decimal places instead of being cut to two.
    body = (
        f"Hello {invoice.get('contact_name') or invoice.get('company_name') or ''},\n\n"
        f"Please find attached invoice {invoice['invoice_number']}.\n"
        f"Total: {fmt_money(total, currency)} {currency}\n"
        f"Remaining: {fmt_money(remaining, currency)} {currency}\n"
        f"Due: {fmt_local(invoice.get('due_at'))}\n\n"
        f"Thank you,\n"
        f"{business_name}"
    )

    try:
        send_configured_email(
            recipient,
            subject,
            body,
            email_type="invoice",
            invoice_id=invoice_id,
            attachments=[{
                "filename": f"{invoice['invoice_number']}.pdf",
                "mime_type": "application/pdf",
                "data": pdf_bytes,
            }],
        )
    except Exception:
        # Keep the user-facing redirect, but leave a trace of the cause.
        app.logger.exception("Sending invoice %s by email failed", invoice_id)
        return redirect(f"/invoices/view/{invoice_id}?email_failed=1")

    return redirect(f"/invoices/view/{invoice_id}?email_sent=1")
|
|
|
|
|
@app.route("/reports/revenue/email", methods=["POST"])
def email_revenue_report_json():
    """Email the current revenue report as a JSON attachment.

    The recipient is the ``report_delivery_email`` setting, falling back to
    ``business_email``. Responds 400 when neither is set and 500 when the
    report cannot be generated; otherwise redirects to the revenue report
    page with ``email_sent=1`` or ``email_failed=1``.
    """
    settings = get_app_settings()
    recipient = (settings.get("report_delivery_email") or settings.get("business_email") or "").strip()
    if not recipient:
        return "Report delivery email is not configured.", 400

    try:
        # Build the report once so the message body and the attachment
        # always describe the same data. jsonify() is the same serialiser
        # the /reports/revenue.json route uses.
        report = get_revenue_report_data()
        json_bytes = jsonify(report).get_data()
    except Exception:
        app.logger.exception("Generating the revenue report for email failed")
        return "Could not generate revenue report JSON.", 500

    subject = f"Revenue Report {report.get('period_label', '')} from {settings.get('business_name') or 'OTB Billing'}"
    body = (
        f"Attached is the revenue report JSON for {report.get('period_label', '')}.\n\n"
        f"Frequency: {report.get('frequency', '')}\n"
        f"Collected CAD: {report.get('collected_cad', '')}\n"
        f"Invoices Issued: {report.get('invoice_count', '')}\n"
    )

    try:
        send_configured_email(
            recipient,
            subject,
            body,
            email_type="revenue_report",
            attachments=[{
                "filename": "revenue_report.json",
                "mime_type": "application/json",
                "data": json_bytes,
            }],
        )
    except Exception:
        # Keep the user-facing redirect, but leave a trace of the cause.
        app.logger.exception("Sending the revenue report by email failed")
        return redirect("/reports/revenue?email_failed=1")

    return redirect("/reports/revenue?email_sent=1")
|
|
|
|
|
@app.route("/reports/accounting-package/email", methods=["POST"])
def email_accounting_package():
    """Email the accounting package ZIP to the report delivery address.

    The recipient is the ``report_delivery_email`` setting, falling back to
    ``business_email``. Responds 400 when neither is set and 500 when the
    package cannot be built; otherwise redirects to ``/`` with
    ``pkg_email=1`` or ``pkg_email_failed=1``.
    """
    settings = get_app_settings()
    recipient = (settings.get("report_delivery_email") or settings.get("business_email") or "").strip()
    if not recipient:
        return "Report delivery email is not configured.", 400

    try:
        # Call the builder directly rather than issuing an in-process HTTP
        # request to the download route, which wraps the same function.
        package_bytes, _download_name = build_accounting_package_bytes()
    except Exception:
        app.logger.exception("Building the accounting package for email failed")
        return "Could not generate accounting package ZIP.", 500

    subject = f"Accounting Package from {settings.get('business_name') or 'OTB Billing'}"
    body = "Attached is the latest accounting package export."

    try:
        send_configured_email(
            recipient,
            subject,
            body,
            email_type="accounting_package",
            attachments=[{
                "filename": "accounting_package.zip",
                "mime_type": "application/zip",
                "data": package_bytes,
            }],
        )
    except Exception:
        # Keep the user-facing redirect, but leave a trace of the cause.
        app.logger.exception("Sending the accounting package by email failed")
        return redirect("/?pkg_email_failed=1")

    return redirect("/?pkg_email=1")
|
|
|
|
|
|
|
@app.route("/subscriptions")
def subscriptions():
    """List all subscriptions, newest first, with client and service details."""
    ensure_subscriptions_table()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                s.*,
                c.client_code,
                c.company_name,
                srv.service_code,
                srv.service_name
            FROM subscriptions s
            JOIN clients c ON s.client_id = c.id
            LEFT JOIN services srv ON s.service_id = srv.id
            ORDER BY s.id DESC
        """)
        subscription_rows = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return render_template("subscriptions/list.html", subscriptions=subscription_rows)
|
|
|
|
|
def _load_subscription_form_choices(cursor):
    """Return ``(clients, services)`` option rows for the subscription form."""
    cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name")
    clients = cursor.fetchall()
    cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name")
    services = cursor.fetchall()
    return clients, services


def _is_ymd_date(text):
    """Return True when *text* is a valid ``YYYY-MM-DD`` calendar date."""
    try:
        datetime.strptime(text, "%Y-%m-%d")
    except ValueError:
        return False
    return True


@app.route("/subscriptions/new", methods=["GET", "POST"])
def new_subscription():
    """Show the new-subscription form, or validate and store a submission.

    GET renders the form with defaults (monthly, CAD, today's date, active).
    POST validates the fields; on errors the form is re-rendered with the
    submitted values and the error list, otherwise the subscription is
    inserted and the client is redirected to ``/subscriptions``.
    """
    ensure_subscriptions_table()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        if request.method != "POST":
            clients, services = _load_subscription_form_choices(cursor)
            today_str = date.today().isoformat()
            return render_template(
                "subscriptions/new.html",
                clients=clients,
                services=services,
                errors=[],
                form_data={
                    "billing_interval": "monthly",
                    "currency_code": "CAD",
                    "start_date": today_str,
                    "next_invoice_date": today_str,
                    "status": "active",
                },
            )

        field_names = (
            "client_id",
            "service_id",
            "subscription_name",
            "billing_interval",
            "price",
            "currency_code",
            "start_date",
            "next_invoice_date",
            "status",
            "notes",
        )
        # Form field names and form_data keys are identical.
        form_data = {name: request.form.get(name, "").strip() for name in field_names}

        errors = []

        if not form_data["client_id"]:
            errors.append("Client is required.")
        if not form_data["subscription_name"]:
            errors.append("Subscription name is required.")
        if form_data["billing_interval"] not in {"monthly", "quarterly", "yearly"}:
            errors.append("Billing interval is required.")
        if not form_data["price"]:
            errors.append("Price is required.")
        if not form_data["currency_code"]:
            errors.append("Currency is required.")
        if not form_data["start_date"]:
            errors.append("Start date is required.")
        elif not _is_ymd_date(form_data["start_date"]):
            # Reject malformed dates here instead of failing in the INSERT.
            errors.append("Start date must be a valid date (YYYY-MM-DD).")
        if not form_data["next_invoice_date"]:
            errors.append("Next invoice date is required.")
        elif not _is_ymd_date(form_data["next_invoice_date"]):
            errors.append("Next invoice date must be a valid date (YYYY-MM-DD).")
        if form_data["status"] not in {"active", "paused", "cancelled"}:
            errors.append("Status is required.")

        price_value = None
        if not errors:
            try:
                price_value = Decimal(form_data["price"])
            except (InvalidOperation, ValueError):
                errors.append("Price must be a valid number.")
            else:
                if not price_value.is_finite():
                    # "NaN" / "Infinity" parse as Decimal but are not prices.
                    errors.append("Price must be a valid number.")
                elif price_value <= Decimal("0"):
                    errors.append("Price must be greater than zero.")

        if errors:
            clients, services = _load_subscription_form_choices(cursor)
            return render_template(
                "subscriptions/new.html",
                clients=clients,
                services=services,
                errors=errors,
                form_data=form_data,
            )

        insert_cursor = conn.cursor()
        insert_cursor.execute("""
            INSERT INTO subscriptions
            (
                client_id,
                service_id,
                subscription_name,
                billing_interval,
                price,
                currency_code,
                start_date,
                next_invoice_date,
                status,
                notes
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            form_data["client_id"],
            form_data["service_id"] or None,
            form_data["subscription_name"],
            form_data["billing_interval"],
            str(price_value),
            form_data["currency_code"],
            form_data["start_date"],
            form_data["next_invoice_date"],
            form_data["status"],
            form_data["notes"] or None,
        ))
        conn.commit()
    finally:
        # Runs on every exit path, including the early returns above.
        conn.close()

    return redirect("/subscriptions")
|
|
|
|
|
@app.route("/subscriptions/run", methods=["POST"])
def run_subscriptions_now():
    """Generate invoices for due subscriptions and report how many were created."""
    created = generate_due_subscription_invoices()["created_count"]
    return redirect(f"/subscriptions?run_count={created}")
|
|
|
|
|
|
|
def _aging_bucket(age_days):
    """Map days past due to a bucket key: current, d30, d60, d90 or d90p."""
    if age_days <= 0:
        return "current"
    if age_days <= 30:
        return "d30"
    if age_days <= 60:
        return "d60"
    if age_days <= 90:
        return "d90"
    return "d90p"


def _empty_aging_totals():
    """Return a fresh set of zeroed aging buckets plus a running total."""
    return {
        "current": Decimal("0"),
        "d30": Decimal("0"),
        "d60": Decimal("0"),
        "d90": Decimal("0"),
        "d90p": Decimal("0"),
        "total": Decimal("0"),
    }


@app.route("/reports/aging")
def report_aging():
    """Render the receivables aging report, grouped by client.

    Every invoice with status ``pending``, ``partial`` or ``overdue`` and a
    positive remaining balance is placed in a bucket by the number of days
    between its due date and today's UTC date. Invoices without a due date
    count as current.
    """
    refresh_overdue_invoices()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                c.id AS client_id,
                c.client_code,
                c.company_name,
                i.invoice_number,
                i.due_at,
                i.total_amount,
                i.amount_paid,
                (i.total_amount - i.amount_paid) AS remaining
            FROM invoices i
            JOIN clients c ON i.client_id = c.id
            WHERE i.status IN ('pending', 'partial', 'overdue')
              AND (i.total_amount - i.amount_paid) > 0
            ORDER BY c.company_name, i.due_at
        """)
        rows = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    # Today's date in UTC, without the deprecated datetime.utcnow().
    today = datetime.now(timezone.utc).date()

    # Keyed by client id; dict insertion order keeps the query's
    # company-name ordering for the rendered rows.
    grouped = {}
    totals = _empty_aging_totals()

    for row in rows:
        client_id = row["client_id"]
        if client_id not in grouped:
            entry = _empty_aging_totals()
            entry["client"] = f"{row['client_code']} - {row['company_name']}"
            grouped[client_id] = entry

        remaining = to_decimal(row["remaining"])
        age_days = (today - row["due_at"].date()).days if row["due_at"] else 0
        bucket = _aging_bucket(age_days)

        grouped[client_id][bucket] += remaining
        grouped[client_id]["total"] += remaining
        totals[bucket] += remaining
        totals["total"] += remaining

    return render_template(
        "reports/aging.html",
        aging_rows=list(grouped.values()),
        totals=totals,
    )
|
|
|
|
|
@app.route("/")
def index():
    """Render the dashboard with headline counts and balances.

    Shows the number of clients, active services and outstanding invoices,
    the total of confirmed payments (``cad_value_at_payment``) and the
    outstanding invoice balance.
    """
    refresh_overdue_invoices()

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        cursor.execute("SELECT COUNT(*) AS total_clients FROM clients")
        total_clients = cursor.fetchone()["total_clients"]

        cursor.execute("SELECT COUNT(*) AS active_services FROM services WHERE status = 'active'")
        active_services = cursor.fetchone()["active_services"]

        cursor.execute("""
            SELECT COUNT(*) AS outstanding_invoices
            FROM invoices
            WHERE status IN ('pending', 'partial', 'overdue')
              AND (total_amount - amount_paid) > 0
        """)
        outstanding_invoices = cursor.fetchone()["outstanding_invoices"]

        cursor.execute("""
            SELECT COALESCE(SUM(cad_value_at_payment), 0) AS revenue_received
            FROM payments
            WHERE payment_status = 'confirmed'
        """)
        revenue_received = to_decimal(cursor.fetchone()["revenue_received"])

        cursor.execute("""
            SELECT COALESCE(SUM(total_amount - amount_paid), 0) AS outstanding_balance
            FROM invoices
            WHERE status IN ('pending', 'partial', 'overdue')
              AND (total_amount - amount_paid) > 0
        """)
        outstanding_balance = to_decimal(cursor.fetchone()["outstanding_balance"])
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()

    app_settings = get_app_settings()

    return render_template(
        "dashboard.html",
        total_clients=total_clients,
        active_services=active_services,
        outstanding_invoices=outstanding_invoices,
        outstanding_balance=outstanding_balance,
        revenue_received=revenue_received,
        app_settings=app_settings,
    )
|
|
|
@app.route("/clients")
def clients():
    """List every client together with its unpaid invoice balance.

    The balance is computed in SQL as the sum of ``total_amount - amount_paid``
    over the client's pending, partial, and overdue invoices that still have a
    positive remainder; it is 0 when there are none. Rows are ordered by
    company name.
    """
    balance_query = """
        SELECT
            c.*,
            COALESCE((
                SELECT SUM(i.total_amount - i.amount_paid)
                FROM invoices i
                WHERE i.client_id = c.id
                  AND i.status IN ('pending', 'partial', 'overdue')
                  AND (i.total_amount - i.amount_paid) > 0
            ), 0) AS outstanding_balance
        FROM clients c
        ORDER BY c.company_name
    """

    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute(balance_query)
    client_rows = cur.fetchall()
    db.close()

    return render_template("clients/list.html", clients=client_rows)
|
|
|
@app.route("/clients/new", methods=["GET", "POST"])
def new_client():
    """Show the new-client form (GET) or create a client from it (POST).

    On POST the client code is generated from the company name and the current
    ``MAX(id)`` of the clients table (0 when the table is empty), the row is
    inserted, and the browser is redirected to ``/clients``.
    """
    if request.method != "POST":
        return render_template("clients/new.html")

    form = request.form
    company_name = form["company_name"]
    contact_name = form["contact_name"]
    email = form["email"]
    phone = form["phone"]

    db = get_db_connection()

    # The highest existing id seeds the generated client code.
    lookup = db.cursor(dictionary=True)
    lookup.execute("SELECT MAX(id) AS last_id FROM clients")
    last_number = lookup.fetchone()["last_id"] or 0

    client_code = generate_client_code(company_name, last_number)

    new_row = (client_code, company_name, contact_name, email, phone)
    writer = db.cursor()
    writer.execute(
        """
        INSERT INTO clients
        (client_code, company_name, contact_name, email, phone)
        VALUES (%s, %s, %s, %s, %s)
        """,
        new_row
    )
    db.commit()
    db.close()

    return redirect("/clients")
|
|
|
@app.route("/clients/edit/<int:client_id>", methods=["GET", "POST"])
def edit_client(client_id):
    """Show the edit form for a client (GET) or save changes to it (POST).

    The client row is loaded before anything else, so a request for an unknown
    id returns 404 for both methods. Previously a POST with validation errors
    crashed on a ``None`` row, and a valid POST for a missing id redirected as
    if it had succeeded.

    Args:
        client_id: Primary key of the client, taken from the URL.

    Returns:
        A redirect to ``/clients`` after a successful update, otherwise the
        rendered ``clients/edit.html`` (with any validation errors), or a
        ``("Client not found", 404)`` tuple.
    """
    errors = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM clients WHERE id = %s", (client_id,))
        client = cursor.fetchone()

        if not client:
            return "Client not found", 404

        if request.method == "POST":
            company_name = request.form.get("company_name", "").strip()
            contact_name = request.form.get("contact_name", "").strip()
            email = request.form.get("email", "").strip()
            phone = request.form.get("phone", "").strip()
            status = request.form.get("status", "").strip()
            notes = request.form.get("notes", "").strip()

            if not company_name:
                errors.append("Company name is required.")
            if not status:
                errors.append("Status is required.")

            if not errors:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE clients
                    SET company_name = %s,
                        contact_name = %s,
                        email = %s,
                        phone = %s,
                        status = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    company_name,
                    # Optional fields are stored as NULL rather than empty strings.
                    contact_name or None,
                    email or None,
                    phone or None,
                    status,
                    notes or None,
                    client_id
                ))
                conn.commit()
                return redirect("/clients")
    finally:
        # Runs on every exit path, including the early returns and exceptions.
        conn.close()

    # Reached for GET and for POSTs that failed validation; the form is
    # re-rendered from the stored row in both cases.
    client["credit_balance"] = get_client_credit_balance(client_id)

    return render_template("clients/edit.html", client=client, errors=errors)
|
|
|
@app.route("/credits/<int:client_id>")
def client_credits(client_id):
    """Show a client's credit ledger, newest entry first, with its balance.

    Returns ``("Client not found", 404)`` when no client has the given id.
    The balance comes from ``get_client_credit_balance`` and is fetched after
    this view's own connection has been closed.
    """
    db = get_db_connection()
    cur = db.cursor(dictionary=True)

    cur.execute("""
        SELECT id, client_code, company_name
        FROM clients
        WHERE id = %s
    """, (client_id,))
    client_row = cur.fetchone()

    if not client_row:
        db.close()
        return "Client not found", 404

    cur.execute("""
        SELECT *
        FROM credit_ledger
        WHERE client_id = %s
        ORDER BY id DESC
    """, (client_id,))
    ledger_entries = cur.fetchall()

    db.close()

    return render_template(
        "credits/list.html",
        client=client_row,
        entries=ledger_entries,
        balance=get_client_credit_balance(client_id),
    )
|
|
|
@app.route("/credits/add/<int:client_id>", methods=["GET", "POST"])
def add_credit(client_id):
    """Show the add-credit form (GET) or record a credit ledger entry (POST).

    The amount must parse as a finite, non-zero decimal. ``NaN`` and
    ``Infinity`` are valid ``Decimal`` literals but are rejected here so they
    never reach the INSERT. The parsed ``Decimal`` is what gets stored, not
    the raw form text.

    Args:
        client_id: Primary key of the client, taken from the URL.

    Returns:
        A redirect to the client's ledger after a successful insert, otherwise
        the rendered ``credits/add.html`` (with any validation errors), or a
        ``("Client not found", 404)`` tuple.
    """
    errors = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, client_code, company_name
            FROM clients
            WHERE id = %s
        """, (client_id,))
        client = cursor.fetchone()

        if not client:
            return "Client not found", 404

        if request.method == "POST":
            entry_type = request.form.get("entry_type", "").strip()
            amount = request.form.get("amount", "").strip()
            currency_code = request.form.get("currency_code", "").strip()
            notes = request.form.get("notes", "").strip()

            if not entry_type:
                errors.append("Entry type is required.")
            if not amount:
                errors.append("Amount is required.")
            if not currency_code:
                errors.append("Currency code is required.")

            amount_value = None
            if not errors:
                try:
                    amount_value = Decimal(amount)
                except InvalidOperation:
                    errors.append("Amount must be a valid number.")
                else:
                    # Check finiteness first: comparing a signalling NaN raises.
                    if not amount_value.is_finite():
                        errors.append("Amount must be a valid number.")
                    elif amount_value == 0:
                        errors.append("Amount cannot be zero.")

            if not errors:
                insert_cursor = conn.cursor()
                insert_cursor.execute("""
                    INSERT INTO credit_ledger
                    (
                        client_id,
                        entry_type,
                        amount,
                        currency_code,
                        notes
                    )
                    VALUES (%s, %s, %s, %s, %s)
                """, (
                    client_id,
                    entry_type,
                    amount_value,
                    currency_code,
                    notes or None
                ))
                conn.commit()

                return redirect(f"/credits/{client_id}")
    finally:
        # Runs on every exit path, including the early returns and exceptions.
        conn.close()

    return render_template("credits/add.html", client=client, errors=errors)
|
|
|
@app.route("/services")
def services():
    """List all services, newest id first, with the owning client's code and name."""
    listing_query = """
        SELECT s.*, c.client_code, c.company_name
        FROM services s
        JOIN clients c ON s.client_id = c.id
        ORDER BY s.id DESC
    """

    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute(listing_query)
    service_rows = cur.fetchall()
    db.close()

    return render_template("services/list.html", services=service_rows)
|
|
|
@app.route("/services/new", methods=["GET", "POST"])
def new_service():
    """Show the new-service form (GET) or create a service from it (POST).

    On POST the service code is generated from the service name and the current
    ``MAX(id)`` of the services table (0 when empty); an empty start date is
    stored as NULL. On GET the client list is loaded for the form's selector.
    """
    db = get_db_connection()
    cur = db.cursor(dictionary=True)

    if request.method != "POST":
        cur.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC")
        client_rows = cur.fetchall()
        db.close()
        return render_template("services/new.html", clients=client_rows)

    form = request.form
    client_id = form["client_id"]
    service_name = form["service_name"]
    service_type = form["service_type"]
    billing_cycle = form["billing_cycle"]
    currency_code = form["currency_code"]
    recurring_amount = form["recurring_amount"]
    status = form["status"]
    start_date = form["start_date"] or None
    description = form["description"]

    # The highest existing id seeds the generated service code.
    cur.execute("SELECT MAX(id) AS last_id FROM services")
    last_number = cur.fetchone()["last_id"] or 0
    service_code = generate_service_code(service_name, last_number)

    new_row = (
        client_id,
        service_code,
        service_name,
        service_type,
        billing_cycle,
        status,
        currency_code,
        recurring_amount,
        start_date,
        description
    )

    writer = db.cursor()
    writer.execute(
        """
        INSERT INTO services
        (
            client_id,
            service_code,
            service_name,
            service_type,
            billing_cycle,
            status,
            currency_code,
            recurring_amount,
            start_date,
            description
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """,
        new_row
    )
    db.commit()
    db.close()

    return redirect("/services")
|
|
|
@app.route("/services/edit/<int:service_id>", methods=["GET", "POST"])
def edit_service(service_id):
    """Show the edit form for a service (GET) or save changes to it (POST).

    The service row is loaded first, so an unknown id returns 404 for both
    methods (a POST to a missing id was previously never a 404). The recurring
    amount is parsed as a ``Decimal`` and must be finite and non-negative; the
    earlier ``float`` check let ``nan`` and ``inf`` through to the UPDATE.

    Args:
        service_id: Primary key of the service, taken from the URL.

    Returns:
        A redirect to ``/services`` after a successful update, otherwise the
        rendered ``services/edit.html`` (with any validation errors), or a
        ``("Service not found", 404)`` tuple.
    """
    errors = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT s.*, c.company_name
            FROM services s
            LEFT JOIN clients c ON s.client_id = c.id
            WHERE s.id = %s
        """, (service_id,))
        service = cursor.fetchone()

        if not service:
            return "Service not found", 404

        if request.method == "POST":
            client_id = request.form.get("client_id", "").strip()
            service_name = request.form.get("service_name", "").strip()
            service_type = request.form.get("service_type", "").strip()
            billing_cycle = request.form.get("billing_cycle", "").strip()
            currency_code = request.form.get("currency_code", "").strip()
            recurring_amount = request.form.get("recurring_amount", "").strip()
            status = request.form.get("status", "").strip()
            start_date = request.form.get("start_date", "").strip()
            description = request.form.get("description", "").strip()

            required_fields = (
                (client_id, "Client is required."),
                (service_name, "Service name is required."),
                (service_type, "Service type is required."),
                (billing_cycle, "Billing cycle is required."),
                (currency_code, "Currency code is required."),
                (recurring_amount, "Recurring amount is required."),
                (status, "Status is required."),
            )
            errors = [message for value, message in required_fields if not value]

            recurring_amount_value = None
            if not errors:
                try:
                    recurring_amount_value = Decimal(recurring_amount)
                except InvalidOperation:
                    errors.append("Recurring amount must be a valid number.")
                else:
                    # Check finiteness first: ordering comparisons on NaN raise.
                    if not recurring_amount_value.is_finite():
                        errors.append("Recurring amount must be a valid number.")
                    elif recurring_amount_value < 0:
                        errors.append("Recurring amount cannot be negative.")

            if not errors:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE services
                    SET client_id = %s,
                        service_name = %s,
                        service_type = %s,
                        billing_cycle = %s,
                        status = %s,
                        currency_code = %s,
                        recurring_amount = %s,
                        start_date = %s,
                        description = %s
                    WHERE id = %s
                """, (
                    client_id,
                    service_name,
                    service_type,
                    billing_cycle,
                    status,
                    currency_code,
                    recurring_amount_value,
                    # Optional fields are stored as NULL rather than empty strings.
                    start_date or None,
                    description or None,
                    service_id
                ))
                conn.commit()
                return redirect("/services")

        # Reached for GET and for POSTs that failed validation: the form needs
        # the client list for its selector.
        cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name ASC")
        clients = cursor.fetchall()
    finally:
        # Runs on every exit path, including the early returns and exceptions.
        conn.close()

    return render_template("services/edit.html", service=service, clients=clients, errors=errors)
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/invoices/export.csv")
def export_invoices_csv():
    """Download invoices as a CSV attachment, optionally filtered.

    Recognised query-string filters: ``start_date`` / ``end_date`` (compared
    against ``DATE(issued_at)``), ``status``, ``client_id``, and ``limit``.
    A ``limit`` that is not a positive integer is ignored. Rows are ordered by
    ascending invoice id. The attachment is named ``invoices_filtered.csv``
    when any filter value was supplied and ``invoices.csv`` otherwise.
    """
    args = request.args
    start_date = (args.get("start_date") or "").strip()
    end_date = (args.get("end_date") or "").strip()
    status = (args.get("status") or "").strip()
    client_id = (args.get("client_id") or "").strip()
    limit_count = (args.get("limit") or "").strip()

    db = get_db_connection()
    cur = db.cursor(dictionary=True)

    sql = """
        SELECT
            i.id,
            i.invoice_number,
            i.client_id,
            c.client_code,
            c.company_name,
            i.service_id,
            i.currency_code,
            i.subtotal_amount,
            i.tax_amount,
            i.total_amount,
            i.amount_paid,
            i.status,
            i.issued_at,
            i.due_at,
            i.paid_at,
            i.notes,
            i.created_at,
            i.updated_at
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        WHERE 1=1
    """
    bind_values = []

    # Each supplied filter contributes one AND clause and one bound value.
    optional_clauses = (
        (start_date, " AND DATE(i.issued_at) >= %s"),
        (end_date, " AND DATE(i.issued_at) <= %s"),
        (status, " AND i.status = %s"),
        (client_id, " AND i.client_id = %s"),
    )
    for supplied, clause in optional_clauses:
        if supplied:
            sql += clause
            bind_values.append(supplied)

    sql += " ORDER BY i.id ASC"

    if limit_count:
        try:
            row_cap = int(limit_count)
        except ValueError:
            # A non-numeric limit is ignored rather than rejected.
            pass
        else:
            if row_cap > 0:
                sql += " LIMIT %s"
                bind_values.append(row_cap)

    cur.execute(sql, tuple(bind_values))
    invoice_rows = cur.fetchall()
    db.close()

    # Header names double as the keys read from each result row.
    columns = [
        "id",
        "invoice_number",
        "client_id",
        "client_code",
        "company_name",
        "service_id",
        "currency_code",
        "subtotal_amount",
        "tax_amount",
        "total_amount",
        "amount_paid",
        "status",
        "issued_at",
        "due_at",
        "paid_at",
        "notes",
        "created_at",
        "updated_at",
    ]

    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(columns)
    for record in invoice_rows:
        writer.writerow([record.get(column, "") for column in columns])

    any_filter = any((start_date, end_date, status, client_id, limit_count))
    filename = "invoices" + ("_filtered" if any_filter else "") + ".csv"

    response = make_response(output.getvalue())
    response.headers["Content-Type"] = "text/csv; charset=utf-8"
    response.headers["Content-Disposition"] = f"attachment; filename={filename}"
    return response
|
|
|
|
|
@app.route("/invoices/export-pdf.zip")
def export_invoices_pdf_zip():
    """Download a ZIP archive containing one generated PDF per matching invoice.

    Recognised query-string filters: ``start_date`` / ``end_date`` (compared
    against ``DATE(issued_at)``), ``status``, ``client_id``, and ``limit``.
    Invoices are processed in ascending id order and each PDF is stored in the
    archive as ``<invoice_number>.pdf``. The download name starts with
    ``invoices_export`` and has each supplied filter value appended.
    """
    start_date = (request.args.get("start_date") or "").strip()
    end_date = (request.args.get("end_date") or "").strip()
    status = (request.args.get("status") or "").strip()
    client_id = (request.args.get("client_id") or "").strip()
    limit_count = (request.args.get("limit") or "").strip()

    conn = get_db_connection()
    cursor = conn.cursor(dictionary=True)

    # Base query; "WHERE 1=1" lets every optional filter be appended as "AND ...".
    query = """
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone,
            s.service_code,
            s.service_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        LEFT JOIN services s ON i.service_id = s.id
        WHERE 1=1
    """
    params = []

    if start_date:
        query += " AND DATE(i.issued_at) >= %s"
        params.append(start_date)

    if end_date:
        query += " AND DATE(i.issued_at) <= %s"
        params.append(end_date)

    if status:
        query += " AND i.status = %s"
        params.append(status)

    if client_id:
        query += " AND i.client_id = %s"
        params.append(client_id)

    query += " ORDER BY i.id ASC"

    if limit_count:
        try:
            limit_int = int(limit_count)
            if limit_int > 0:
                query += " LIMIT %s"
                params.append(limit_int)
        except ValueError:
            # A non-numeric limit is ignored rather than rejected.
            pass

    cursor.execute(query, tuple(params))
    invoices = cursor.fetchall()
    conn.close()

    settings = get_app_settings()

    def build_invoice_pdf_bytes(invoice, settings):
        """Draw one invoice onto a single letter-size page and return the PDF bytes.

        Layout is top-down: ``y`` is the current baseline and is decremented
        after each line, so the order of the drawing calls is significant.
        """
        buffer = BytesIO()
        pdf = canvas.Canvas(buffer, pagesize=letter)
        # NOTE(review): width is unpacked but not used below.
        width, height = letter

        left = 50
        right = 560
        y = height - 50

        def money(value, currency="CAD"):
            """Format an amount with two decimals followed by the currency code."""
            return f"{to_decimal(value):.2f} {currency}"

        pdf.setTitle(f"Invoice {invoice['invoice_number']}")

        # Only logo URLs under /static/ are mapped to a local file under BASE_DIR.
        logo_url = (settings.get("business_logo_url") or "").strip()
        if logo_url.startswith("/static/"):
            local_logo_path = str(BASE_DIR) + logo_url
            try:
                pdf.drawImage(ImageReader(local_logo_path), left, y - 35, width=42, height=42, preserveAspectRatio=True, mask='auto')
            except Exception:
                # Best effort: any failure drawing the logo is swallowed.
                pass

        # Header: invoice title on the left, business identity right-aligned.
        pdf.setFont("Helvetica-Bold", 22)
        pdf.drawString(left + 60, y, f"Invoice {invoice['invoice_number']}")

        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawRightString(right, y, settings.get("business_name") or "OTB Billing")
        y -= 18
        pdf.setFont("Helvetica", 12)
        pdf.drawRightString(right, y, settings.get("business_tagline") or "")
        y -= 15

        right_lines = [
            settings.get("business_address", ""),
            settings.get("business_email", ""),
            settings.get("business_phone", ""),
            settings.get("business_website", ""),
        ]
        for item in right_lines:
            if item:
                # Each contact line is cut to 80 characters; empty ones are skipped.
                pdf.drawRightString(right, y, item[:80])
                y -= 14

        y -= 10

        pdf.setFont("Helvetica-Bold", 12)
        pdf.drawString(left, y, "Status:")
        pdf.setFont("Helvetica", 12)
        pdf.drawString(left + 45, y, str(invoice["status"]).upper())
        y -= 28

        # "Bill To" section: company name, then whichever contact fields are set.
        pdf.setFont("Helvetica-Bold", 13)
        pdf.drawString(left, y, "Bill To")
        y -= 20
        pdf.setFont("Helvetica-Bold", 12)
        pdf.drawString(left, y, invoice["company_name"] or "")
        y -= 16
        pdf.setFont("Helvetica", 11)
        if invoice.get("contact_name"):
            pdf.drawString(left, y, str(invoice["contact_name"]))
            y -= 15
        if invoice.get("email"):
            pdf.drawString(left, y, str(invoice["email"]))
            y -= 15
        if invoice.get("phone"):
            pdf.drawString(left, y, str(invoice["phone"]))
            y -= 15
        pdf.drawString(left, y, f"Client Code: {invoice.get('client_code', '')}")
        y -= 28

        # "Invoice Details" section; timestamps are rendered through fmt_local.
        pdf.setFont("Helvetica-Bold", 13)
        pdf.drawString(left, y, "Invoice Details")
        y -= 20
        pdf.setFont("Helvetica", 11)
        pdf.drawString(left, y, f"Invoice #: {invoice['invoice_number']}")
        y -= 15
        pdf.drawString(left, y, f"Issued: {fmt_local(invoice.get('issued_at'))}")
        y -= 15
        pdf.drawString(left, y, f"Due: {fmt_local(invoice.get('due_at'))}")
        y -= 15
        if invoice.get("paid_at"):
            pdf.drawString(left, y, f"Paid: {fmt_local(invoice.get('paid_at'))}")
            y -= 15
        # NOTE(review): the 'CAD' default only applies when the key is missing;
        # the row comes from i.*, so a NULL currency would presumably print "None".
        pdf.drawString(left, y, f"Currency: {invoice.get('currency_code', 'CAD')}")
        y -= 28

        # Single line-item table: header row, rule, then one row for the invoice.
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(left, y, "Service Code")
        pdf.drawString(180, y, "Service")
        pdf.drawString(330, y, "Description")
        pdf.drawRightString(right, y, "Total")
        y -= 14
        pdf.line(left, y, right, y)
        y -= 18

        pdf.setFont("Helvetica", 11)
        pdf.drawString(left, y, str(invoice.get("service_code") or "-"))
        pdf.drawString(180, y, str(invoice.get("service_name") or "-"))
        # The notes field serves as the description and is cut to 28 characters.
        pdf.drawString(330, y, str(invoice.get("notes") or "-")[:28])
        pdf.drawRightString(right, y, money(invoice.get("total_amount"), invoice.get("currency_code", "CAD")))
        y -= 28

        totals_x_label = 360
        totals_x_value = right

        totals = [
            ("Subtotal", money(invoice.get("subtotal_amount"), invoice.get("currency_code", "CAD"))),
            ((settings.get("tax_label") or "Tax"), money(invoice.get("tax_amount"), invoice.get("currency_code", "CAD"))),
            ("Total", money(invoice.get("total_amount"), invoice.get("currency_code", "CAD"))),
            ("Paid", money(invoice.get("amount_paid"), invoice.get("currency_code", "CAD"))),
        ]

        # Outstanding amount = total minus what has been paid so far.
        remaining = to_decimal(invoice.get("total_amount")) - to_decimal(invoice.get("amount_paid"))

        for label, value in totals:
            pdf.setFont("Helvetica-Bold", 11)
            pdf.drawString(totals_x_label, y, label)
            pdf.setFont("Helvetica", 11)
            pdf.drawRightString(totals_x_value, y, value)
            y -= 18

        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(totals_x_label, y, "Remaining")
        pdf.drawRightString(totals_x_value, y, f"{remaining:.2f} {invoice.get('currency_code', 'CAD')}")
        y -= 25

        if settings.get("tax_number"):
            pdf.setFont("Helvetica", 10)
            pdf.drawString(left, y, f"{settings.get('tax_label') or 'Tax'} Number: {settings.get('tax_number')}")
            y -= 14

        if settings.get("business_number"):
            pdf.setFont("Helvetica", 10)
            pdf.drawString(left, y, f"Business Number: {settings.get('business_number')}")
            y -= 14

        if settings.get("payment_terms"):
            y -= 8
            pdf.setFont("Helvetica-Bold", 11)
            pdf.drawString(left, y, "Payment Terms")
            y -= 15
            pdf.setFont("Helvetica", 10)
            terms = settings.get("payment_terms", "")
            # Text is split into fixed 90-character slices, not at word boundaries.
            for chunk_start in range(0, len(terms), 90):
                line_text = terms[chunk_start:chunk_start+90]
                pdf.drawString(left, y, line_text)
                y -= 13

        if settings.get("invoice_footer"):
            y -= 8
            pdf.setFont("Helvetica-Bold", 11)
            pdf.drawString(left, y, "Footer")
            y -= 15
            pdf.setFont("Helvetica", 10)
            footer = settings.get("invoice_footer", "")
            # Same fixed 90-character slicing as the payment terms above.
            for chunk_start in range(0, len(footer), 90):
                line_text = footer[chunk_start:chunk_start+90]
                pdf.drawString(left, y, line_text)
                y -= 13

        pdf.showPage()
        pdf.save()
        buffer.seek(0)
        return buffer.getvalue()

    # Assemble the archive in memory, one deflate-compressed entry per invoice.
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        for invoice in invoices:
            pdf_bytes = build_invoice_pdf_bytes(invoice, settings)
            zipf.writestr(f"{invoice['invoice_number']}.pdf", pdf_bytes)

    zip_buffer.seek(0)

    # The download name records which filters produced the export.
    filename = "invoices_export"
    if start_date:
        filename += f"_{start_date}"
    if end_date:
        filename += f"_to_{end_date}"
    if status:
        filename += f"_{status}"
    if client_id:
        filename += f"_client_{client_id}"
    if limit_count:
        filename += f"_limit_{limit_count}"
    filename += ".zip"

    return send_file(
        zip_buffer,
        mimetype="application/zip",
        as_attachment=True,
        download_name=filename
    )
|
|
|
|
|
@app.route("/invoices/print")
def print_invoices():
    """Render a printable batch of invoices, optionally filtered.

    ``refresh_overdue_invoices()`` is called first. Recognised query-string
    filters: ``start_date`` / ``end_date`` (compared against
    ``DATE(issued_at)``), ``status``, ``client_id``, and ``limit``; a
    ``limit`` that is not a positive integer is ignored. The filter values
    are passed back to the template unchanged.
    """
    refresh_overdue_invoices()

    filters = {
        name: (request.args.get(name) or "").strip()
        for name in ("start_date", "end_date", "status", "client_id", "limit")
    }

    db = get_db_connection()
    cur = db.cursor(dictionary=True)

    sql = """
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone,
            s.service_code,
            s.service_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        LEFT JOIN services s ON i.service_id = s.id
        WHERE 1=1
    """
    bind_values = []

    # Each supplied filter contributes one AND clause and one bound value.
    clause_for = (
        ("start_date", " AND DATE(i.issued_at) >= %s"),
        ("end_date", " AND DATE(i.issued_at) <= %s"),
        ("status", " AND i.status = %s"),
        ("client_id", " AND i.client_id = %s"),
    )
    for name, clause in clause_for:
        if filters[name]:
            sql += clause
            bind_values.append(filters[name])

    sql += " ORDER BY i.id ASC"

    if filters["limit"]:
        try:
            row_cap = int(filters["limit"])
        except ValueError:
            # A non-numeric limit is ignored rather than rejected.
            pass
        else:
            if row_cap > 0:
                sql += " LIMIT %s"
                bind_values.append(row_cap)

    cur.execute(sql, tuple(bind_values))
    invoice_rows = cur.fetchall()
    db.close()

    return render_template(
        "invoices/print_batch.html",
        invoices=invoice_rows,
        settings=get_app_settings(),
        filters=filters,
    )
|
|
|
@app.route("/invoices")
def invoices():
    """List invoices, newest id first, with optional filters.

    ``refresh_overdue_invoices()`` is called first. Each row carries the
    client's code and name plus ``payment_count``, the number of confirmed
    payments against that invoice. Recognised query-string filters:
    ``start_date`` / ``end_date`` (compared against ``DATE(issued_at)``),
    ``status``, ``client_id``, and ``limit``; a ``limit`` that is not a
    positive integer is ignored. The client list is also loaded and passed to
    the template.
    """
    refresh_overdue_invoices()

    filters = {
        name: (request.args.get(name) or "").strip()
        for name in ("start_date", "end_date", "status", "client_id", "limit")
    }

    db = get_db_connection()
    cur = db.cursor(dictionary=True)

    sql = """
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            COALESCE((SELECT COUNT(*) FROM payments p WHERE p.invoice_id = i.id AND p.payment_status = 'confirmed'), 0) AS payment_count
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        WHERE 1=1
    """
    bind_values = []

    # Each supplied filter contributes one AND clause and one bound value.
    clause_for = (
        ("start_date", " AND DATE(i.issued_at) >= %s"),
        ("end_date", " AND DATE(i.issued_at) <= %s"),
        ("status", " AND i.status = %s"),
        ("client_id", " AND i.client_id = %s"),
    )
    for name, clause in clause_for:
        if filters[name]:
            sql += clause
            bind_values.append(filters[name])

    sql += " ORDER BY i.id DESC"

    if filters["limit"]:
        try:
            row_cap = int(filters["limit"])
        except ValueError:
            # A non-numeric limit is ignored rather than rejected.
            pass
        else:
            if row_cap > 0:
                sql += " LIMIT %s"
                bind_values.append(row_cap)

    cur.execute(sql, tuple(bind_values))
    invoice_rows = cur.fetchall()

    cur.execute("""
        SELECT id, client_code, company_name
        FROM clients
        ORDER BY company_name ASC
    """)
    client_rows = cur.fetchall()

    db.close()

    return render_template(
        "invoices/list.html",
        invoices=invoice_rows,
        filters=filters,
        clients=client_rows,
    )
|
|
|
@app.route("/invoices/new", methods=["GET", "POST"])
def new_invoice():
    """Show the new-invoice form (GET) or create a pending invoice (POST).

    All of client, service, currency, total amount, and due date are required.
    The total is parsed as a ``Decimal`` and must be finite and greater than
    zero; the earlier ``float`` check let ``nan`` and ``inf`` through to the
    INSERT. The validated amount is stored as both ``total_amount`` and
    ``subtotal_amount``, and ``issued_at`` is set by the database to
    ``UTC_TIMESTAMP()``.

    Returns:
        A redirect to ``/invoices`` after a successful insert, otherwise the
        rendered ``invoices/new.html`` with the client and service lists, any
        validation errors, and the submitted values for re-display.
    """
    errors = []
    form_data = {}

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        if request.method == "POST":
            form_data = {
                name: request.form.get(name, "").strip()
                for name in (
                    "client_id",
                    "service_id",
                    "currency_code",
                    "total_amount",
                    "due_at",
                    "notes",
                )
            }

            required_fields = (
                ("client_id", "Client is required."),
                ("service_id", "Service is required."),
                ("currency_code", "Currency is required."),
                ("total_amount", "Total amount is required."),
                ("due_at", "Due date is required."),
            )
            errors = [message for name, message in required_fields if not form_data[name]]

            amount_value = None
            if not errors:
                try:
                    amount_value = Decimal(form_data["total_amount"])
                except InvalidOperation:
                    errors.append("Total amount must be a valid number.")
                else:
                    # Check finiteness first: ordering comparisons on NaN raise.
                    if not amount_value.is_finite():
                        errors.append("Total amount must be a valid number.")
                    elif amount_value <= 0:
                        errors.append("Total amount must be greater than zero.")

            if not errors:
                invoice_number = generate_invoice_number()

                insert_cursor = conn.cursor()
                insert_cursor.execute("""
                    INSERT INTO invoices
                    (
                        client_id,
                        service_id,
                        invoice_number,
                        currency_code,
                        total_amount,
                        subtotal_amount,
                        issued_at,
                        due_at,
                        status,
                        notes
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, UTC_TIMESTAMP(), %s, 'pending', %s)
                """, (
                    form_data["client_id"],
                    form_data["service_id"],
                    invoice_number,
                    form_data["currency_code"],
                    amount_value,
                    amount_value,
                    form_data["due_at"],
                    form_data["notes"]
                ))

                conn.commit()
                return redirect("/invoices")

        # Reached for GET and for POSTs that failed validation: the form needs
        # both selector lists.
        cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name")
        clients = cursor.fetchall()

        cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name")
        services = cursor.fetchall()
    finally:
        # Runs on every exit path, including the redirect and exceptions.
        conn.close()

    return render_template(
        "invoices/new.html",
        clients=clients,
        services=services,
        errors=errors,
        form_data=form_data,
    )
|
|
|
|
|
|
|
|
|
|
|
@app.route("/invoices/pdf/<int:invoice_id>")
def invoice_pdf(invoice_id):
    """Generate a single-page PDF for one invoice and send it as a download.

    The invoice is loaded with its client and (optional) service details and
    drawn top-down on a letter-size page; the attachment is named
    ``<invoice_number>.pdf``.

    Args:
        invoice_id: Primary key of the invoice, taken from the URL.

    Returns:
        The PDF as an attachment response, or ``("Invoice not found", 404)``
        when no invoice has the given id.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                i.*,
                c.client_code,
                c.company_name,
                c.contact_name,
                c.email,
                c.phone,
                s.service_code,
                s.service_name
            FROM invoices i
            JOIN clients c ON i.client_id = c.id
            LEFT JOIN services s ON i.service_id = s.id
            WHERE i.id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()

    if not invoice:
        return "Invoice not found", 404

    settings = get_app_settings()

    # The row comes from i.*, so the currency_code key is always present and a
    # .get() default would never apply; fall back explicitly on an empty value.
    currency = invoice.get("currency_code") or "CAD"

    buffer = BytesIO()
    pdf = canvas.Canvas(buffer, pagesize=letter)
    _, page_height = letter

    left = 50
    right = 560
    # y is the current baseline; it is decremented after each drawn line, so
    # the order of the drawing calls below defines the layout.
    y = page_height - 50

    def money(value):
        """Format an amount with two decimals followed by the invoice currency."""
        return f"{to_decimal(value):.2f} {currency}"

    def draw_text_block(heading, text):
        """Draw a bold heading followed by the text in fixed 90-character slices."""
        nonlocal y
        y -= 8
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(left, y, heading)
        y -= 15
        pdf.setFont("Helvetica", 10)
        for chunk_start in range(0, len(text), 90):
            pdf.drawString(left, y, text[chunk_start:chunk_start + 90])
            y -= 13

    pdf.setTitle(f"Invoice {invoice['invoice_number']}")

    # Only logo URLs under /static/ are mapped to a local file under BASE_DIR.
    logo_url = (settings.get("business_logo_url") or "").strip()
    if logo_url.startswith("/static/"):
        local_logo_path = BASE_DIR / logo_url.lstrip("/")
        try:
            pdf.drawImage(ImageReader(str(local_logo_path)), left, y - 35, width=42, height=42, preserveAspectRatio=True, mask='auto')
        except Exception:
            # Deliberate best effort: a missing or unreadable logo must not
            # prevent the invoice from being generated.
            pass

    # Header: invoice title on the left, business identity right-aligned.
    pdf.setFont("Helvetica-Bold", 22)
    pdf.drawString(left + 60, y, f"Invoice {invoice['invoice_number']}")

    pdf.setFont("Helvetica-Bold", 14)
    pdf.drawRightString(right, y, settings.get("business_name") or "OTB Billing")
    y -= 18
    pdf.setFont("Helvetica", 12)
    pdf.drawRightString(right, y, settings.get("business_tagline") or "")
    y -= 15

    for key in ("business_address", "business_email", "business_phone", "business_website"):
        item = settings.get(key, "")
        if item:
            # Each contact line is cut to 80 characters; empty ones are skipped.
            pdf.drawRightString(right, y, item[:80])
            y -= 14

    y -= 10

    pdf.setFont("Helvetica-Bold", 12)
    pdf.drawString(left, y, "Status:")
    pdf.setFont("Helvetica", 12)
    pdf.drawString(left + 45, y, str(invoice["status"]).upper())
    y -= 28

    # "Bill To" section: company name, then whichever contact fields are set.
    pdf.setFont("Helvetica-Bold", 13)
    pdf.drawString(left, y, "Bill To")
    y -= 20
    pdf.setFont("Helvetica-Bold", 12)
    pdf.drawString(left, y, invoice["company_name"] or "")
    y -= 16
    pdf.setFont("Helvetica", 11)
    for field in ("contact_name", "email", "phone"):
        if invoice.get(field):
            pdf.drawString(left, y, str(invoice[field]))
            y -= 15
    pdf.drawString(left, y, f"Client Code: {invoice.get('client_code', '')}")
    y -= 28

    # "Invoice Details" section; timestamps are rendered through fmt_local.
    pdf.setFont("Helvetica-Bold", 13)
    pdf.drawString(left, y, "Invoice Details")
    y -= 20
    pdf.setFont("Helvetica", 11)
    pdf.drawString(left, y, f"Invoice #: {invoice['invoice_number']}")
    y -= 15
    pdf.drawString(left, y, f"Issued: {fmt_local(invoice.get('issued_at'))}")
    y -= 15
    pdf.drawString(left, y, f"Due: {fmt_local(invoice.get('due_at'))}")
    y -= 15
    if invoice.get("paid_at"):
        pdf.drawString(left, y, f"Paid: {fmt_local(invoice.get('paid_at'))}")
        y -= 15
    pdf.drawString(left, y, f"Currency: {currency}")
    y -= 28

    # Single line-item table: header row, rule, then one row for the invoice.
    pdf.setFont("Helvetica-Bold", 11)
    pdf.drawString(left, y, "Service Code")
    pdf.drawString(180, y, "Service")
    pdf.drawString(330, y, "Description")
    pdf.drawRightString(right, y, "Total")
    y -= 14
    pdf.line(left, y, right, y)
    y -= 18

    pdf.setFont("Helvetica", 11)
    pdf.drawString(left, y, str(invoice.get("service_code") or "-"))
    pdf.drawString(180, y, str(invoice.get("service_name") or "-"))
    # The notes field serves as the description and is cut to 28 characters.
    pdf.drawString(330, y, str(invoice.get("notes") or "-")[:28])
    pdf.drawRightString(right, y, money(invoice.get("total_amount")))
    y -= 28

    totals_x_label = 360
    totals_x_value = right

    totals = [
        ("Subtotal", money(invoice.get("subtotal_amount"))),
        ((settings.get("tax_label") or "Tax"), money(invoice.get("tax_amount"))),
        ("Total", money(invoice.get("total_amount"))),
        ("Paid", money(invoice.get("amount_paid"))),
    ]

    # Outstanding amount = total minus what has been paid so far.
    remaining = to_decimal(invoice.get("total_amount")) - to_decimal(invoice.get("amount_paid"))

    for label, value in totals:
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(totals_x_label, y, label)
        pdf.setFont("Helvetica", 11)
        pdf.drawRightString(totals_x_value, y, value)
        y -= 18

    pdf.setFont("Helvetica-Bold", 11)
    pdf.drawString(totals_x_label, y, "Remaining")
    pdf.drawRightString(totals_x_value, y, f"{remaining:.2f} {currency}")
    y -= 25

    if settings.get("tax_number"):
        pdf.setFont("Helvetica", 10)
        pdf.drawString(left, y, f"{settings.get('tax_label') or 'Tax'} Number: {settings.get('tax_number')}")
        y -= 14

    if settings.get("business_number"):
        pdf.setFont("Helvetica", 10)
        pdf.drawString(left, y, f"Business Number: {settings.get('business_number')}")
        y -= 14

    if settings.get("payment_terms"):
        draw_text_block("Payment Terms", settings["payment_terms"])

    if settings.get("invoice_footer"):
        draw_text_block("Footer", settings["invoice_footer"])

    pdf.showPage()
    pdf.save()
    buffer.seek(0)

    return send_file(
        buffer,
        mimetype="application/pdf",
        as_attachment=True,
        download_name=f"{invoice['invoice_number']}.pdf"
    )
|
|
|
|
|
@app.route("/invoices/view/<int:invoice_id>")
def view_invoice(invoice_id):
    """Render the read-only detail page for a single invoice.

    The invoice row is joined with its client and, when one is linked,
    its service (LEFT JOIN). Responds with a plain-text 404 when no
    invoice has the given id.
    """
    lookup_sql = """
        SELECT
            i.*,
            c.client_code,
            c.company_name,
            c.contact_name,
            c.email,
            c.phone,
            s.service_code,
            s.service_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        LEFT JOIN services s ON i.service_id = s.id
        WHERE i.id = %s
    """

    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute(lookup_sql, (invoice_id,))
    record = cur.fetchone()
    # The row is fully fetched, so the connection is released before any
    # further work (settings lookup, template rendering) happens.
    db.close()

    if not record:
        return "Invoice not found", 404

    app_settings = get_app_settings()
    return render_template("invoices/view.html", invoice=record, settings=app_settings)
|
|
|
|
|
def _edit_invoice_form_choices(cursor):
    """Return ``(clients, services)`` rows used to fill the edit form selects."""
    cursor.execute("SELECT id, client_code, company_name FROM clients ORDER BY company_name")
    clients = cursor.fetchall()
    cursor.execute("SELECT id, service_code, service_name FROM services ORDER BY service_name")
    services = cursor.fetchall()
    return clients, services


def _edit_invoice_parse_id(raw):
    """Return *raw* as an int, or ``None`` when it is blank or not an integer."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


@app.route("/invoices/edit/<int:invoice_id>", methods=["GET", "POST"])
def edit_invoice(invoice_id):
    """Show and process the invoice edit form.

    An invoice is "locked" once it has at least one payment row or a
    positive ``amount_paid``; a locked invoice only accepts changes to
    ``due_at`` and ``notes``. An unlocked invoice may also change client,
    service, currency, total and (manual) status.

    Returns:
        * 404 plain-text response when the invoice does not exist.
        * Redirect to ``/invoices`` after a successful save.
        * The rendered ``invoices/edit.html`` form otherwise (GET, or POST
          with validation errors, in which case the submitted values are
          echoed back).
    """
    errors = []
    clients = []
    services = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT i.*,
                   COALESCE((SELECT COUNT(*) FROM payments p WHERE p.invoice_id = i.id), 0) AS payment_count
            FROM invoices i
            WHERE i.id = %s
        """, (invoice_id,))
        invoice = cursor.fetchone()

        if not invoice:
            return "Invoice not found", 404

        # to_decimal() maps NULL/blank to zero, so a missing amount_paid
        # cannot crash the lock check the way float(None) would.
        locked = invoice["payment_count"] > 0 or to_decimal(invoice["amount_paid"]) > 0

        if request.method == "POST":
            due_at = request.form.get("due_at", "").strip()
            notes = request.form.get("notes", "").strip()

            if locked:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE invoices
                    SET due_at = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    due_at or None,
                    notes or None,
                    invoice_id,
                ))
                conn.commit()
                return redirect("/invoices")

            client_id = request.form.get("client_id", "").strip()
            service_id = request.form.get("service_id", "").strip()
            currency_code = request.form.get("currency_code", "").strip()
            total_amount = request.form.get("total_amount", "").strip()
            status = request.form.get("status", "").strip()

            client_id_value = _edit_invoice_parse_id(client_id)
            service_id_value = _edit_invoice_parse_id(service_id)

            if not client_id:
                errors.append("Client is required.")
            elif client_id_value is None:
                errors.append("Client is invalid.")
            if not service_id:
                errors.append("Service is required.")
            elif service_id_value is None:
                errors.append("Service is invalid.")
            if not currency_code:
                errors.append("Currency is required.")
            if not total_amount:
                errors.append("Total amount is required.")
            if not due_at:
                errors.append("Due date is required.")
            if not status:
                errors.append("Status is required.")

            manual_statuses = {"draft", "pending", "cancelled"}
            if status and status not in manual_statuses:
                errors.append("Manual invoice status must be draft, pending, or cancelled.")

            amount_value = None
            if not errors:
                try:
                    amount_value = Decimal(total_amount)
                except InvalidOperation:
                    errors.append("Total amount must be a valid number.")
                else:
                    # "NaN" and "Infinity" parse successfully but are not
                    # amounts; reject them before they reach the database.
                    if not amount_value.is_finite():
                        errors.append("Total amount must be a valid number.")
                    elif amount_value < 0:
                        errors.append("Total amount cannot be negative.")

            if not errors:
                # Plain fixed-point text (no exponent, no digit separators).
                amount_text = format(amount_value, "f")
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE invoices
                    SET client_id = %s,
                        service_id = %s,
                        currency_code = %s,
                        total_amount = %s,
                        subtotal_amount = %s,
                        due_at = %s,
                        status = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    client_id_value,
                    service_id_value,
                    currency_code,
                    amount_text,
                    amount_text,  # subtotal_amount mirrors total_amount
                    due_at,
                    status,
                    notes or None,
                    invoice_id,
                ))
                conn.commit()
                return redirect("/invoices")

            # Validation failed: echo the submitted values back into the
            # form, falling back to the stored value for blank/invalid ones.
            if client_id_value is not None:
                invoice["client_id"] = client_id_value
            if service_id_value is not None:
                invoice["service_id"] = service_id_value
            invoice["currency_code"] = currency_code or invoice["currency_code"]
            invoice["total_amount"] = total_amount or invoice["total_amount"]
            invoice["due_at"] = due_at or invoice["due_at"]
            invoice["status"] = status or invoice["status"]
            invoice["notes"] = notes

        # Only an unlocked form shows the client/service selects.
        if not locked:
            clients, services = _edit_invoice_form_choices(cursor)
    finally:
        # Single close point: also runs when a query raises.
        conn.close()

    return render_template(
        "invoices/edit.html",
        invoice=invoice,
        clients=clients,
        services=services,
        errors=errors,
        locked=locked,
    )
|
|
|
|
|
|
|
@app.route("/payments/export.csv")
def export_payments_csv():
    """Download every payment as ``payments.csv``, ordered by payment id.

    Each row combines the payment columns with the invoice number and the
    client's code and company name.
    """
    # One tuple drives both the header row and the per-record lookups, so
    # the two can never drift apart.
    columns = (
        "id",
        "invoice_id",
        "invoice_number",
        "client_id",
        "client_code",
        "company_name",
        "payment_method",
        "payment_currency",
        "payment_amount",
        "cad_value_at_payment",
        "reference",
        "sender_name",
        "txid",
        "wallet_address",
        "payment_status",
        "received_at",
        "notes",
    )

    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute("""
        SELECT
            p.id,
            p.invoice_id,
            i.invoice_number,
            p.client_id,
            c.client_code,
            c.company_name,
            p.payment_method,
            p.payment_currency,
            p.payment_amount,
            p.cad_value_at_payment,
            p.reference,
            p.sender_name,
            p.txid,
            p.wallet_address,
            p.payment_status,
            p.received_at,
            p.notes
        FROM payments p
        JOIN invoices i ON p.invoice_id = i.id
        JOIN clients c ON p.client_id = c.id
        ORDER BY p.id ASC
    """)
    records = cur.fetchall()
    db.close()

    csv_buffer = StringIO()
    csv_out = csv.writer(csv_buffer)
    csv_out.writerow(columns)
    csv_out.writerows([record.get(column, "") for column in columns] for record in records)

    response = make_response(csv_buffer.getvalue())
    response.headers["Content-Type"] = "text/csv; charset=utf-8"
    response.headers["Content-Disposition"] = "attachment; filename=payments.csv"
    return response
|
|
|
@app.route("/payments")
def payments():
    """Render the payments list, newest payment first.

    Every payment row is enriched with its invoice's number, status,
    totals and currency, plus the client's code and company name.
    """
    listing_sql = """
        SELECT
            p.*,
            i.invoice_number,
            i.status AS invoice_status,
            i.total_amount,
            i.amount_paid,
            i.currency_code AS invoice_currency_code,
            c.client_code,
            c.company_name
        FROM payments p
        JOIN invoices i ON p.invoice_id = i.id
        JOIN clients c ON p.client_id = c.id
        ORDER BY p.id DESC
    """

    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute(listing_sql)
    payment_rows = cur.fetchall()
    db.close()

    return render_template("payments/list.html", payments=payment_rows)
|
|
|
def _new_payment_open_invoices(cursor):
    """Return invoices that can still receive a payment, newest first.

    An invoice qualifies when its status is pending, partial or overdue
    and ``total_amount - amount_paid`` is positive.
    """
    cursor.execute("""
        SELECT
            i.id,
            i.invoice_number,
            i.currency_code,
            i.total_amount,
            i.amount_paid,
            i.status,
            c.client_code,
            c.company_name
        FROM invoices i
        JOIN clients c ON i.client_id = c.id
        WHERE i.status IN ('pending', 'partial', 'overdue')
          AND (i.total_amount - i.amount_paid) > 0
        ORDER BY i.id DESC
    """)
    return cursor.fetchall()


@app.route("/payments/new", methods=["GET", "POST"])
def new_payment():
    """Show the "record payment" form and store a submitted payment.

    On POST the submission is validated (required fields, numeric and
    finite amounts, invoice exists, invoice status allows payments, amount
    does not exceed the remaining balance). A valid payment is inserted as
    ``confirmed`` with ``UTC_TIMESTAMP()`` as ``received_at``, the invoice
    totals are recalculated and the user is redirected to ``/payments``.

    On GET, or on POST with validation errors, ``payments/new.html`` is
    rendered with the list of payable invoices, the error messages and the
    submitted values.
    """
    errors = []
    form_data = {}
    recorded_invoice_id = None
    invoices = []

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)

        if request.method == "POST":
            form_data = {
                field: request.form.get(field, "").strip()
                for field in (
                    "invoice_id",
                    "payment_method",
                    "payment_currency",
                    "payment_amount",
                    "cad_value_at_payment",
                    "reference",
                    "sender_name",
                    "txid",
                    "wallet_address",
                    "notes",
                )
            }

            required_fields = (
                ("invoice_id", "Invoice is required."),
                ("payment_method", "Payment method is required."),
                ("payment_currency", "Payment currency is required."),
                ("payment_amount", "Payment amount is required."),
                ("cad_value_at_payment", "CAD value at payment is required."),
            )
            errors.extend(message for field, message in required_fields if not form_data[field])

            payment_amount_value = None
            cad_value_value = None

            if not errors:
                try:
                    payment_amount_value = Decimal(form_data["payment_amount"])
                except InvalidOperation:
                    errors.append("Payment amount must be a valid number.")
                else:
                    # "NaN"/"Infinity" parse as Decimals but are not amounts.
                    if not payment_amount_value.is_finite():
                        errors.append("Payment amount must be a valid number.")
                    elif payment_amount_value <= 0:
                        errors.append("Payment amount must be greater than zero.")

            if not errors:
                try:
                    cad_value_value = Decimal(form_data["cad_value_at_payment"])
                except InvalidOperation:
                    errors.append("CAD value at payment must be a valid number.")
                else:
                    if not cad_value_value.is_finite():
                        errors.append("CAD value at payment must be a valid number.")
                    elif cad_value_value < 0:
                        errors.append("CAD value at payment cannot be negative.")

            invoice_row = None

            if not errors:
                cursor.execute("""
                    SELECT
                        i.id,
                        i.client_id,
                        i.invoice_number,
                        i.currency_code,
                        i.total_amount,
                        i.amount_paid,
                        i.status,
                        c.client_code,
                        c.company_name
                    FROM invoices i
                    JOIN clients c ON i.client_id = c.id
                    WHERE i.id = %s
                """, (form_data["invoice_id"],))
                invoice_row = cursor.fetchone()

                if not invoice_row:
                    errors.append("Selected invoice was not found.")
                elif invoice_row["status"] not in {"pending", "partial", "overdue"}:
                    errors.append("Payments can only be recorded against pending, partial, or overdue invoices.")
                else:
                    remaining_balance = to_decimal(invoice_row["total_amount"]) - to_decimal(invoice_row["amount_paid"])

                    if remaining_balance <= 0:
                        errors.append("This invoice has no remaining balance.")
                    elif payment_amount_value > remaining_balance:
                        errors.append(
                            f"Payment amount exceeds remaining balance. Remaining balance is {fmt_money(remaining_balance, invoice_row['currency_code'])} {invoice_row['currency_code']}."
                        )

            if not errors:
                insert_cursor = conn.cursor()
                insert_cursor.execute("""
                    INSERT INTO payments
                    (
                        invoice_id,
                        client_id,
                        payment_method,
                        payment_currency,
                        payment_amount,
                        cad_value_at_payment,
                        reference,
                        sender_name,
                        txid,
                        wallet_address,
                        payment_status,
                        received_at,
                        notes
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'confirmed', UTC_TIMESTAMP(), %s)
                """, (
                    # Use the id of the row that was actually looked up
                    # rather than the raw form text.
                    invoice_row["id"],
                    invoice_row["client_id"],
                    form_data["payment_method"],
                    form_data["payment_currency"],
                    # Plain fixed-point text (no exponent or separators).
                    format(payment_amount_value, "f"),
                    format(cad_value_value, "f"),
                    form_data["reference"] or None,
                    form_data["sender_name"] or None,
                    form_data["txid"] or None,
                    form_data["wallet_address"] or None,
                    form_data["notes"] or None,
                ))
                conn.commit()
                recorded_invoice_id = invoice_row["id"]

        # The form is shown on GET and on a rejected POST; both need the
        # list of invoices that can still be paid.
        if recorded_invoice_id is None:
            invoices = _new_payment_open_invoices(cursor)
    finally:
        # Single close point: also runs when a query raises.
        conn.close()

    if recorded_invoice_id is not None:
        # Runs after this connection is closed, as before.
        recalc_invoice_totals(recorded_invoice_id)
        return redirect("/payments")

    return render_template(
        "payments/new.html",
        invoices=invoices,
        errors=errors,
        form_data=form_data,
    )
|
|
|
|
|
|
|
@app.route("/payments/void/<int:payment_id>", methods=["POST"])
def void_payment(payment_id):
    """Mark a confirmed payment as reversed and refresh its invoice totals.

    Returns a plain-text 404 when the payment does not exist. A payment
    whose status is not ``confirmed`` is left untouched. In every other
    case the response is a redirect to ``/payments``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, invoice_id, payment_status
            FROM payments
            WHERE id = %s
        """, (payment_id,))
        payment = cursor.fetchone()

        if not payment:
            return "Payment not found", 404

        # Only confirmed payments can be voided; anything else is a no-op.
        if payment["payment_status"] != "confirmed":
            return redirect("/payments")

        update_cursor = conn.cursor()
        update_cursor.execute("""
            UPDATE payments
            SET payment_status = 'reversed'
            WHERE id = %s
        """, (payment_id,))
        conn.commit()
    finally:
        # Single close point: also runs when a query raises.
        conn.close()

    # Runs after this connection is closed, as before.
    recalc_invoice_totals(payment["invoice_id"])

    return redirect("/payments")
|
|
|
@app.route("/payments/edit/<int:payment_id>", methods=["GET", "POST"])
def edit_payment(payment_id):
    """Show and process the edit form for an existing payment.

    Returns:
        * 404 plain-text response when the payment does not exist.
        * Redirect to ``/payments`` after a successful save; the invoice
          totals are recalculated first.
        * The rendered ``payments/edit.html`` form otherwise (GET, or POST
          with validation errors, in which case the submitted values are
          echoed back).
    """
    errors = []
    saved = False

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT
                p.*,
                i.invoice_number,
                c.client_code,
                c.company_name
            FROM payments p
            JOIN invoices i ON p.invoice_id = i.id
            JOIN clients c ON p.client_id = c.id
            WHERE p.id = %s
        """, (payment_id,))
        payment = cursor.fetchone()

        if not payment:
            return "Payment not found", 404

        if request.method == "POST":
            payment_method = request.form.get("payment_method", "").strip()
            payment_currency = request.form.get("payment_currency", "").strip()
            payment_amount = request.form.get("payment_amount", "").strip()
            cad_value_at_payment = request.form.get("cad_value_at_payment", "").strip()
            reference = request.form.get("reference", "").strip()
            sender_name = request.form.get("sender_name", "").strip()
            txid = request.form.get("txid", "").strip()
            wallet_address = request.form.get("wallet_address", "").strip()
            notes = request.form.get("notes", "").strip()

            if not payment_method:
                errors.append("Payment method is required.")
            if not payment_currency:
                errors.append("Payment currency is required.")
            if not payment_amount:
                errors.append("Payment amount is required.")
            if not cad_value_at_payment:
                errors.append("CAD value at payment is required.")

            amount_value = None
            cad_value = None

            if not errors:
                # Decimal keeps the amounts exact; is_finite() rejects
                # "NaN"/"Infinity", which float() comparisons let through.
                try:
                    amount_value = Decimal(payment_amount)
                except InvalidOperation:
                    errors.append("Payment amount must be a valid number.")
                else:
                    if not amount_value.is_finite():
                        errors.append("Payment amount must be a valid number.")
                    elif amount_value <= 0:
                        errors.append("Payment amount must be greater than zero.")

                try:
                    cad_value = Decimal(cad_value_at_payment)
                except InvalidOperation:
                    errors.append("CAD value at payment must be a valid number.")
                else:
                    if not cad_value.is_finite():
                        errors.append("CAD value at payment must be a valid number.")
                    elif cad_value < 0:
                        errors.append("CAD value at payment cannot be negative.")

            if errors:
                # Echo the submitted values back into the form; required
                # fields fall back to the stored value when left blank.
                payment["payment_method"] = payment_method or payment["payment_method"]
                payment["payment_currency"] = payment_currency or payment["payment_currency"]
                payment["payment_amount"] = payment_amount or payment["payment_amount"]
                payment["cad_value_at_payment"] = cad_value_at_payment or payment["cad_value_at_payment"]
                payment["reference"] = reference
                payment["sender_name"] = sender_name
                payment["txid"] = txid
                payment["wallet_address"] = wallet_address
                payment["notes"] = notes
            else:
                update_cursor = conn.cursor()
                update_cursor.execute("""
                    UPDATE payments
                    SET payment_method = %s,
                        payment_currency = %s,
                        payment_amount = %s,
                        cad_value_at_payment = %s,
                        reference = %s,
                        sender_name = %s,
                        txid = %s,
                        wallet_address = %s,
                        notes = %s
                    WHERE id = %s
                """, (
                    payment_method,
                    payment_currency,
                    # Plain fixed-point text (no exponent or separators).
                    format(amount_value, "f"),
                    format(cad_value, "f"),
                    reference or None,
                    sender_name or None,
                    txid or None,
                    wallet_address or None,
                    notes or None,
                    payment_id,
                ))
                conn.commit()
                saved = True
    finally:
        # Single close point: also runs when a query raises.
        conn.close()

    if saved:
        # Runs after this connection is closed, as before.
        recalc_invoice_totals(payment["invoice_id"])
        return redirect("/payments")

    return render_template("payments/edit.html", payment=payment, errors=errors)
|
|
|
if __name__ == "__main__":
    import os

    # The server listens on every interface (0.0.0.0). Debug mode enables
    # the Werkzeug interactive debugger, which lets anyone who can reach the
    # port execute code, so it must be an explicit opt-in:
    #     FLASK_DEBUG=1 python <this file>
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}

    app.run(host="0.0.0.0", port=5050, debug=debug_enabled)
|
|
|