(refactor): Improve email sending and logging, enhance security

Implemented robust logging using the logging module
Improved error handling and resource management with
try...except..finally blocks
Seprareted email sending logic into smaller managable functions
Added SENDER_EMAIL configuration for sending emails
Fixed security vulnerabilities
This commit is contained in:
Blake Ridgway 2025-04-03 11:50:18 -05:00
parent fe8e8c7e64
commit 145d426dc0

128
app.py
View file

@ -1,103 +1,137 @@
import os import os
import logging
import smtplib import smtplib
from email.mime.text import MIMEText from email.mime.text import MIMEText
from flask import Flask, render_template, request, redirect, url_for, flash, session from flask import (
Flask,
render_template,
request,
redirect,
url_for,
flash,
session,
)
from dotenv import load_dotenv from dotenv import load_dotenv
from werkzeug.security import check_password_hash from werkzeug.security import check_password_hash
from functools import wraps # Import wraps
from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin
load_dotenv() load_dotenv()
app = Flask(__name__) app = Flask(__name__)
# Use a secret key from .env; ensure your .env sets SECRET_KEY app.secret_key = os.getenv("SECRET_KEY")
app.secret_key = os.getenv('SECRET_KEY') base_url = os.getenv("BASE_URL")
base_url = os.getenv('BASE_URL')
# SMTP settings (for sending update emails) # SMTP settings (for sending update emails)
SMTP_SERVER = os.getenv('SMTP_SERVER') SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) SMTP_PORT = int(os.getenv("SMTP_PORT", 465))
SMTP_USER = os.getenv('SMTP_USER') SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv('SMTP_PASSWORD') SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) # Use SENDER_EMAIL
# Logging setup
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize the database and create default admin user if necessary. # Initialize the database and create default admin user if necessary.
init_db() init_db()
create_default_admin() create_default_admin()
# Decorator for requiring login
def login_required(f): def login_required(f):
from functools import wraps @wraps(f) # Use wraps to preserve function metadata
@wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if "username" not in session: if "username" not in session:
return redirect(url_for('login')) return redirect(url_for("login"))
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function
def send_update_email(subject, body, email):
"""Sends email, returns True on success, False on failure."""
try:
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
server.set_debuglevel(False) # Keep debug level at False for production
server.login(SMTP_USER, SMTP_PASSWORD)
unsub_link = f"https://{base_url}/unsubscribe?email={email}"
custom_body = (
f"{body}<br><br>"
f"If you ever wish to unsubscribe, please click <a href='{unsub_link}'>here</a>"
)
msg = MIMEText(custom_body, "html", "utf-8")
msg["Subject"] = subject
msg["From"] = SENDER_EMAIL # Use sender email
msg["To"] = email
server.sendmail(SENDER_EMAIL, email, msg.as_string()) # Use sender email
server.quit()
logger.info(f"Update email sent to: {email}")
return True
except Exception as e:
logger.error(f"Failed to send email to {email}: {e}")
return False
def process_send_update_email(subject, body): def process_send_update_email(subject, body):
"""Helper function to send an update email to all subscribers.""" """Helper function to send an update email to all subscribers."""
subscribers = get_all_emails() subscribers = get_all_emails()
if not subscribers: if not subscribers:
return "No subscribers found." return "No subscribers found."
try: try:
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
server.set_debuglevel(True)
server.login(SMTP_USER, SMTP_PASSWORD)
for email in subscribers: for email in subscribers:
unsub_link = f"https://{base_url}/unsubscribe?email={email}" if not send_update_email(subject, body, email):
custom_body = ( return f"Failed to send to {email}" # Specific failure message
f"{body}<br><br>"
f"If you ever wish to unsubscribe, please click <a href='{unsub_link}'>here</a>" # Log newsletter content for audit purposes
)
msg = MIMEText(custom_body, 'html', 'utf-8')
msg['Subject'] = subject
msg['From'] = SMTP_USER
msg['To'] = email
server.sendmail(SMTP_USER, email, msg.as_string())
print(f"Update email sent to: {email}")
server.quit()
conn = get_connection() conn = get_connection()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
"INSERT INTO newsletters (subject, body) VALUES (%s, %s)", "INSERT INTO newsletters (subject, body) VALUES (%s, %s)", (subject, body)
(subject, body) )
)
conn.commit() conn.commit()
cursor.close() cursor.close()
conn.close() conn.close()
return "Email has been sent." return "Email has been sent to all subscribers."
except Exception as e: except Exception as e:
print(f"Failed to send email: {e}") logger.exception("Error processing sending updates")
return f"Failed to send email: {e}" return f"Failed to send email: {e}"
@app.route('/')
@app.route("/")
@login_required @login_required
def index(): def index():
"""Displays all subscriber emails""" """Displays all subscriber emails"""
emails = get_all_emails() emails = get_all_emails()
return render_template("admin_index.html", emails=emails) return render_template("admin_index.html", emails=emails)
@app.route('/send_update', methods=['GET', 'POST'])
@app.route("/send_update", methods=["GET", "POST"])
@login_required @login_required
def send_update(): def send_update():
"""Display a form to send an update email; process submission on POST.""" """Display a form to send an update email; process submission on POST."""
if request.method == 'POST': if request.method == "POST":
subject = request.form['subject'] subject = request.form["subject"]
body = request.form['body'] body = request.form["body"]
# Call the helper function using its new name.
result_message = process_send_update_email(subject, body) result_message = process_send_update_email(subject, body)
flash(result_message) flash(result_message)
return redirect(url_for("send_update")) return redirect(url_for("send_update"))
return render_template("send_update.html") return render_template("send_update.html")
@app.route('/login', methods=['GET', 'POST'])
@app.route("/login", methods=["GET", "POST"])
def login(): def login():
if request.method == 'POST': if request.method == "POST":
username = request.form.get('username') username = request.form.get("username")
password = request.form.get('password') password = request.form.get("password")
admin = get_admin(username) admin = get_admin(username)
# Expect get_admin() to return a tuple like (username, password_hash)
if admin and check_password_hash(admin[1], password): if admin and check_password_hash(admin[1], password):
session['username'] = username session["username"] = username
flash("Logged in successfully", "success") flash("Logged in successfully", "success")
return redirect(url_for("index")) return redirect(url_for("index"))
else: else:
@ -105,11 +139,13 @@ def login():
return redirect(url_for("login")) return redirect(url_for("login"))
return render_template("login.html") return render_template("login.html")
@app.route('/logout')
@app.route("/logout")
def logout(): def logout():
session.pop('username', None) session.pop("username", None)
flash("Logged out successfully", "success") flash("Logged out successfully", "success")
return redirect(url_for("login")) return redirect(url_for("login"))
if __name__ == '__main__':
if __name__ == "__main__":
app.run(port=5001, debug=True) app.run(port=5001, debug=True)