From a8589b659fbc838fdb4747871815e1a0fa8a7f80 Mon Sep 17 00:00:00 2001 From: Cipher Vance Date: Sun, 31 Aug 2025 12:11:23 -0500 Subject: [PATCH] refactor(app): remove logging, add security, dashboard counts, and robust flows --- app.py | 244 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 169 insertions(+), 75 deletions(-) diff --git a/app.py b/app.py index 8d6188e..85c05bd 100644 --- a/app.py +++ b/app.py @@ -1,7 +1,9 @@ import os -import logging import smtplib from email.mime.text import MIMEText +from functools import wraps +from urllib.parse import urlparse, urljoin + from flask import ( Flask, render_template, @@ -11,135 +13,222 @@ from flask import ( flash, session, ) +from markupsafe import escape from dotenv import load_dotenv from werkzeug.security import check_password_hash -from functools import wraps # Import wraps -from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin + +from database import ( + get_connection, + init_db, + get_all_emails, + get_admin, + create_default_admin, +) load_dotenv() + app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY") -base_url = os.getenv("BASE_URL") +base_url = os.getenv("BASE_URL", "").strip().strip("/") -# SMTP settings (for sending update emails) SMTP_SERVER = os.getenv("SMTP_SERVER") SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) SMTP_USER = os.getenv("SMTP_USER") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") -SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) # Use SENDER_EMAIL +SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) -# Logging setup -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - -# Initialize the database and create default admin user if necessary. init_db() create_default_admin() -# Decorator for requiring login + def login_required(f): - @wraps(f) # Use wraps to preserve function metadata + @wraps(f) def decorated_function(*args, **kwargs): if "username" not in session: - return redirect(url_for("login")) + next_url = request.full_path if request.query_string else request.path + return redirect(url_for("login", next=next_url)) return f(*args, **kwargs) return decorated_function -def send_update_email(subject, body, email): - """Sends email, returns True on success, False on failure.""" +def get_dashboard_counts(): + """Return dict of counts: total subscribers, total newsletters, sent today.""" + counts = {"total_subscribers": 0, "total_newsletters": 0, "sent_today": 0} try: - server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10) - server.set_debuglevel(False) # Keep debug level at False for production - server.login(SMTP_USER, SMTP_PASSWORD) - - unsub_link = f"https://{base_url}/unsubscribe?email={email}" - custom_body = ( - f"{body}

" - f"If you ever wish to unsubscribe, please click here" - ) - - msg = MIMEText(custom_body, "html", "utf-8") - msg["Subject"] = subject - msg["From"] = SENDER_EMAIL # Use sender email - msg["To"] = email - - server.sendmail(SENDER_EMAIL, email, msg.as_string()) # Use sender email - - server.quit() - logger.info(f"Update email sent to: {email}") - return True - except Exception as e: - logger.error(f"Failed to send email to {email}: {e}") - return False - - -def process_send_update_email(subject, body): - """Helper function to send an update email to all subscribers.""" - subscribers = get_all_emails() - if not subscribers: - return "No subscribers found." - try: - for email in subscribers: - if not send_update_email(subject, body, email): - return f"Failed to send to {email}" # Specific failure message - - # Log newsletter content for audit purposes conn = get_connection() - cursor = conn.cursor() - cursor.execute( - "INSERT INTO newsletters (subject, body) VALUES (%s, %s)", (subject, body) - ) - conn.commit() - cursor.close() - conn.close() + cur = conn.cursor() + cur.execute("SELECT COUNT(*) FROM subscribers") + counts["total_subscribers"] = cur.fetchone()[0] or 0 + cur.execute("SELECT COUNT(*) FROM newsletters") + counts["total_newsletters"] = cur.fetchone()[0] or 0 + + cur.execute( + """ + SELECT COUNT(*) + FROM newsletters + WHERE sent_at::date = CURRENT_DATE + """ + ) + counts["sent_today"] = cur.fetchone()[0] or 0 + + cur.close() + conn.close() + except Exception: + pass + return counts + + +def is_safe_url(target: str) -> bool: + if not target: + return False + ref = urlparse(request.host_url) + test = urlparse(urljoin(request.host_url, target)) + return test.scheme in ("http", "https") and ref.netloc == test.netloc + + +def send_update_email(subject: str, body_html: str, email: str) -> bool: + """Send a single HTML email with retries.""" + max_retries = 3 + retry_count = 0 + + unsub_link = "" + if base_url: + unsub_link = f"https://{base_url}/unsubscribe?email={email}" + + if unsub_link: + custom_body = ( + f"{body_html}" + f"

" + f"If you ever wish to unsubscribe, please click " + f"here." + ) + else: + custom_body = body_html + + while retry_count < max_retries: + try: + server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10) + server.set_debuglevel(0) + server.login(SMTP_USER, SMTP_PASSWORD) + + msg = MIMEText(custom_body, "html", "utf-8") + msg["Subject"] = subject + msg["From"] = SENDER_EMAIL + msg["To"] = email + + server.sendmail(SENDER_EMAIL, [email], msg.as_string()) + server.quit() + return True + except Exception: + retry_count += 1 + if retry_count >= max_retries: + break + import time + + time.sleep(1.0) + + return False + + +def process_send_update_email(subject: str, body_html: str) -> str: + """Send update email to all subscribers and log newsletter content.""" + try: + subscribers = get_all_emails() + if not subscribers: + return "No subscribers found." + + failures = [] + for email in subscribers: + if not send_update_email(subject, body_html, email): + failures.append(email) + + try: + conn = get_connection() + cursor = conn.cursor() + cursor.execute( + "INSERT INTO newsletters (subject, body) VALUES (%s, %s)", + (subject, body_html), + ) + conn.commit() + cursor.close() + conn.close() + except Exception: + pass + + if failures: + return f"Sent with failures: {len(failures)} recipients failed." return "Email has been sent to all subscribers." except Exception as e: - logger.exception("Error processing sending updates") return f"Failed to send email: {e}" -@app.route("/") +@app.route("/", methods=["GET"]) @login_required def index(): - """Displays all subscriber emails""" - emails = get_all_emails() - return render_template("admin_index.html", emails=emails) + """Dashboard: list subscriber emails and show widgets.""" + emails = [] + try: + emails = get_all_emails() + except Exception: + flash("Could not load subscribers right now.", "danger") + + counts = get_dashboard_counts() + return render_template("admin_index.html", emails=emails, counts=counts) + @app.route("/send_update", methods=["GET", "POST"]) @login_required def send_update(): - """Display a form to send an update email; process submission on POST.""" if request.method == "POST": - subject = request.form["subject"] - body = request.form["body"] - result_message = process_send_update_email(subject, body) - flash(result_message) + subject = (request.form.get("subject") or "").strip() + body_html = request.form.get("body") or "" + + if not subject or not body_html: + flash("Subject and body are required", "danger") + return redirect(url_for("send_update")) + + result_message = process_send_update_email(subject, body_html) + flash(escape(result_message)) return redirect(url_for("send_update")) + return render_template("send_update.html") @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": - username = request.form.get("username") - password = request.form.get("password") + username = (request.form.get("username") or "").strip() + password = (request.form.get("password") or "").strip() + + if not username or not password: + flash("Username and password are required", "danger") + return redirect(url_for("login")) + admin = get_admin(username) if admin and check_password_hash(admin[1], password): session["username"] = username + session.permanent = True + app.config["SESSION_COOKIE_HTTPONLY"] = True + app.config["SESSION_COOKIE_SECURE"] = True + app.config["SESSION_COOKIE_SAMESITE"] = "Lax" + + next_url = request.args.get("next") + if next_url and is_safe_url(next_url): + flash("Logged in successfully", "success") + return redirect(next_url) + flash("Logged in successfully", "success") return redirect(url_for("index")) else: flash("Invalid username or password", "danger") return redirect(url_for("login")) + return render_template("login.html") -@app.route("/logout") +@app.route("/logout", methods=["GET"]) def logout(): session.pop("username", None) flash("Logged out successfully", "success") @@ -147,4 +236,9 @@ def logout(): if __name__ == "__main__": - app.run(port=5001, debug=True) + is_prod = os.getenv("ENVIRONMENT", "development").lower() == "production" + app.config["PREFERRED_URL_SCHEME"] = "https" if is_prod else "http" + if is_prod: + app.run(host="0.0.0.0", port=5001, debug=False, use_reloader=False) + else: + app.run(host="0.0.0.0", port=5001, debug=True, use_reloader=True) \ No newline at end of file