refactor(app): remove logging, add security, dashboard counts, and robust flows
This commit is contained in:
parent
e36bdc4568
commit
a8589b659f
1 changed files with 169 additions and 75 deletions
244
app.py
244
app.py
|
|
@ -1,7 +1,9 @@
|
||||||
import os
|
import os
|
||||||
import logging
|
|
||||||
import smtplib
|
import smtplib
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
|
from functools import wraps
|
||||||
|
from urllib.parse import urlparse, urljoin
|
||||||
|
|
||||||
from flask import (
|
from flask import (
|
||||||
Flask,
|
Flask,
|
||||||
render_template,
|
render_template,
|
||||||
|
|
@ -11,135 +13,222 @@ from flask import (
|
||||||
flash,
|
flash,
|
||||||
session,
|
session,
|
||||||
)
|
)
|
||||||
|
from markupsafe import escape
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from werkzeug.security import check_password_hash
|
from werkzeug.security import check_password_hash
|
||||||
from functools import wraps # Import wraps
|
|
||||||
from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin
|
from database import (
|
||||||
|
get_connection,
|
||||||
|
init_db,
|
||||||
|
get_all_emails,
|
||||||
|
get_admin,
|
||||||
|
create_default_admin,
|
||||||
|
)
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
app.secret_key = os.getenv("SECRET_KEY")
|
app.secret_key = os.getenv("SECRET_KEY")
|
||||||
base_url = os.getenv("BASE_URL")
|
base_url = os.getenv("BASE_URL", "").strip().strip("/")
|
||||||
|
|
||||||
# SMTP settings (for sending update emails)
|
|
||||||
SMTP_SERVER = os.getenv("SMTP_SERVER")
|
SMTP_SERVER = os.getenv("SMTP_SERVER")
|
||||||
SMTP_PORT = int(os.getenv("SMTP_PORT", 465))
|
SMTP_PORT = int(os.getenv("SMTP_PORT", 465))
|
||||||
SMTP_USER = os.getenv("SMTP_USER")
|
SMTP_USER = os.getenv("SMTP_USER")
|
||||||
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
|
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
|
||||||
SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) # Use SENDER_EMAIL
|
SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER)
|
||||||
|
|
||||||
# Logging setup
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
|
||||||
)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Initialize the database and create default admin user if necessary.
|
|
||||||
init_db()
|
init_db()
|
||||||
create_default_admin()
|
create_default_admin()
|
||||||
|
|
||||||
# Decorator for requiring login
|
|
||||||
def login_required(f):
|
def login_required(f):
|
||||||
@wraps(f) # Use wraps to preserve function metadata
|
@wraps(f)
|
||||||
def decorated_function(*args, **kwargs):
|
def decorated_function(*args, **kwargs):
|
||||||
if "username" not in session:
|
if "username" not in session:
|
||||||
return redirect(url_for("login"))
|
next_url = request.full_path if request.query_string else request.path
|
||||||
|
return redirect(url_for("login", next=next_url))
|
||||||
return f(*args, **kwargs)
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
return decorated_function
|
return decorated_function
|
||||||
|
|
||||||
|
|
||||||
def send_update_email(subject, body, email):
|
def get_dashboard_counts():
|
||||||
"""Sends email, returns True on success, False on failure."""
|
"""Return dict of counts: total subscribers, total newsletters, sent today."""
|
||||||
|
counts = {"total_subscribers": 0, "total_newsletters": 0, "sent_today": 0}
|
||||||
try:
|
try:
|
||||||
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
|
|
||||||
server.set_debuglevel(False) # Keep debug level at False for production
|
|
||||||
server.login(SMTP_USER, SMTP_PASSWORD)
|
|
||||||
|
|
||||||
unsub_link = f"https://{base_url}/unsubscribe?email={email}"
|
|
||||||
custom_body = (
|
|
||||||
f"{body}<br><br>"
|
|
||||||
f"If you ever wish to unsubscribe, please click <a href='{unsub_link}'>here</a>"
|
|
||||||
)
|
|
||||||
|
|
||||||
msg = MIMEText(custom_body, "html", "utf-8")
|
|
||||||
msg["Subject"] = subject
|
|
||||||
msg["From"] = SENDER_EMAIL # Use sender email
|
|
||||||
msg["To"] = email
|
|
||||||
|
|
||||||
server.sendmail(SENDER_EMAIL, email, msg.as_string()) # Use sender email
|
|
||||||
|
|
||||||
server.quit()
|
|
||||||
logger.info(f"Update email sent to: {email}")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to send email to {email}: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def process_send_update_email(subject, body):
|
|
||||||
"""Helper function to send an update email to all subscribers."""
|
|
||||||
subscribers = get_all_emails()
|
|
||||||
if not subscribers:
|
|
||||||
return "No subscribers found."
|
|
||||||
try:
|
|
||||||
for email in subscribers:
|
|
||||||
if not send_update_email(subject, body, email):
|
|
||||||
return f"Failed to send to {email}" # Specific failure message
|
|
||||||
|
|
||||||
# Log newsletter content for audit purposes
|
|
||||||
conn = get_connection()
|
conn = get_connection()
|
||||||
cursor = conn.cursor()
|
cur = conn.cursor()
|
||||||
cursor.execute(
|
cur.execute("SELECT COUNT(*) FROM subscribers")
|
||||||
"INSERT INTO newsletters (subject, body) VALUES (%s, %s)", (subject, body)
|
counts["total_subscribers"] = cur.fetchone()[0] or 0
|
||||||
)
|
|
||||||
conn.commit()
|
|
||||||
cursor.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
cur.execute("SELECT COUNT(*) FROM newsletters")
|
||||||
|
counts["total_newsletters"] = cur.fetchone()[0] or 0
|
||||||
|
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM newsletters
|
||||||
|
WHERE sent_at::date = CURRENT_DATE
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
counts["sent_today"] = cur.fetchone()[0] or 0
|
||||||
|
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
def is_safe_url(target: str) -> bool:
|
||||||
|
if not target:
|
||||||
|
return False
|
||||||
|
ref = urlparse(request.host_url)
|
||||||
|
test = urlparse(urljoin(request.host_url, target))
|
||||||
|
return test.scheme in ("http", "https") and ref.netloc == test.netloc
|
||||||
|
|
||||||
|
|
||||||
|
def send_update_email(subject: str, body_html: str, email: str) -> bool:
|
||||||
|
"""Send a single HTML email with retries."""
|
||||||
|
max_retries = 3
|
||||||
|
retry_count = 0
|
||||||
|
|
||||||
|
unsub_link = ""
|
||||||
|
if base_url:
|
||||||
|
unsub_link = f"https://{base_url}/unsubscribe?email={email}"
|
||||||
|
|
||||||
|
if unsub_link:
|
||||||
|
custom_body = (
|
||||||
|
f"{body_html}"
|
||||||
|
f"<br><br>"
|
||||||
|
f"If you ever wish to unsubscribe, please click "
|
||||||
|
f"<a href='{unsub_link}'>here</a>."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
custom_body = body_html
|
||||||
|
|
||||||
|
while retry_count < max_retries:
|
||||||
|
try:
|
||||||
|
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
|
||||||
|
server.set_debuglevel(0)
|
||||||
|
server.login(SMTP_USER, SMTP_PASSWORD)
|
||||||
|
|
||||||
|
msg = MIMEText(custom_body, "html", "utf-8")
|
||||||
|
msg["Subject"] = subject
|
||||||
|
msg["From"] = SENDER_EMAIL
|
||||||
|
msg["To"] = email
|
||||||
|
|
||||||
|
server.sendmail(SENDER_EMAIL, [email], msg.as_string())
|
||||||
|
server.quit()
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
retry_count += 1
|
||||||
|
if retry_count >= max_retries:
|
||||||
|
break
|
||||||
|
import time
|
||||||
|
|
||||||
|
time.sleep(1.0)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def process_send_update_email(subject: str, body_html: str) -> str:
|
||||||
|
"""Send update email to all subscribers and log newsletter content."""
|
||||||
|
try:
|
||||||
|
subscribers = get_all_emails()
|
||||||
|
if not subscribers:
|
||||||
|
return "No subscribers found."
|
||||||
|
|
||||||
|
failures = []
|
||||||
|
for email in subscribers:
|
||||||
|
if not send_update_email(subject, body_html, email):
|
||||||
|
failures.append(email)
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = get_connection()
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT INTO newsletters (subject, body) VALUES (%s, %s)",
|
||||||
|
(subject, body_html),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
cursor.close()
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if failures:
|
||||||
|
return f"Sent with failures: {len(failures)} recipients failed."
|
||||||
return "Email has been sent to all subscribers."
|
return "Email has been sent to all subscribers."
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception("Error processing sending updates")
|
|
||||||
return f"Failed to send email: {e}"
|
return f"Failed to send email: {e}"
|
||||||
|
|
||||||
|
|
||||||
@app.route("/")
|
@app.route("/", methods=["GET"])
|
||||||
@login_required
|
@login_required
|
||||||
def index():
|
def index():
|
||||||
"""Displays all subscriber emails"""
|
"""Dashboard: list subscriber emails and show widgets."""
|
||||||
emails = get_all_emails()
|
emails = []
|
||||||
return render_template("admin_index.html", emails=emails)
|
try:
|
||||||
|
emails = get_all_emails()
|
||||||
|
except Exception:
|
||||||
|
flash("Could not load subscribers right now.", "danger")
|
||||||
|
|
||||||
|
counts = get_dashboard_counts()
|
||||||
|
return render_template("admin_index.html", emails=emails, counts=counts)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/send_update", methods=["GET", "POST"])
|
@app.route("/send_update", methods=["GET", "POST"])
|
||||||
@login_required
|
@login_required
|
||||||
def send_update():
|
def send_update():
|
||||||
"""Display a form to send an update email; process submission on POST."""
|
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
subject = request.form["subject"]
|
subject = (request.form.get("subject") or "").strip()
|
||||||
body = request.form["body"]
|
body_html = request.form.get("body") or ""
|
||||||
result_message = process_send_update_email(subject, body)
|
|
||||||
flash(result_message)
|
if not subject or not body_html:
|
||||||
|
flash("Subject and body are required", "danger")
|
||||||
|
return redirect(url_for("send_update"))
|
||||||
|
|
||||||
|
result_message = process_send_update_email(subject, body_html)
|
||||||
|
flash(escape(result_message))
|
||||||
return redirect(url_for("send_update"))
|
return redirect(url_for("send_update"))
|
||||||
|
|
||||||
return render_template("send_update.html")
|
return render_template("send_update.html")
|
||||||
|
|
||||||
|
|
||||||
@app.route("/login", methods=["GET", "POST"])
|
@app.route("/login", methods=["GET", "POST"])
|
||||||
def login():
|
def login():
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
username = request.form.get("username")
|
username = (request.form.get("username") or "").strip()
|
||||||
password = request.form.get("password")
|
password = (request.form.get("password") or "").strip()
|
||||||
|
|
||||||
|
if not username or not password:
|
||||||
|
flash("Username and password are required", "danger")
|
||||||
|
return redirect(url_for("login"))
|
||||||
|
|
||||||
admin = get_admin(username)
|
admin = get_admin(username)
|
||||||
if admin and check_password_hash(admin[1], password):
|
if admin and check_password_hash(admin[1], password):
|
||||||
session["username"] = username
|
session["username"] = username
|
||||||
|
session.permanent = True
|
||||||
|
app.config["SESSION_COOKIE_HTTPONLY"] = True
|
||||||
|
app.config["SESSION_COOKIE_SECURE"] = True
|
||||||
|
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
|
||||||
|
|
||||||
|
next_url = request.args.get("next")
|
||||||
|
if next_url and is_safe_url(next_url):
|
||||||
|
flash("Logged in successfully", "success")
|
||||||
|
return redirect(next_url)
|
||||||
|
|
||||||
flash("Logged in successfully", "success")
|
flash("Logged in successfully", "success")
|
||||||
return redirect(url_for("index"))
|
return redirect(url_for("index"))
|
||||||
else:
|
else:
|
||||||
flash("Invalid username or password", "danger")
|
flash("Invalid username or password", "danger")
|
||||||
return redirect(url_for("login"))
|
return redirect(url_for("login"))
|
||||||
|
|
||||||
return render_template("login.html")
|
return render_template("login.html")
|
||||||
|
|
||||||
|
|
||||||
@app.route("/logout")
|
@app.route("/logout", methods=["GET"])
|
||||||
def logout():
|
def logout():
|
||||||
session.pop("username", None)
|
session.pop("username", None)
|
||||||
flash("Logged out successfully", "success")
|
flash("Logged out successfully", "success")
|
||||||
|
|
@ -147,4 +236,9 @@ def logout():
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app.run(port=5001, debug=True)
|
is_prod = os.getenv("ENVIRONMENT", "development").lower() == "production"
|
||||||
|
app.config["PREFERRED_URL_SCHEME"] = "https" if is_prod else "http"
|
||||||
|
if is_prod:
|
||||||
|
app.run(host="0.0.0.0", port=5001, debug=False, use_reloader=False)
|
||||||
|
else:
|
||||||
|
app.run(host="0.0.0.0", port=5001, debug=True, use_reloader=True)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue