diff --git a/app.py b/app.py
index 8d6188e..85c05bd 100644
--- a/app.py
+++ b/app.py
@@ -1,7 +1,9 @@
import os
-import logging
import smtplib
from email.mime.text import MIMEText
+from functools import wraps
+from urllib.parse import urlparse, urljoin
+
from flask import (
Flask,
render_template,
@@ -11,135 +13,222 @@ from flask import (
flash,
session,
)
+from markupsafe import escape
from dotenv import load_dotenv
from werkzeug.security import check_password_hash
-from functools import wraps # Import wraps
-from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin
+
+from database import (
+ get_connection,
+ init_db,
+ get_all_emails,
+ get_admin,
+ create_default_admin,
+)
load_dotenv()
+
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY")
-base_url = os.getenv("BASE_URL")
+base_url = os.getenv("BASE_URL", "").strip().strip("/")
-# SMTP settings (for sending update emails)
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 465))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
-SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) # Use SENDER_EMAIL
+SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER)
-# Logging setup
-logging.basicConfig(
- level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
-)
-logger = logging.getLogger(__name__)
-
-# Initialize the database and create default admin user if necessary.
init_db()
create_default_admin()
-# Decorator for requiring login
+
def login_required(f):
- @wraps(f) # Use wraps to preserve function metadata
+ @wraps(f)
def decorated_function(*args, **kwargs):
if "username" not in session:
- return redirect(url_for("login"))
+ next_url = request.full_path if request.query_string else request.path
+ return redirect(url_for("login", next=next_url))
return f(*args, **kwargs)
return decorated_function
-def send_update_email(subject, body, email):
- """Sends email, returns True on success, False on failure."""
+def get_dashboard_counts():
+ """Return dict of counts: total subscribers, total newsletters, sent today."""
+ counts = {"total_subscribers": 0, "total_newsletters": 0, "sent_today": 0}
try:
- server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
- server.set_debuglevel(False) # Keep debug level at False for production
- server.login(SMTP_USER, SMTP_PASSWORD)
-
- unsub_link = f"https://{base_url}/unsubscribe?email={email}"
- custom_body = (
- f"{body}
"
- f"If you ever wish to unsubscribe, please click here"
- )
-
- msg = MIMEText(custom_body, "html", "utf-8")
- msg["Subject"] = subject
- msg["From"] = SENDER_EMAIL # Use sender email
- msg["To"] = email
-
- server.sendmail(SENDER_EMAIL, email, msg.as_string()) # Use sender email
-
- server.quit()
- logger.info(f"Update email sent to: {email}")
- return True
- except Exception as e:
- logger.error(f"Failed to send email to {email}: {e}")
- return False
-
-
-def process_send_update_email(subject, body):
- """Helper function to send an update email to all subscribers."""
- subscribers = get_all_emails()
- if not subscribers:
- return "No subscribers found."
- try:
- for email in subscribers:
- if not send_update_email(subject, body, email):
- return f"Failed to send to {email}" # Specific failure message
-
- # Log newsletter content for audit purposes
conn = get_connection()
- cursor = conn.cursor()
- cursor.execute(
- "INSERT INTO newsletters (subject, body) VALUES (%s, %s)", (subject, body)
- )
- conn.commit()
- cursor.close()
- conn.close()
+ cur = conn.cursor()
+ cur.execute("SELECT COUNT(*) FROM subscribers")
+ counts["total_subscribers"] = cur.fetchone()[0] or 0
+ cur.execute("SELECT COUNT(*) FROM newsletters")
+ counts["total_newsletters"] = cur.fetchone()[0] or 0
+
+ cur.execute(
+ """
+ SELECT COUNT(*)
+ FROM newsletters
+ WHERE sent_at::date = CURRENT_DATE
+ """
+ )
+ counts["sent_today"] = cur.fetchone()[0] or 0
+
+ cur.close()
+ conn.close()
+ except Exception:
+ pass
+ return counts
+
+
+def is_safe_url(target: str) -> bool:
+ if not target:
+ return False
+ ref = urlparse(request.host_url)
+ test = urlparse(urljoin(request.host_url, target))
+ return test.scheme in ("http", "https") and ref.netloc == test.netloc
+
+
+def send_update_email(subject: str, body_html: str, email: str) -> bool:
+ """Send a single HTML email with retries."""
+ max_retries = 3
+ retry_count = 0
+
+ unsub_link = ""
+ if base_url:
+ unsub_link = f"https://{base_url}/unsubscribe?email={email}"
+
+ if unsub_link:
+ custom_body = (
+ f"{body_html}"
+ f"
"
+ f"If you ever wish to unsubscribe, please click "
+ f"here."
+ )
+ else:
+ custom_body = body_html
+
+ while retry_count < max_retries:
+ try:
+ server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
+ server.set_debuglevel(0)
+ server.login(SMTP_USER, SMTP_PASSWORD)
+
+ msg = MIMEText(custom_body, "html", "utf-8")
+ msg["Subject"] = subject
+ msg["From"] = SENDER_EMAIL
+ msg["To"] = email
+
+ server.sendmail(SENDER_EMAIL, [email], msg.as_string())
+ server.quit()
+ return True
+ except Exception:
+ retry_count += 1
+ if retry_count >= max_retries:
+ break
+ import time
+
+ time.sleep(1.0)
+
+ return False
+
+
+def process_send_update_email(subject: str, body_html: str) -> str:
+ """Send update email to all subscribers and log newsletter content."""
+ try:
+ subscribers = get_all_emails()
+ if not subscribers:
+ return "No subscribers found."
+
+ failures = []
+ for email in subscribers:
+ if not send_update_email(subject, body_html, email):
+ failures.append(email)
+
+ try:
+ conn = get_connection()
+ cursor = conn.cursor()
+ cursor.execute(
+ "INSERT INTO newsletters (subject, body) VALUES (%s, %s)",
+ (subject, body_html),
+ )
+ conn.commit()
+ cursor.close()
+ conn.close()
+ except Exception:
+ pass
+
+ if failures:
+ return f"Sent with failures: {len(failures)} recipients failed."
return "Email has been sent to all subscribers."
except Exception as e:
- logger.exception("Error processing sending updates")
return f"Failed to send email: {e}"
-@app.route("/")
+@app.route("/", methods=["GET"])
@login_required
def index():
- """Displays all subscriber emails"""
- emails = get_all_emails()
- return render_template("admin_index.html", emails=emails)
+ """Dashboard: list subscriber emails and show widgets."""
+ emails = []
+ try:
+ emails = get_all_emails()
+ except Exception:
+ flash("Could not load subscribers right now.", "danger")
+
+ counts = get_dashboard_counts()
+ return render_template("admin_index.html", emails=emails, counts=counts)
+
@app.route("/send_update", methods=["GET", "POST"])
@login_required
def send_update():
- """Display a form to send an update email; process submission on POST."""
if request.method == "POST":
- subject = request.form["subject"]
- body = request.form["body"]
- result_message = process_send_update_email(subject, body)
- flash(result_message)
+ subject = (request.form.get("subject") or "").strip()
+ body_html = request.form.get("body") or ""
+
+ if not subject or not body_html:
+ flash("Subject and body are required", "danger")
+ return redirect(url_for("send_update"))
+
+ result_message = process_send_update_email(subject, body_html)
+ flash(escape(result_message))
return redirect(url_for("send_update"))
+
return render_template("send_update.html")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
- username = request.form.get("username")
- password = request.form.get("password")
+ username = (request.form.get("username") or "").strip()
+ password = (request.form.get("password") or "").strip()
+
+ if not username or not password:
+ flash("Username and password are required", "danger")
+ return redirect(url_for("login"))
+
admin = get_admin(username)
if admin and check_password_hash(admin[1], password):
session["username"] = username
+ session.permanent = True
+ app.config["SESSION_COOKIE_HTTPONLY"] = True
+ app.config["SESSION_COOKIE_SECURE"] = True
+ app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
+
+ next_url = request.args.get("next")
+ if next_url and is_safe_url(next_url):
+ flash("Logged in successfully", "success")
+ return redirect(next_url)
+
flash("Logged in successfully", "success")
return redirect(url_for("index"))
else:
flash("Invalid username or password", "danger")
return redirect(url_for("login"))
+
return render_template("login.html")
-@app.route("/logout")
+@app.route("/logout", methods=["GET"])
def logout():
session.pop("username", None)
flash("Logged out successfully", "success")
@@ -147,4 +236,9 @@ def logout():
if __name__ == "__main__":
- app.run(port=5001, debug=True)
+ is_prod = os.getenv("ENVIRONMENT", "development").lower() == "production"
+ app.config["PREFERRED_URL_SCHEME"] = "https" if is_prod else "http"
+ if is_prod:
+ app.run(host="0.0.0.0", port=5001, debug=False, use_reloader=False)
+ else:
+ app.run(host="0.0.0.0", port=5001, debug=True, use_reloader=True)
\ No newline at end of file