"""Admin web application for managing subscribers and sending newsletters.

Provides a login-protected dashboard, a form for composing an HTML update
email, and helpers that deliver the email to every subscriber over SMTP/SSL
and record the newsletter in the database.
"""

import html
import logging
import os
import smtplib
import time
from contextlib import closing
from email.mime.text import MIMEText
from functools import wraps
from urllib.parse import quote, urljoin, urlparse

from flask import (
    Flask,
    render_template,
    request,
    redirect,
    url_for,
    flash,
    session,
)
from markupsafe import escape
from dotenv import load_dotenv
from werkzeug.security import check_password_hash

from database import (
    get_connection,
    init_db,
    get_all_emails,
    get_admin,
    create_default_admin,
)

load_dotenv()

logger = logging.getLogger(__name__)

app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY")
if not app.secret_key:
    # Without a secret key every session access fails at request time with an
    # obscure error; surface the misconfiguration at startup instead.
    logger.warning("SECRET_KEY is not set; sessions and login will not work.")

# Cookie hardening belongs to application configuration, not to a request
# handler: set here it applies to every response from the first request on.
app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_SAMESITE="Lax",
)

base_url = os.getenv("BASE_URL", "").strip().strip("/")

SMTP_SERVER = os.getenv("SMTP_SERVER")
# `or 465` also covers an empty SMTP_PORT value, which int() would reject.
SMTP_PORT = int(os.getenv("SMTP_PORT") or 465)
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER)

init_db()
create_default_admin()


def login_required(f):
    """Decorate a view so anonymous visitors are redirected to the login page.

    The originally requested path (including its query string, if any) is
    passed along as ``next`` so the login view can return the user there.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if "username" in session:
            return f(*args, **kwargs)
        next_url = request.full_path if request.query_string else request.path
        return redirect(url_for("login", next=next_url))

    return decorated_function


def get_dashboard_counts():
    """Return dict of counts: total subscribers, total newsletters, sent today.

    Best effort: if the database is unavailable the failure is logged and the
    counts gathered so far (defaulting to 0) are returned, so the dashboard
    can still render.
    """
    counts = {"total_subscribers": 0, "total_newsletters": 0, "sent_today": 0}
    queries = (
        ("total_subscribers", "SELECT COUNT(*) FROM subscribers"),
        ("total_newsletters", "SELECT COUNT(*) FROM newsletters"),
        (
            "sent_today",
            """
            SELECT COUNT(*) FROM newsletters
            WHERE sent_at::date = CURRENT_DATE
            """,
        ),
    )
    try:
        # closing() guarantees the cursor and connection are released even
        # when one of the queries raises.
        with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
            for key, sql in queries:
                cur.execute(sql)
                counts[key] = cur.fetchone()[0] or 0
    except Exception:
        logger.exception("Could not load dashboard counts")
    return counts


def is_safe_url(target: str) -> bool:
    """Return True if *target* redirects to this host over http(s).

    Used to validate the ``next`` parameter and prevent open redirects.
    Must be called inside a request context (reads ``request.host_url``).
    """
    if not target:
        return False
    # Control characters are dropped by browsers while parsing a URL, which
    # can turn an innocent-looking path into a different location.
    if any(ord(ch) < 32 for ch in target):
        return False
    # Browsers treat "\" like "/", so "/\evil.com" behaves as "//evil.com".
    normalized = target.strip().replace("\\", "/")
    if not normalized:
        return False
    ref = urlparse(request.host_url)
    test = urlparse(urljoin(request.host_url, normalized))
    return test.scheme in ("http", "https") and ref.netloc == test.netloc


def _build_unsubscribe_link(email: str) -> str:
    """Return the unsubscribe URL for *email*, or "" if BASE_URL is unset."""
    if not base_url:
        return ""
    # Accept BASE_URL with or without a scheme; default to https.
    if base_url.startswith(("http://", "https://")):
        root = base_url
    else:
        root = f"https://{base_url}"
    # Percent-encode the address so characters such as "+" or "&" survive
    # the round trip through the query string.
    encoded_email = quote(email, safe="")
    return f"{root}/unsubscribe?email={encoded_email}"


def send_update_email(subject: str, body_html: str, email: str) -> bool:
    """Send a single HTML email with retries.

    When BASE_URL is configured an unsubscribe footer linking to the
    recipient's unsubscribe URL is appended to the body.

    Args:
        subject: Subject line of the message.
        body_html: HTML body of the message.
        email: Recipient address.

    Returns:
        True once the message has been handed to the SMTP server, False if
        all attempts failed. Never raises.
    """
    max_retries = 3

    unsub_link = _build_unsubscribe_link(email)
    if unsub_link:
        href = html.escape(unsub_link, quote=True)
        custom_body = (
            f"{body_html}"
            "<br><br>"
            "If you ever wish to unsubscribe, please click "
            f'<a href="{href}">here</a>.'
        )
    else:
        custom_body = body_html

    for attempt in range(1, max_retries + 1):
        try:
            msg = MIMEText(custom_body, "html", "utf-8")
            msg["Subject"] = subject
            msg["From"] = SENDER_EMAIL
            msg["To"] = email
            # The context manager closes the connection on failure as well.
            with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10) as server:
                server.login(SMTP_USER, SMTP_PASSWORD)
                server.sendmail(SENDER_EMAIL, [email], msg.as_string())
            return True
        except Exception:
            logger.warning(
                "Sending to %s failed (attempt %d of %d)",
                email,
                attempt,
                max_retries,
                exc_info=True,
            )
            if attempt < max_retries:
                time.sleep(1.0)
    return False


def process_send_update_email(subject: str, body_html: str) -> str:
    """Send update email to all subscribers and log newsletter content.

    Returns a human-readable status message; never raises.
    """
    try:
        subscribers = get_all_emails()
        if not subscribers:
            return "No subscribers found."

        failures = []
        for email in subscribers:
            if not send_update_email(subject, body_html, email):
                failures.append(email)

        # Recording the newsletter is best effort: a logging failure must not
        # change the delivery result reported to the admin.
        try:
            with closing(get_connection()) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(
                        "INSERT INTO newsletters (subject, body) VALUES (%s, %s)",
                        (subject, body_html),
                    )
                conn.commit()
        except Exception:
            logger.exception("Could not record newsletter %r", subject)

        if failures:
            return f"Sent with failures: {len(failures)} recipients failed."
        return "Email has been sent to all subscribers."
    except Exception as e:
        logger.exception("Sending update email failed")
        return f"Failed to send email: {e}"


@app.route("/", methods=["GET"])
@login_required
def index():
    """Dashboard: list subscriber emails and show widgets."""
    emails = []
    try:
        emails = get_all_emails()
    except Exception:
        logger.exception("Could not load subscribers")
        flash("Could not load subscribers right now.", "danger")

    counts = get_dashboard_counts()
    return render_template("admin_index.html", emails=emails, counts=counts)


@app.route("/send_update", methods=["GET", "POST"])
@login_required
def send_update():
    """Show the compose form (GET) or send the update to all subscribers (POST)."""
    if request.method != "POST":
        return render_template("send_update.html")

    subject = (request.form.get("subject") or "").strip()
    body_html = request.form.get("body") or ""

    if not subject or not body_html:
        flash("Subject and body are required", "danger")
        return redirect(url_for("send_update"))

    result_message = process_send_update_email(subject, body_html)
    flash(escape(result_message))
    return redirect(url_for("send_update"))


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form (GET) or authenticate an admin (POST).

    On success the user is redirected to the validated ``next`` URL if one
    was supplied, otherwise to the dashboard.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = (request.form.get("username") or "").strip()
    password = (request.form.get("password") or "").strip()

    if not username or not password:
        flash("Username and password are required", "danger")
        return redirect(url_for("login"))

    admin = get_admin(username)
    # admin[1] holds the stored password hash.
    if not (admin and check_password_hash(admin[1], password)):
        flash("Invalid username or password", "danger")
        return redirect(url_for("login"))

    session["username"] = username
    session.permanent = True

    flash("Logged in successfully", "success")
    next_url = request.args.get("next")
    if next_url and is_safe_url(next_url):
        return redirect(next_url)
    return redirect(url_for("index"))


@app.route("/logout", methods=["GET"])
def logout():
    """Clear the logged-in user from the session and return to the login page."""
    session.pop("username", None)
    flash("Logged out successfully", "success")
    return redirect(url_for("login"))


if __name__ == "__main__":
    is_prod = os.getenv("ENVIRONMENT", "development").lower() == "production"

    app.config["PREFERRED_URL_SCHEME"] = "https" if is_prod else "http"

    # NOTE: outside production this runs the debugger and reloader while
    # listening on all interfaces; only do that on a trusted network.
    app.run(
        host="0.0.0.0",
        port=5001,
        debug=not is_prod,
        use_reloader=not is_prod,
    )