Merge pull request 'Refactor and Enhance Admin Panel: Security, Logging, and Code Quality Improvements' (#2) from newsletter into main

Reviewed-on: https://brew.bsd.cafe/RideAware/admin-panel/pulls/2
This commit is contained in:
blake 2025-04-03 20:26:10 +02:00
commit b64644cb21
7 changed files with 323 additions and 155 deletions

View file

@ -16,4 +16,3 @@ ENV FLASK_APP=server.py
EXPOSE 5001 EXPOSE 5001
CMD ["gunicorn", "--bind", "0.0.0.0:5001", "app:app"] CMD ["gunicorn", "--bind", "0.0.0.0:5001", "app:app"]

125
app.py
View file

@ -1,103 +1,136 @@
import os import os
import logging
import smtplib import smtplib
from email.mime.text import MIMEText from email.mime.text import MIMEText
from flask import Flask, render_template, request, redirect, url_for, flash, session from flask import (
Flask,
render_template,
request,
redirect,
url_for,
flash,
session,
)
from dotenv import load_dotenv from dotenv import load_dotenv
from werkzeug.security import check_password_hash from werkzeug.security import check_password_hash
from functools import wraps # Import wraps
from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin from database import get_connection, init_db, get_all_emails, get_admin, create_default_admin
load_dotenv() load_dotenv()
app = Flask(__name__) app = Flask(__name__)
# Use a secret key from .env; ensure your .env sets SECRET_KEY app.secret_key = os.getenv("SECRET_KEY")
app.secret_key = os.getenv('SECRET_KEY') base_url = os.getenv("BASE_URL")
base_url = os.getenv('BASE_URL')
# SMTP settings (for sending update emails) # SMTP settings (for sending update emails)
SMTP_SERVER = os.getenv('SMTP_SERVER') SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) SMTP_PORT = int(os.getenv("SMTP_PORT", 465))
SMTP_USER = os.getenv('SMTP_USER') SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv('SMTP_PASSWORD') SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) # Use SENDER_EMAIL
# Logging setup
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize the database and create default admin user if necessary. # Initialize the database and create default admin user if necessary.
init_db() init_db()
create_default_admin() create_default_admin()
# Decorator for requiring login
def login_required(f): def login_required(f):
from functools import wraps @wraps(f) # Use wraps to preserve function metadata
@wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if "username" not in session: if "username" not in session:
return redirect(url_for('login')) return redirect(url_for("login"))
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function
def send_update_email(subject, body, email):
"""Sends email, returns True on success, False on failure."""
try:
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
server.set_debuglevel(False) # Keep debug level at False for production
server.login(SMTP_USER, SMTP_PASSWORD)
unsub_link = f"https://{base_url}/unsubscribe?email={email}"
custom_body = (
f"{body}<br><br>"
f"If you ever wish to unsubscribe, please click <a href='{unsub_link}'>here</a>"
)
msg = MIMEText(custom_body, "html", "utf-8")
msg["Subject"] = subject
msg["From"] = SENDER_EMAIL # Use sender email
msg["To"] = email
server.sendmail(SENDER_EMAIL, email, msg.as_string()) # Use sender email
server.quit()
logger.info(f"Update email sent to: {email}")
return True
except Exception as e:
logger.error(f"Failed to send email to {email}: {e}")
return False
def process_send_update_email(subject, body): def process_send_update_email(subject, body):
"""Helper function to send an update email to all subscribers.""" """Helper function to send an update email to all subscribers."""
subscribers = get_all_emails() subscribers = get_all_emails()
if not subscribers: if not subscribers:
return "No subscribers found." return "No subscribers found."
try: try:
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
server.set_debuglevel(True)
server.login(SMTP_USER, SMTP_PASSWORD)
for email in subscribers: for email in subscribers:
unsub_link = f"https://{base_url}/unsubscribe?email={email}" if not send_update_email(subject, body, email):
custom_body = ( return f"Failed to send to {email}" # Specific failure message
f"{body}<br><br>"
f"If you ever wish to unsubscribe, please click <a href='{unsub_link}'>here</a>"
)
msg = MIMEText(custom_body, 'html', 'utf-8')
msg['Subject'] = subject
msg['From'] = SMTP_USER
msg['To'] = email
server.sendmail(SMTP_USER, email, msg.as_string())
print(f"Update email sent to: {email}")
server.quit()
# Log newsletter content for audit purposes
conn = get_connection() conn = get_connection()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute( cursor.execute(
"INSERT INTO newsletters (subject, body) VALUES (%s, %s)", "INSERT INTO newsletters (subject, body) VALUES (%s, %s)", (subject, body)
(subject, body) )
)
conn.commit() conn.commit()
cursor.close() cursor.close()
conn.close() conn.close()
return "Email has been sent." return "Email has been sent to all subscribers."
except Exception as e: except Exception as e:
print(f"Failed to send email: {e}") logger.exception("Error processing sending updates")
return f"Failed to send email: {e}" return f"Failed to send email: {e}"
@app.route('/')
@app.route("/")
@login_required @login_required
def index(): def index():
"""Displays all subscriber emails""" """Displays all subscriber emails"""
emails = get_all_emails() emails = get_all_emails()
return render_template("admin_index.html", emails=emails) return render_template("admin_index.html", emails=emails)
@app.route('/send_update', methods=['GET', 'POST']) @app.route("/send_update", methods=["GET", "POST"])
@login_required @login_required
def send_update(): def send_update():
"""Display a form to send an update email; process submission on POST.""" """Display a form to send an update email; process submission on POST."""
if request.method == 'POST': if request.method == "POST":
subject = request.form['subject'] subject = request.form["subject"]
body = request.form['body'] body = request.form["body"]
# Call the helper function using its new name.
result_message = process_send_update_email(subject, body) result_message = process_send_update_email(subject, body)
flash(result_message) flash(result_message)
return redirect(url_for("send_update")) return redirect(url_for("send_update"))
return render_template("send_update.html") return render_template("send_update.html")
@app.route('/login', methods=['GET', 'POST'])
@app.route("/login", methods=["GET", "POST"])
def login(): def login():
if request.method == 'POST': if request.method == "POST":
username = request.form.get('username') username = request.form.get("username")
password = request.form.get('password') password = request.form.get("password")
admin = get_admin(username) admin = get_admin(username)
# Expect get_admin() to return a tuple like (username, password_hash)
if admin and check_password_hash(admin[1], password): if admin and check_password_hash(admin[1], password):
session['username'] = username session["username"] = username
flash("Logged in successfully", "success") flash("Logged in successfully", "success")
return redirect(url_for("index")) return redirect(url_for("index"))
else: else:
@ -105,11 +138,13 @@ def login():
return redirect(url_for("login")) return redirect(url_for("login"))
return render_template("login.html") return render_template("login.html")
@app.route('/logout')
@app.route("/logout")
def logout(): def logout():
session.pop('username', None) session.pop("username", None)
flash("Logged out successfully", "success") flash("Logged out successfully", "success")
return redirect(url_for("login")) return redirect(url_for("login"))
if __name__ == '__main__':
if __name__ == "__main__":
app.run(port=5001, debug=True) app.run(port=5001, debug=True)

View file

@ -1,43 +1,89 @@
import os import os
import logging
import psycopg2 import psycopg2
from psycopg2 import IntegrityError from psycopg2 import IntegrityError
from dotenv import load_dotenv from dotenv import load_dotenv
from werkzeug.security import generate_password_hash from werkzeug.security import generate_password_hash
load_dotenv() load_dotenv()
# Logging setup
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def get_connection(): def get_connection():
"""Return a new connection to the PostgreSQL database.""" """Return a new connection to the PostgreSQL database."""
return psycopg2.connect( try:
host=os.getenv("PG_HOST"), conn = psycopg2.connect(
port=os.getenv("PG_PORT"), host=os.getenv("PG_HOST"),
dbname=os.getenv("PG_DATABASE"), port=os.getenv("PG_PORT"),
user=os.getenv("PG_USER"), dbname=os.getenv("PG_DATABASE"),
password=os.getenv("PG_PASSWORD"), user=os.getenv("PG_USER"),
connect_timeout=10 password=os.getenv("PG_PASSWORD"),
) connect_timeout=10,
)
return conn
except Exception as e:
logger.error(f"Database connection error: {e}")
raise
def init_db(): def init_db():
"""Initialize the database tables.""" """Initialize the database tables."""
conn = get_connection() conn = None
cursor = conn.cursor() try:
# Create subscribers table (if not exists) conn = get_connection()
cursor.execute(""" cursor = conn.cursor()
CREATE TABLE IF NOT EXISTS subscribers (
id SERIAL PRIMARY KEY, # Create subscribers table (if not exists)
email TEXT UNIQUE NOT NULL cursor.execute(
"""
CREATE TABLE IF NOT EXISTS subscribers (
id SERIAL PRIMARY KEY,
email TEXT UNIQUE NOT NULL
)
"""
) )
""")
# Create admin_users table (if not exists) # Create admin_users table (if not exists)
cursor.execute(""" cursor.execute(
CREATE TABLE IF NOT EXISTS admin_users ( """
id SERIAL PRIMARY KEY, CREATE TABLE IF NOT EXISTS admin_users (
username TEXT UNIQUE NOT NULL, id SERIAL PRIMARY KEY,
password TEXT NOT NULL username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
)
"""
) )
""")
conn.commit() # Newsletter storage
cursor.close() cursor.execute(
conn.close() """
CREATE TABLE IF NOT EXISTS newsletters (
id SERIAL PRIMARY KEY,
subject TEXT NOT NULL,
body TEXT NOT NULL,
sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
logger.info("Database initialized successfully.")
except Exception as e:
logger.error(f"Database initialization error: {e}")
if conn:
conn.rollback() # Rollback if there's an error
raise
finally:
if conn:
cursor.close()
conn.close()
def get_all_emails(): def get_all_emails():
"""Return a list of all subscriber emails.""" """Return a list of all subscriber emails."""
@ -46,78 +92,112 @@ def get_all_emails():
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT email FROM subscribers") cursor.execute("SELECT email FROM subscribers")
results = cursor.fetchall() results = cursor.fetchall()
cursor.close() emails = [row[0] for row in results]
conn.close() logger.debug(f"Retrieved emails: {emails}")
return [row[0] for row in results] return emails
except Exception as e: except Exception as e:
print(f"Error retrieving emails: {e}") logger.error(f"Error retrieving emails: {e}")
return [] return []
finally:
if conn:
cursor.close()
conn.close()
def add_email(email): def add_email(email):
"""Insert an email into the subscribers table.""" """Insert an email into the subscribers table."""
conn = None
try: try:
conn = get_connection() conn = get_connection()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,)) cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,))
conn.commit() conn.commit()
cursor.close() logger.info(f"Email {email} added successfully.")
conn.close()
return True return True
except IntegrityError: except IntegrityError:
logger.warning(f"Attempted to add duplicate email: {email}")
return False return False
except Exception as e: except Exception as e:
print(f"Error adding email: {e}") logger.error(f"Error adding email {email}: {e}")
return False return False
finally:
if conn:
cursor.close()
conn.close()
def remove_email(email): def remove_email(email):
"""Remove an email from the subscribers table.""" """Remove an email from the subscribers table."""
conn = None
try: try:
conn = get_connection() conn = get_connection()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,)) cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,))
conn.commit()
rowcount = cursor.rowcount rowcount = cursor.rowcount
cursor.close() conn.commit()
conn.close() logger.info(f"Email {email} removed successfully.")
return rowcount > 0 return rowcount > 0
except Exception as e: except Exception as e:
print(f"Error removing email: {e}") logger.error(f"Error removing email {email}: {e}")
return False return False
finally:
if conn:
cursor.close()
conn.close()
def get_admin(username): def get_admin(username):
"""Retrieve admin credentials for a given username. """Retrieve admin credentials for a given username.
Returns a tuple (username, password_hash) if found, otherwise None. Returns a tuple (username, password_hash) if found, otherwise None.
""" """
conn = None
try: try:
conn = get_connection() conn = get_connection()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT username, password FROM admin_users WHERE username = %s", (username,)) cursor.execute(
"SELECT username, password FROM admin_users WHERE username = %s",
(username,),
)
result = cursor.fetchone() result = cursor.fetchone()
cursor.close()
conn.close()
return result # (username, password_hash) return result # (username, password_hash)
except Exception as e: except Exception as e:
print(f"Error retrieving admin: {e}") logger.error(f"Error retrieving admin: {e}")
return None return None
finally:
if conn:
cursor.close()
conn.close()
def create_default_admin(): def create_default_admin():
"""Create a default admin user if one doesn't already exist.""" """Create a default admin user if one doesn't already exist."""
default_username = os.getenv("ADMIN_USERNAME", "admin") default_username = os.getenv("ADMIN_USERNAME", "admin")
default_password = os.getenv("ADMIN_PASSWORD", "changeme") default_password = os.getenv("ADMIN_PASSWORD", "changeme")
hashed = generate_password_hash(default_password, method="pbkdf2:sha256") hashed_password = generate_password_hash(default_password, method="pbkdf2:sha256")
conn = None
try: try:
conn = get_connection() conn = get_connection()
cursor = conn.cursor() cursor = conn.cursor()
# Check if the admin already exists # Check if the admin already exists
cursor.execute("SELECT id FROM admin_users WHERE username = %s", (default_username,)) cursor.execute(
"SELECT id FROM admin_users WHERE username = %s", (default_username,)
)
if cursor.fetchone() is None: if cursor.fetchone() is None:
cursor.execute("INSERT INTO admin_users (username, password) VALUES (%s, %s)", cursor.execute(
(default_username, hashed)) "INSERT INTO admin_users (username, password) VALUES (%s, %s)",
(default_username, hashed_password),
)
conn.commit() conn.commit()
print("Default admin created successfully") logger.info("Default admin created successfully")
else: else:
print("Default admin already exists") logger.info("Default admin already exists")
cursor.close()
conn.close()
except Exception as e: except Exception as e:
print(f"Error creating default admin: {e}") logger.error(f"Error creating default admin: {e}")
if conn:
conn.rollback()
finally:
if conn:
cursor.close()
conn.close()

55
static/css/style.css Normal file
View file

@ -0,0 +1,55 @@
body {
font-family: Arial, sans-serif;
padding: 20px;
}
table {
border-collapse: collapse;
width: 100%;
}
th,
td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
a {
margin-right: 10px;
}
form {
max-width: 600px;
margin: 0 auto;
}
label {
display: block;
margin-top: 15px;
}
input[type="text"],
input[type="password"],
textarea {
width: 100%;
padding: 8px;
}
button {
margin-top: 15px;
padding: 10px 20px;
}
.flash {
background-color: #f8d7da;
color: #721c24;
padding: 10px;
margin-bottom: 10px;
text-align: center;
}

View file

@ -4,30 +4,41 @@
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin Center - Subscribers</title> <title>Admin Center - Subscribers</title>
<style> <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
body { font-family: Arial, sans-serif; padding: 20px; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
a { margin-right: 10px; }
</style>
</head> </head>
<body> <body>
<h1>Subscribers</h1> <h1>Subscribers</h1>
<p><a href="{{ url_for('send_update') }}">Send Update Email</a></p> <p>
{% if emails %} <a href="{{ url_for('send_update') }}">Send Update Email</a>|
<table> <a href="{{ url_for('logout') }}">Logout</a>
<tr> </p>
<th>Email Address</th>
</tr> {% with messages = get_flashed_messages(with_categories=true) %}
{% for email in emails %} {% if messages %}
<tr> {% for category, message in messages %}
<td>{{ email }}</td> <div class="flash">{{ message }}</div>
</tr> {% endfor %}
{% endfor %} {% endif %}
</table> {% endwith %}
{% else %}
<p>No subscribers found.</p> {% if emails %}
{% endif %} <table>
<thead>
<tr>
<th>Email Address</th>
</tr>
</thead>
<tbody>
{% for email in emails %}
<tr>
<td>{{ email }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No subscribers found.</p>
{% endif %}
</body> </body>
</html> </html>

View file

@ -1,26 +1,21 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8" /> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin Login</title> <title>Admin Login</title>
<style> <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
body { font-family: Arial, sans-serif; padding: 20px; }
form { max-width: 400px; margin: 0 auto; }
label { display: block; margin-top: 15px; }
input[type="text"], input[type="password"] { width: 100%; padding: 8px; }
button { margin-top: 15px; padding: 10px 20px; }
.flash { background-color: #f8d7da; color: #721c24; padding: 10px; margin-bottom: 10px; text-align: center; }
</style>
</head> </head>
<body> <body>
<h1>Admin Login</h1> <h1>Admin Login</h1>
{% with messages = get_flashed_messages(with_categories=true) %} {% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %} {% if messages %}
{% for category, message in messages %} {% for category, message in messages %}
<div class="flash">{{ message }}</div> <div class="flash">{{ message }}</div>
{% endfor %} {% endfor %}
{% endif %} {% endif %}
{% endwith %} {% endwith %}
<form action="{{ url_for('login') }}" method="POST"> <form action="{{ url_for('login') }}" method="POST">
<label for="username">Username:</label> <label for="username">Username:</label>
@ -30,4 +25,5 @@
<button type="submit">Login</button> <button type="submit">Login</button>
</form> </form>
</body> </body>
</html> </html>

View file

@ -1,25 +1,19 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8" /> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin Center - Send Update</title> <title>Admin Center - Send Update</title>
<style> <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
body { font-family: Arial, sans-serif; padding: 20px; }
form { max-width: 600px; }
label { display: block; margin-top: 15px; }
input[type="text"], textarea { width: 100%; padding: 8px; }
button { margin-top: 15px; padding: 10px 20px; }
.flash {
background-color: #f8d7da;
color: #721c24;
padding: 10px;
margin-bottom: 10px;
}
</style>
</head> </head>
<body> <body>
<h1>Send Update Email</h1> <h1>Send Update Email</h1>
<p>
<a href="{{ url_for('index') }}">Back to Subscribers List</a> |
<a href="{{ url_for('logout') }}">Logout</a>
</p>
{% with messages = get_flashed_messages() %} {% with messages = get_flashed_messages() %}
{% if messages %} {% if messages %}
{% for message in messages %} {% for message in messages %}
@ -37,9 +31,7 @@
<button type="submit">Send Update</button> <button type="submit">Send Update</button>
</form> </form>
<p>
<a href="{{ url_for('index') }}">Back to Subscribers List</a> |
<a href="{{ url_for('logout') }}">Logout</a>
</p>
</body> </body>
</html> </html>