admin-panel/database.py
2025-08-25 21:32:45 -05:00

522 lines
No EOL
19 KiB
Python

import os
import logging
import psycopg2
from psycopg2 import IntegrityError, pool
from dotenv import load_dotenv
from werkzeug.security import generate_password_hash
from contextlib import contextmanager
from datetime import datetime, timezone
# Load variables from a .env file (if present) into the process environment
# before any os.getenv() call below runs.
load_dotenv()

# Logging setup: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared connection pool; stays None until init_connection_pool() fills it.
connection_pool = None
def init_connection_pool():
    """Create the shared PostgreSQL connection pool.

    Connection settings come from the PG_HOST, PG_PORT, PG_DATABASE, PG_USER
    and PG_PASSWORD environment variables. The pool (1 to 20 connections) is
    stored in the module-level ``connection_pool``.

    Raises:
        Exception: Whatever pool construction raised, after it is logged.
    """
    global connection_pool

    min_connections, max_connections = 1, 20
    pg_settings = {
        "host": os.getenv("PG_HOST"),
        "port": os.getenv("PG_PORT"),
        "dbname": os.getenv("PG_DATABASE"),
        "user": os.getenv("PG_USER"),
        "password": os.getenv("PG_PASSWORD"),
    }

    try:
        connection_pool = psycopg2.pool.ThreadedConnectionPool(
            min_connections, max_connections, **pg_settings
        )
        logger.info("Connection pool created successfully")
    except Exception as e:
        logger.error(f"Connection pool creation error: {e}")
        raise
@contextmanager
def get_db_connection():
    """Yield a pooled database connection and always return it to the pool.

    The pool is created on first use. Committing is the caller's
    responsibility. If the ``with`` body raises, the transaction is rolled
    back, the error is logged, and the same exception is re-raised.

    Yields:
        A connection checked out from ``connection_pool``.

    Raises:
        Exception: Any error raised while obtaining the connection or inside
            the ``with`` body.
    """
    if connection_pool is None:
        init_connection_pool()

    conn = None
    try:
        conn = connection_pool.getconn()
        yield conn
    except Exception as e:
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A broken connection cannot roll back. Swallow that failure
                # so the caller still receives the original exception type
                # (e.g. IntegrityError) instead of the rollback error.
                logger.error(f"Rollback failed: {rollback_error}")
        logger.error(f"Database operation error: {e}")
        raise
    finally:
        if conn is not None:
            connection_pool.putconn(conn)
def check_column_exists(cursor, table_name, column_name):
    """Report whether ``table_name`` has a column named ``column_name``.

    Runs a single EXISTS query against ``information_schema.columns`` on the
    given cursor. The lookup filters on table and column name only, not on
    schema.

    Returns:
        The first field of the single result row (the EXISTS value).
    """
    lookup_sql = """
        SELECT EXISTS (
            SELECT FROM information_schema.columns
            WHERE table_name = %s AND column_name = %s
        )
        """
    lookup_args = (table_name, column_name)

    cursor.execute(lookup_sql, lookup_args)
    result_row = cursor.fetchone()
    return result_row[0]
def migrate_database(conn):
    """Bring an existing schema up to date by adding any missing columns.

    Each entry below names a table, a column, and the statements to run when
    that column is absent. Entries are processed in order on one cursor and
    committed together at the end.

    Args:
        conn: Open database connection; it is committed by this function.
    """
    column_migrations = (
        # Migration 1: subscribers
        ("subscribers", "subscribed_at", (
            "ALTER TABLE subscribers ADD COLUMN subscribed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
        )),
        ("subscribers", "status", (
            "ALTER TABLE subscribers ADD COLUMN status TEXT DEFAULT 'active'",
            # Restrict the new column to the two known states.
            "ALTER TABLE subscribers ADD CONSTRAINT subscribers_status_check CHECK (status IN ('active', 'unsubscribed'))",
            # Backfill rows that ended up without a status.
            "UPDATE subscribers SET status = 'active' WHERE status IS NULL",
        )),
        ("subscribers", "source", (
            "ALTER TABLE subscribers ADD COLUMN source TEXT DEFAULT 'manual'",
        )),
        # Migration 2: admin_users
        ("admin_users", "created_at", (
            "ALTER TABLE admin_users ADD COLUMN created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
        )),
        ("admin_users", "last_login", (
            "ALTER TABLE admin_users ADD COLUMN last_login TIMESTAMP WITH TIME ZONE",
        )),
        ("admin_users", "is_active", (
            "ALTER TABLE admin_users ADD COLUMN is_active BOOLEAN DEFAULT TRUE",
        )),
        # Migration 3: newsletters
        ("newsletters", "sent_by", (
            "ALTER TABLE newsletters ADD COLUMN sent_by TEXT",
        )),
        ("newsletters", "recipient_count", (
            "ALTER TABLE newsletters ADD COLUMN recipient_count INTEGER DEFAULT 0",
        )),
        ("newsletters", "success_count", (
            "ALTER TABLE newsletters ADD COLUMN success_count INTEGER DEFAULT 0",
        )),
        ("newsletters", "failure_count", (
            "ALTER TABLE newsletters ADD COLUMN failure_count INTEGER DEFAULT 0",
        )),
    )

    cursor = conn.cursor()
    for table, column, statements in column_migrations:
        if check_column_exists(cursor, table, column):
            continue
        logger.info(f"Adding {column} column to {table} table")
        for statement in statements:
            cursor.execute(statement)

    conn.commit()
    logger.info("Database migrations completed successfully")
def init_db():
    """Create the base tables, apply column migrations, then add indexes.

    Steps, in order:
      1. CREATE TABLE IF NOT EXISTS for subscribers, admin_users, newsletters
         and email_deliveries, committed together.
      2. ``migrate_database`` to add columns missing from older schemas.
      3. Index creation, best effort. Each index is committed on its own so
         that one failing index neither discards the others nor leaves the
         connection inside an aborted transaction.

    Raises:
        Exception: Any failure in steps 1 or 2, after it is logged.
    """
    table_statements = (
        # Basic subscribers table (backwards compatible)
        """
        CREATE TABLE IF NOT EXISTS subscribers (
            id SERIAL PRIMARY KEY,
            email TEXT UNIQUE NOT NULL
        )
        """,
        # Basic admin_users table (backwards compatible)
        """
        CREATE TABLE IF NOT EXISTS admin_users (
            id SERIAL PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL
        )
        """,
        # Basic newsletters table (backwards compatible)
        """
        CREATE TABLE IF NOT EXISTS newsletters (
            id SERIAL PRIMARY KEY,
            subject TEXT NOT NULL,
            body TEXT NOT NULL,
            sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Email delivery tracking (new table)
        """
        CREATE TABLE IF NOT EXISTS email_deliveries (
            id SERIAL PRIMARY KEY,
            newsletter_id INTEGER REFERENCES newsletters(id),
            email TEXT NOT NULL,
            status TEXT CHECK (status IN ('sent', 'failed', 'bounced')),
            sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            error_message TEXT
        )
        """,
    )
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_subscribers_email ON subscribers(email)",
        "CREATE INDEX IF NOT EXISTS idx_subscribers_status ON subscribers(status)",
        "CREATE INDEX IF NOT EXISTS idx_newsletters_sent_at ON newsletters(sent_at)",
    )

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            for statement in table_statements:
                cursor.execute(statement)
            conn.commit()
            logger.info("Basic database tables created successfully")

            # Apply migrations to upgrade schema
            migrate_database(conn)

            # Indexes come last because idx_subscribers_status needs the
            # migrated status column. Failures here are not critical.
            all_indexes_created = True
            for statement in index_statements:
                try:
                    cursor.execute(statement)
                    conn.commit()
                except Exception as e:
                    all_indexes_created = False
                    logger.warning(f"Some indexes may not have been created: {e}")
                    try:
                        # Clear the aborted transaction so the next index
                        # statement can run.
                        conn.rollback()
                    except Exception:
                        # Connection is unusable; remaining indexes cannot
                        # be attempted on it.
                        break
            if all_indexes_created:
                logger.info("Database indexes created successfully")

            logger.info("Database initialization completed successfully")
    except Exception as e:
        logger.error(f"Database initialization error: {e}")
        raise
def get_subscriber_stats():
    """Collect headline counts for subscribers and newsletters.

    Adapts to older schemas: without a ``status`` column every subscriber is
    counted as active, and without ``subscribed_at`` recent signups are
    reported as 0.

    Returns:
        dict with 'total_active', 'total_unsubscribed', 'recent_signups'
        (active signups in the last 30 days) and 'newsletters_sent'. All
        four are 0 if any query fails.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            def fetch_count(sql):
                # Run a COUNT(*) query and return its single value.
                cursor.execute(sql)
                return cursor.fetchone()[0]

            total_unsubscribed = 0
            recent_signups = 0

            if check_column_exists(cursor, 'subscribers', 'status'):
                total_active = fetch_count(
                    "SELECT COUNT(*) FROM subscribers WHERE status = 'active'"
                )
                total_unsubscribed = fetch_count(
                    "SELECT COUNT(*) FROM subscribers WHERE status = 'unsubscribed'"
                )
                # Recent signups need the subscribed_at column as well.
                if check_column_exists(cursor, 'subscribers', 'subscribed_at'):
                    recent_signups = fetch_count("""
                        SELECT COUNT(*) FROM subscribers
                        WHERE subscribed_at >= NOW() - INTERVAL '30 days' AND status = 'active'
                        """)
            else:
                # Old schema: no status column, so every row counts.
                total_active = fetch_count("SELECT COUNT(*) FROM subscribers")

            newsletters_sent = fetch_count("SELECT COUNT(*) FROM newsletters")

            return {
                'total_active': total_active,
                'total_unsubscribed': total_unsubscribed,
                'recent_signups': recent_signups,
                'newsletters_sent': newsletters_sent,
            }
    except Exception as e:
        logger.error(f"Error retrieving subscriber stats: {e}")
        return {'total_active': 0, 'total_unsubscribed': 0, 'recent_signups': 0, 'newsletters_sent': 0}
def get_all_emails(page=1, per_page=50, search=''):
    """Return one page of subscribers, optionally filtered by email.

    Only rows with status 'active' are listed when the ``status`` column
    exists; on older schemas every row is listed.

    Args:
        page: 1-based page number. Values below 1 are treated as 1.
        per_page: Page size. Values below 1 are treated as 1.
        search: Text matched case-insensitively anywhere in the email via
            ILIKE. ``%`` and ``_`` in it act as SQL wildcards.

    Returns:
        dict with 'subscribers' (list of dicts holding 'id', 'email',
        'subscribed_at', 'source'), 'total_count', 'page', 'per_page' and
        'total_pages'. On any error an empty result for page 1 is returned.
    """
    try:
        # Clamp pagination: a negative OFFSET is rejected by PostgreSQL and
        # per_page == 0 would divide by zero in the page count below.
        page = max(page, 1)
        per_page = max(per_page, 1)
        offset = (page - 1) * per_page

        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Check which columns exist
            has_status = check_column_exists(cursor, 'subscribers', 'status')
            has_subscribed_at = check_column_exists(cursor, 'subscribers', 'subscribed_at')
            has_source = check_column_exists(cursor, 'subscribers', 'source')

            # Only fixed fragments are interpolated into the SQL text; the
            # user-supplied search term always travels as a bound parameter.
            if has_status:
                base_query = "FROM subscribers WHERE status = 'active'"
            else:
                base_query = "FROM subscribers WHERE 1=1"

            filter_params = []
            if search:
                base_query += " AND email ILIKE %s"
                filter_params.append(f"%{search}%")

            cursor.execute(f"SELECT COUNT(*) {base_query}", filter_params)
            total_count = cursor.fetchone()[0]

            select_fields = ["id", "email"]
            if has_subscribed_at:
                select_fields.append("subscribed_at")
            if has_source:
                select_fields.append("source")
            order_by = "subscribed_at DESC" if has_subscribed_at else "id DESC"

            query = f"""
                SELECT {', '.join(select_fields)}
                {base_query}
                ORDER BY {order_by}
                LIMIT %s OFFSET %s
                """
            cursor.execute(query, filter_params + [per_page, offset])

            subscribers = []
            for row in cursor.fetchall():
                # Map by column name, not fixed position: when subscribed_at
                # is missing, source sits at index 2 rather than 3.
                record = dict(zip(select_fields, row))
                subscribers.append({
                    'id': record['id'],
                    'email': record['email'],
                    'subscribed_at': record.get('subscribed_at'),
                    'source': record.get('source', 'manual'),
                })

            return {
                'subscribers': subscribers,
                'total_count': total_count,
                'page': page,
                'per_page': per_page,
                'total_pages': (total_count + per_page - 1) // per_page,
            }
    except Exception as e:
        logger.error(f"Error retrieving emails: {e}")
        return {'subscribers': [], 'total_count': 0, 'page': 1, 'per_page': per_page, 'total_pages': 0}
def add_email(email, source='manual'):
    """Subscribe an email address, reactivating it if it had unsubscribed.

    The address is lower-cased and stripped before storage. When the
    ``status`` column exists, an address that is already present but marked
    'unsubscribed' is switched back to 'active' instead of being rejected by
    the UNIQUE constraint. This relies on the UNIQUE constraint on
    ``subscribers.email`` for ON CONFLICT. Without that change, an address
    removed through ``remove_email`` could never be added again.

    Args:
        email: Address to subscribe.
        source: Origin label stored in the ``source`` column when that
            column exists.

    Returns:
        True if a row was inserted or reactivated. False if the address is
        already an active subscriber or any error occurred.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            normalized_email = email.lower().strip()

            # Adapt to older schemas that lack these columns.
            has_source = check_column_exists(cursor, 'subscribers', 'source')
            has_status = check_column_exists(cursor, 'subscribers', 'status')

            if has_source:
                query = "INSERT INTO subscribers (email, source) VALUES (%s, %s)"
                params = (normalized_email, source)
            else:
                query = "INSERT INTO subscribers (email) VALUES (%s)"
                params = (normalized_email,)

            if has_status:
                # On a duplicate, touch the row only if it was unsubscribed.
                # An already-active duplicate affects 0 rows.
                query += (
                    " ON CONFLICT (email) DO UPDATE SET status = 'active'"
                    " WHERE subscribers.status = 'unsubscribed'"
                )

            cursor.execute(query, params)
            changed = cursor.rowcount > 0
            conn.commit()

        if not changed:
            logger.warning(f"Attempted to add duplicate email: {email}")
            return False
        logger.info(f"Email {email} added successfully.")
        return True
    except IntegrityError:
        # Reached on schemas without the status column, where a duplicate
        # still violates the UNIQUE constraint.
        logger.warning(f"Attempted to add duplicate email: {email}")
        return False
    except Exception as e:
        logger.error(f"Error adding email {email}: {e}")
        return False
def remove_email(email):
    """Unsubscribe an address, or delete it on schemas without ``status``.

    The address is matched both as given and in the lower-cased, stripped
    form that ``add_email`` stores, so differently cased input still finds
    the row.

    Args:
        email: Address to unsubscribe or remove.

    Returns:
        True if at least one row was updated or deleted. False if no row
        matched or an error occurred.
    """
    # Assigned before the try so the except handler can always format its
    # message, even when the failure happens before the column check.
    has_status = False
    try:
        normalized_email = email.lower().strip()
        with get_db_connection() as conn:
            cursor = conn.cursor()
            has_status = check_column_exists(cursor, 'subscribers', 'status')

            if has_status:
                # Soft delete: keep the row, flip its status.
                cursor.execute(
                    "UPDATE subscribers SET status = 'unsubscribed' WHERE email IN (%s, %s)",
                    (email, normalized_email)
                )
            else:
                # Old schema: physically delete the record.
                cursor.execute(
                    "DELETE FROM subscribers WHERE email IN (%s, %s)",
                    (email, normalized_email)
                )

            rowcount = cursor.rowcount
            conn.commit()

        action = 'unsubscribed' if has_status else 'removed'
        if rowcount > 0:
            logger.info(f"Email {email} {action} successfully.")
        else:
            logger.warning(f"Email {email} not found; nothing {action}.")
        return rowcount > 0
    except Exception as e:
        logger.error(f"Error {'unsubscribing' if has_status else 'removing'} email {email}: {e}")
        return False
def get_admin(username):
    """Fetch an active admin's credentials row and stamp its last login.

    ``last_login`` is set to the current UTC time as soon as a matching
    active row is found, independent of what the caller then does with the
    returned credentials.

    Args:
        username: Admin username to look up.

    Returns:
        The row ``(id, username, password, is_active)`` for an active admin,
        or None when there is no match or a database error occurs.
    """
    lookup_sql = """SELECT id, username, password, is_active
                    FROM admin_users
                    WHERE username = %s AND is_active = TRUE"""
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(lookup_sql, (username,))
            admin_row = cursor.fetchone()

            if not admin_row:
                return admin_row

            admin_id = admin_row[0]
            login_time = datetime.now(timezone.utc)
            cursor.execute(
                "UPDATE admin_users SET last_login = %s WHERE id = %s",
                (login_time, admin_id)
            )
            conn.commit()
            return admin_row
    except Exception as e:
        logger.error(f"Error retrieving admin: {e}")
        return None
def create_default_admin():
    """Create a default admin account when no active admin exists.

    The username comes from ADMIN_USERNAME (default "admin") and the
    password from ADMIN_PASSWORD (default "changeme"). The password is
    hashed only when an account is actually going to be inserted, so calls
    that find an existing admin skip the PBKDF2 work. A warning is logged
    when the insecure built-in password is used. Errors are logged and
    swallowed.
    """
    default_username = os.getenv("ADMIN_USERNAME", "admin")
    configured_password = os.getenv("ADMIN_PASSWORD")
    using_fallback_password = configured_password is None
    default_password = "changeme" if using_fallback_password else configured_password

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Check if any active admin exists
            cursor.execute("SELECT COUNT(*) FROM admin_users WHERE is_active = TRUE")
            admin_count = cursor.fetchone()[0]
            if admin_count != 0:
                logger.info("Admin users already exist")
                return

            if using_fallback_password:
                logger.warning(
                    "ADMIN_PASSWORD is not set; creating the default admin "
                    "with the built-in insecure password"
                )

            hashed_password = generate_password_hash(default_password, method="pbkdf2:sha256")
            cursor.execute(
                "INSERT INTO admin_users (username, password) VALUES (%s, %s)",
                (default_username, hashed_password),
            )
            conn.commit()
            logger.info("Default admin created successfully")
    except Exception as e:
        logger.error(f"Error creating default admin: {e}")
def save_newsletter(subject, body, sent_by, recipient_count=0):
    """Insert a newsletter row and return its generated id.

    Args:
        subject: Newsletter subject line.
        body: Newsletter body text.
        sent_by: Value stored in the ``sent_by`` column.
        recipient_count: Value stored in the ``recipient_count`` column.

    Returns:
        The id returned by the INSERT, or None if saving failed.
    """
    insert_sql = """INSERT INTO newsletters (subject, body, sent_by, recipient_count)
                    VALUES (%s, %s, %s, %s) RETURNING id"""
    row_values = (subject, body, sent_by, recipient_count)

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(insert_sql, row_values)
            newsletter_id = cursor.fetchone()[0]
            conn.commit()
    except Exception as e:
        logger.error(f"Error saving newsletter: {e}")
        return None
    return newsletter_id
def update_newsletter_stats(newsletter_id, success_count, failure_count):
    """Store delivery counters on a newsletter row.

    Args:
        newsletter_id: Id of the newsletter to update.
        success_count: New value for ``success_count``.
        failure_count: New value for ``failure_count``.

    Errors are logged and swallowed; nothing is returned.
    """
    update_sql = """UPDATE newsletters
                    SET success_count = %s, failure_count = %s
                    WHERE id = %s"""
    update_args = (success_count, failure_count, newsletter_id)

    try:
        with get_db_connection() as conn:
            conn.cursor().execute(update_sql, update_args)
            conn.commit()
    except Exception as e:
        logger.error(f"Error updating newsletter stats: {e}")
def log_email_delivery(newsletter_id, email, status, error_message=None):
    """Record one delivery attempt in ``email_deliveries``.

    Args:
        newsletter_id: Newsletter the attempt belongs to.
        email: Recipient address.
        status: Delivery outcome stored in the ``status`` column.
        error_message: Optional failure detail.

    Errors are logged and swallowed; nothing is returned.
    """
    insert_sql = """INSERT INTO email_deliveries (newsletter_id, email, status, error_message)
                    VALUES (%s, %s, %s, %s)"""
    delivery_row = (newsletter_id, email, status, error_message)

    try:
        with get_db_connection() as conn:
            conn.cursor().execute(insert_sql, delivery_row)
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging email delivery: {e}")
def get_recent_newsletters(limit=10):
    """List the most recently sent newsletters with their delivery counters.

    Args:
        limit: Maximum number of newsletters to return.

    Returns:
        A list of dicts, newest ``sent_at`` first, each with 'id', 'subject',
        'sent_at', 'sent_by', 'recipient_count', 'success_count' and
        'failure_count'. An empty list is returned on error.
    """
    # Keys in the same order as the SELECT column list below.
    field_names = (
        'id',
        'subject',
        'sent_at',
        'sent_by',
        'recipient_count',
        'success_count',
        'failure_count',
    )
    select_sql = """SELECT id, subject, sent_at, sent_by, recipient_count, success_count, failure_count
                    FROM newsletters
                    ORDER BY sent_at DESC
                    LIMIT %s"""

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(select_sql, (limit,))
            newsletters = []
            for row in cursor.fetchall():
                newsletters.append(dict(zip(field_names, row)))
            return newsletters
    except Exception as e:
        logger.error(f"Error retrieving recent newsletters: {e}")
        return []