"""Flask application for a small newsletter service.

Exposes endpoints to subscribe/unsubscribe an email address and to browse
stored newsletters. A confirmation email is sent over SMTP-over-SSL in a
background thread after a successful subscription.
"""
import html
import os
import smtplib
import time
from contextlib import closing
from email.mime.text import MIMEText
from threading import Thread
from urllib.parse import urlencode

from dotenv import load_dotenv
from flask import Flask, g, jsonify, render_template, request

from database import add_email, get_connection, init_db, remove_email

load_dotenv()

# SMTP settings are read from the environment (optionally populated by .env).
SMTP_SERVER = os.getenv('SMTP_SERVER')
SMTP_PORT = int(os.getenv('SMTP_PORT', 465))
SMTP_USER = os.getenv('SMTP_USER')
SMTP_PASSWORD = os.getenv('SMTP_PASSWORD')

app = Flask(__name__)
init_db()


@app.before_request
def start_timer():
    """Record when request handling started, for `log_request`."""
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump if the wall clock is adjusted mid-request.
    g._start_time = time.perf_counter()


@app.after_request
def log_request(response):
    """Log method, path and elapsed time of the request; return `response` unchanged."""
    now = time.perf_counter()
    # Fall back to `now` (elapsed 0.000s) if start_timer never ran.
    elapsed = now - g.get('_start_time', now)
    app.logger.info(
        "%s %s completed in %.3fs", request.method, request.path, elapsed
    )
    return response


def send_confirmation_email(to_address: str, unsubscribe_link: str) -> None:
    """Send the HTML confirmation email to `to_address`.

    The body is rendered from ``confirmation_email.html`` with
    `unsubscribe_link` and delivered through a dedicated SMTP_SSL
    connection (timeout=10s). Delivery is best-effort: SMTP/network
    failures are logged and not re-raised.
    """
    subject = "Thanks for subscribing!"

    # This function is normally run from a worker thread, where no Flask
    # application context exists; render_template needs one.
    with app.app_context():
        html_body = render_template(
            "confirmation_email.html",
            unsubscribe_link=unsubscribe_link,
        )

    msg = MIMEText(html_body, "html", "utf-8")
    msg["Subject"] = subject
    msg["From"] = SMTP_USER
    msg["To"] = to_address

    try:
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10) as server:
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.sendmail(SMTP_USER, [to_address], msg.as_string())
    except Exception as e:
        # Thread boundary: log with traceback instead of killing the worker.
        app.logger.exception("Failed to send email to %s: %s", to_address, e)


def send_confirmation_async(email, unsubscribe_link):
    """Wrapper used as the `threading.Thread` target for sending the email."""
    send_confirmation_email(email, unsubscribe_link)


@app.route("/", methods=["GET"])
def index():
    """Render the landing page."""
    return render_template("index.html")


@app.route("/subscribe", methods=["POST"])
def subscribe():
    """Subscribe the email given in the JSON body (``{"email": ...}``).

    Returns 201 when the address was added, 400 when no usable email was
    supplied or `add_email` reports it was not added.
    """
    # silent=True: a missing/invalid JSON body yields None instead of an
    # automatic error response, so we can answer with our own 400.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    email = data.get("email")
    if not email or not isinstance(email, str):
        return jsonify(error="No email provided"), 400

    if not add_email(email):
        return jsonify(error="Email already exists"), 400

    # URL-encode the address: characters such as '+' or '&' would otherwise
    # be misread when the link is followed.
    query = urlencode({"email": email})
    unsubscribe_link = f"{request.url_root}unsubscribe?{query}"

    Thread(
        target=send_confirmation_async,
        args=(email, unsubscribe_link),
        daemon=True,
    ).start()
    return jsonify(message="Email has been added"), 201


@app.route("/unsubscribe", methods=["GET"])
def unsubscribe():
    """Remove the address given in the ``email`` query parameter.

    Returns 200 when `remove_email` succeeds, 400 otherwise.
    """
    # NOTE(review): this is a state-changing GET keyed only on the address,
    # so anyone who knows an email can unsubscribe it -- consider a token.
    email = request.args.get("email")
    if not email:
        return "No email specified.", 400

    # The response is served as HTML; escape the user-supplied value so it
    # cannot inject markup (reflected XSS).
    safe_email = html.escape(email)
    if remove_email(email):
        return f"The email {safe_email} has been unsubscribed.", 200
    return (
        f"Email {safe_email} was not found or has already been unsubscribed.",
        400,
    )


@app.route("/newsletters", methods=["GET"])
def newsletters():
    """List all newsletters (newest first)."""
    # closing() guarantees cursor and connection are released even if the
    # query raises.
    with closing(get_connection()) as conn, closing(conn.cursor()) as cursor:
        cursor.execute(
            "SELECT id, subject, body, sent_at "
            "FROM newsletters ORDER BY sent_at DESC"
        )
        rows = cursor.fetchall()

    items = [
        {"id": r[0], "subject": r[1], "body": r[2], "sent_at": r[3]}
        for r in rows
    ]
    return render_template("newsletters.html", newsletters=items)


@app.route("/newsletter/<int:newsletter_id>", methods=["GET"])
def newsletter_detail(newsletter_id):
    """Show a single newsletter by its ID, or 404 if no such row exists."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cursor:
        cursor.execute(
            "SELECT id, subject, body, sent_at "
            "FROM newsletters WHERE id = %s",
            (newsletter_id,),
        )
        row = cursor.fetchone()

    if not row:
        return "Newsletter not found.", 404

    newsletter = {
        "id": row[0],
        "subject": row[1],
        "body": row[2],
        "sent_at": row[3],
    }
    return render_template("newsletter_detail.html", newsletter=newsletter)


if __name__ == "__main__":
    # The interactive debugger allows arbitrary code execution, so it must
    # not be on by default while listening on all interfaces. Opt in with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in {"1", "true", "yes"}
    app.run(host="0.0.0.0", debug=debug_enabled)