"""PostgreSQL data-access helpers for subscribers, admin users and newsletters.

Connections are served from a lazily created psycopg2 ``ThreadedConnectionPool``
configured through the ``PG_HOST``, ``PG_PORT``, ``PG_DATABASE``, ``PG_USER``
and ``PG_PASSWORD`` environment variables (loaded via ``python-dotenv``).
"""

import logging
import os
from contextlib import contextmanager
from datetime import datetime, timezone

import psycopg2
from dotenv import load_dotenv
from psycopg2 import IntegrityError, pool
from werkzeug.security import generate_password_hash

load_dotenv()

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Connection pool for better performance; created on first use.
connection_pool = None

# Ordered column migrations: (table, column, column definition, follow-up SQL).
# Every value here is a hard-coded constant, never user input, which is why
# migrate_database() may interpolate them into DDL (identifiers cannot be
# passed as bound parameters).
_COLUMN_MIGRATIONS = (
    # Migration 1: Add new columns to subscribers table
    (
        "subscribers",
        "subscribed_at",
        "TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
        (),
    ),
    (
        "subscribers",
        "status",
        "TEXT DEFAULT 'active'",
        (
            # Add check constraint
            "ALTER TABLE subscribers ADD CONSTRAINT subscribers_status_check "
            "CHECK (status IN ('active', 'unsubscribed'))",
            # Update existing rows
            "UPDATE subscribers SET status = 'active' WHERE status IS NULL",
        ),
    ),
    ("subscribers", "source", "TEXT DEFAULT 'manual'", ()),
    # Migration 2: Add new columns to admin_users table
    (
        "admin_users",
        "created_at",
        "TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
        (),
    ),
    ("admin_users", "last_login", "TIMESTAMP WITH TIME ZONE", ()),
    ("admin_users", "is_active", "BOOLEAN DEFAULT TRUE", ()),
    # Migration 3: Add new columns to newsletters table
    ("newsletters", "sent_by", "TEXT", ()),
    ("newsletters", "recipient_count", "INTEGER DEFAULT 0", ()),
    ("newsletters", "success_count", "INTEGER DEFAULT 0", ()),
    ("newsletters", "failure_count", "INTEGER DEFAULT 0", ()),
)


def init_connection_pool():
    """Initialize the module-level connection pool.

    Reads the ``PG_*`` environment variables and stores a
    ``ThreadedConnectionPool`` (1 to 20 connections) in ``connection_pool``.

    Raises:
        Exception: whatever psycopg2 raised while creating the pool; the
            error is logged and re-raised unchanged.
    """
    global connection_pool
    try:
        connection_pool = psycopg2.pool.ThreadedConnectionPool(
            1,
            20,  # min and max connections
            host=os.getenv("PG_HOST"),
            port=os.getenv("PG_PORT"),
            dbname=os.getenv("PG_DATABASE"),
            user=os.getenv("PG_USER"),
            password=os.getenv("PG_PASSWORD"),
        )
        logger.info("Connection pool created successfully")
    except Exception as e:
        logger.error(f"Connection pool creation error: {e}")
        raise


def _safe_rollback(conn):
    """Roll back *conn*, logging instead of raising if the rollback fails."""
    try:
        conn.rollback()
    except Exception as rollback_error:
        logger.error(f"Rollback failed: {rollback_error}")


@contextmanager
def get_db_connection():
    """Context manager yielding a pooled database connection.

    The pool is created on first use. This manager never commits; callers
    commit explicitly. If the managed block raises, the transaction is rolled
    back, the error is logged and the exception is re-raised. The connection
    is always handed back to the pool.

    Yields:
        A psycopg2 connection obtained from ``connection_pool``.
    """
    if connection_pool is None:
        init_connection_pool()

    conn = None
    try:
        conn = connection_pool.getconn()
        yield conn
    except Exception as e:
        if conn is not None:
            # Guarded so that a failing rollback (e.g. on a dead connection)
            # cannot replace the original exception.
            _safe_rollback(conn)
        logger.error(f"Database operation error: {e}")
        raise
    finally:
        if conn is not None:
            connection_pool.putconn(conn)


def check_column_exists(cursor, table_name, column_name):
    """Check if a column exists in a table.

    Args:
        cursor: An open database cursor.
        table_name: Table name to look up in ``information_schema.columns``.
        column_name: Column name to look up.

    Returns:
        The boolean result of the ``EXISTS`` query.
    """
    # NOTE(review): the lookup is not restricted to a schema, so a table with
    # the same name in another schema would also match - confirm this is
    # acceptable for the deployment.
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.columns
            WHERE table_name = %s AND column_name = %s
        )
        """,
        (table_name, column_name),
    )
    return cursor.fetchone()[0]


def migrate_database(conn):
    """Apply database migrations to upgrade an existing schema.

    Walks ``_COLUMN_MIGRATIONS`` in order and adds each column that is not
    present yet, running any follow-up statements attached to that column.
    All changes are committed together at the end.

    Args:
        conn: An open database connection.
    """
    with conn.cursor() as cursor:
        for table_name, column_name, definition, follow_ups in _COLUMN_MIGRATIONS:
            if check_column_exists(cursor, table_name, column_name):
                continue

            logger.info(f"Adding {column_name} column to {table_name} table")
            cursor.execute(
                f"ALTER TABLE {table_name} ADD COLUMN {column_name} {definition}"
            )
            for statement in follow_ups:
                cursor.execute(statement)

    conn.commit()
    logger.info("Database migrations completed successfully")


def init_db():
    """Initialize the database tables, apply migrations and create indexes.

    Index creation is best-effort: a failure there is logged as a warning and
    rolled back, and initialization still completes.

    Raises:
        Exception: any error from table creation or migration, after logging.
    """
    create_table_statements = (
        # Create basic subscribers table (backwards compatible)
        """
        CREATE TABLE IF NOT EXISTS subscribers (
            id SERIAL PRIMARY KEY,
            email TEXT UNIQUE NOT NULL
        )
        """,
        # Create basic admin_users table (backwards compatible)
        """
        CREATE TABLE IF NOT EXISTS admin_users (
            id SERIAL PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL
        )
        """,
        # Create basic newsletters table (backwards compatible)
        """
        CREATE TABLE IF NOT EXISTS newsletters (
            id SERIAL PRIMARY KEY,
            subject TEXT NOT NULL,
            body TEXT NOT NULL,
            sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Email delivery tracking (new table)
        """
        CREATE TABLE IF NOT EXISTS email_deliveries (
            id SERIAL PRIMARY KEY,
            newsletter_id INTEGER REFERENCES newsletters(id),
            email TEXT NOT NULL,
            status TEXT CHECK (status IN ('sent', 'failed', 'bounced')),
            sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            error_message TEXT
        )
        """,
    )
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_subscribers_email ON subscribers(email)",
        "CREATE INDEX IF NOT EXISTS idx_subscribers_status ON subscribers(status)",
        "CREATE INDEX IF NOT EXISTS idx_newsletters_sent_at ON newsletters(sent_at)",
    )

    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            for statement in create_table_statements:
                cursor.execute(statement)
            conn.commit()
            logger.info("Basic database tables created successfully")

            # Apply migrations to upgrade schema
            migrate_database(conn)

            # Add indexes after migrations are complete
            try:
                for statement in index_statements:
                    cursor.execute(statement)
                conn.commit()
                logger.info("Database indexes created successfully")
            except Exception as e:
                logger.warning(f"Some indexes may not have been created: {e}")
                # Continue anyway as this is not critical, but clear the
                # failed transaction instead of leaving it open.
                _safe_rollback(conn)

            logger.info("Database initialization completed successfully")
    except Exception as e:
        logger.error(f"Database initialization error: {e}")
        raise


def get_subscriber_stats():
    """Get subscriber and newsletter counts.

    Returns:
        A dict with the keys ``total_active``, ``total_unsubscribed``,
        ``recent_signups`` (active signups in the last 30 days) and
        ``newsletters_sent``. Every value is 0 if the query fails; on a schema
        without the ``status`` column all subscribers count as active.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            total_unsubscribed = 0
            recent_signups = 0

            if check_column_exists(cursor, "subscribers", "status"):
                cursor.execute(
                    "SELECT COUNT(*) FROM subscribers WHERE status = 'active'"
                )
                total_active = cursor.fetchone()[0]

                cursor.execute(
                    "SELECT COUNT(*) FROM subscribers WHERE status = 'unsubscribed'"
                )
                total_unsubscribed = cursor.fetchone()[0]

                # Recent signups (last 30 days) need the subscribed_at column.
                if check_column_exists(cursor, "subscribers", "subscribed_at"):
                    cursor.execute(
                        """
                        SELECT COUNT(*) FROM subscribers
                        WHERE subscribed_at >= NOW() - INTERVAL '30 days'
                        AND status = 'active'
                        """
                    )
                    recent_signups = cursor.fetchone()[0]
            else:
                # Fallback for old schema
                cursor.execute("SELECT COUNT(*) FROM subscribers")
                total_active = cursor.fetchone()[0]

            # Get newsletters sent
            cursor.execute("SELECT COUNT(*) FROM newsletters")
            newsletters_sent = cursor.fetchone()[0]

            return {
                'total_active': total_active,
                'total_unsubscribed': total_unsubscribed,
                'recent_signups': recent_signups,
                'newsletters_sent': newsletters_sent,
            }
    except Exception as e:
        logger.error(f"Error retrieving subscriber stats: {e}")
        return {
            'total_active': 0,
            'total_unsubscribed': 0,
            'recent_signups': 0,
            'newsletters_sent': 0,
        }


def get_all_emails(page=1, per_page=50, search=''):
    """Return a paginated list of subscribers with optional email search.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        search: Optional substring matched case-insensitively (``ILIKE``)
            against the email address.

    Returns:
        A dict with ``subscribers`` (list of dicts with ``id``, ``email``,
        ``subscribed_at`` and ``source``), ``total_count``, ``page``,
        ``per_page`` and ``total_pages``. On failure an empty result for
        page 1 is returned.
    """
    try:
        # Clamp paging inputs: a negative OFFSET is a SQL error and a zero
        # page size would divide by zero when computing total_pages.
        page = max(page, 1)
        per_page = max(per_page, 1)

        with get_db_connection() as conn, conn.cursor() as cursor:
            # Check which columns exist
            has_status = check_column_exists(cursor, "subscribers", "status")
            has_subscribed_at = check_column_exists(
                cursor, "subscribers", "subscribed_at"
            )
            has_source = check_column_exists(cursor, "subscribers", "source")

            offset = (page - 1) * per_page

            # Build base query based on available columns
            if has_status:
                base_query = "FROM subscribers WHERE status = 'active'"
            else:
                base_query = "FROM subscribers WHERE 1=1"

            params = []
            if search:
                base_query += " AND email ILIKE %s"
                params.append(f"%{search}%")

            # Get total count
            cursor.execute(f"SELECT COUNT(*) {base_query}", params)
            total_count = cursor.fetchone()[0]

            # Build select query based on available columns
            select_fields = ["id", "email"]
            if has_subscribed_at:
                select_fields.append("subscribed_at")
            if has_source:
                select_fields.append("source")

            order_by = "subscribed_at DESC" if has_subscribed_at else "id DESC"
            query = f"""
                SELECT {', '.join(select_fields)}
                {base_query}
                ORDER BY {order_by}
                LIMIT %s OFFSET %s
            """
            cursor.execute(query, params + [per_page, offset])

            subscribers = []
            for row in cursor.fetchall():
                # Map by column name: fixed positions are wrong whenever only
                # some of the optional columns exist.
                record = dict(zip(select_fields, row))
                subscribers.append({
                    'id': record['id'],
                    'email': record['email'],
                    'subscribed_at': record.get('subscribed_at'),
                    'source': record.get('source', 'manual'),
                })

            return {
                'subscribers': subscribers,
                'total_count': total_count,
                'page': page,
                'per_page': per_page,
                'total_pages': (total_count + per_page - 1) // per_page,
            }
    except Exception as e:
        logger.error(f"Error retrieving emails: {e}")
        return {
            'subscribers': [],
            'total_count': 0,
            'page': 1,
            'per_page': per_page,
            'total_pages': 0,
        }


def add_email(email, source='manual'):
    """Insert an email into the subscribers table.

    The address is stored lower-cased and stripped of surrounding whitespace.

    Args:
        email: Address to add.
        source: Stored in the ``source`` column when that column exists.

    Returns:
        True if the row was inserted; False for a duplicate address or any
        other error.
    """
    # NOTE(review): an address that was soft-unsubscribed by remove_email()
    # still occupies its UNIQUE slot, so adding it again is reported as a
    # duplicate. Decide whether re-subscription should reactivate the row.
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            normalized_email = email.lower().strip()

            if check_column_exists(cursor, "subscribers", "source"):
                cursor.execute(
                    "INSERT INTO subscribers (email, source) VALUES (%s, %s)",
                    (normalized_email, source),
                )
            else:
                cursor.execute(
                    "INSERT INTO subscribers (email) VALUES (%s)",
                    (normalized_email,),
                )
            conn.commit()
            logger.info(f"Email {email} added successfully.")
            return True
    except IntegrityError:
        logger.warning(f"Attempted to add duplicate email: {email}")
        return False
    except Exception as e:
        logger.error(f"Error adding email {email}: {e}")
        return False


def remove_email(email):
    """Mark an email as unsubscribed, or delete it on the old schema.

    Both the address exactly as given and its normalized form (lower-cased,
    stripped, which is how ``add_email`` stores it) are matched.

    Args:
        email: Address to unsubscribe.

    Returns:
        True if at least one row was affected, False otherwise or on error.
    """
    # Bound before the try block because the except handler reads it; a
    # failure before the column check must not raise UnboundLocalError.
    has_status = False
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            candidates = (email, email.lower().strip())
            has_status = check_column_exists(cursor, "subscribers", "status")

            if has_status:
                # Mark as unsubscribed
                cursor.execute(
                    "UPDATE subscribers SET status = 'unsubscribed' "
                    "WHERE email IN (%s, %s)",
                    candidates,
                )
            else:
                # Delete the record (old behavior)
                cursor.execute(
                    "DELETE FROM subscribers WHERE email IN (%s, %s)",
                    candidates,
                )

            rowcount = cursor.rowcount
            conn.commit()
            logger.info(f"Email {email} {'unsubscribed' if has_status else 'removed'} successfully.")
            return rowcount > 0
    except Exception as e:
        logger.error(f"Error {'unsubscribing' if has_status else 'removing'} email {email}: {e}")
        return False


def get_admin(username):
    """Retrieve an active admin's credentials and record the login time.

    Args:
        username: Admin username to look up.

    Returns:
        The row ``(id, username, password, is_active)`` for an active admin,
        or None when no such admin exists or an error occurs. When a row is
        found its ``last_login`` is set to the current UTC time.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(
                """SELECT id, username, password, is_active
                   FROM admin_users
                   WHERE username = %s AND is_active = TRUE""",
                (username,),
            )
            result = cursor.fetchone()

            if result:
                # Update last login
                cursor.execute(
                    "UPDATE admin_users SET last_login = %s WHERE id = %s",
                    (datetime.now(timezone.utc), result[0]),
                )
                conn.commit()

            return result
    except Exception as e:
        logger.error(f"Error retrieving admin: {e}")
        return None


def create_default_admin():
    """Create a default admin user if no active admin exists.

    Credentials come from ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD`` and fall
    back to ``admin`` / ``changeme``. Errors are logged, not raised.
    """
    default_username = os.getenv("ADMIN_USERNAME", "admin")
    default_password = os.getenv("ADMIN_PASSWORD", "changeme")

    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            # Check if any admin exists
            cursor.execute("SELECT COUNT(*) FROM admin_users WHERE is_active = TRUE")
            admin_count = cursor.fetchone()[0]

            if admin_count != 0:
                logger.info("Admin users already exist")
                return

            if "ADMIN_PASSWORD" not in os.environ:
                logger.warning(
                    "ADMIN_PASSWORD is not set; creating the default admin "
                    "with the built-in fallback password. Change it immediately."
                )

            # Hash only when a row is actually inserted; PBKDF2 is
            # deliberately slow, so do not pay for it on every call.
            hashed_password = generate_password_hash(
                default_password, method="pbkdf2:sha256"
            )
            cursor.execute(
                "INSERT INTO admin_users (username, password) VALUES (%s, %s)",
                (default_username, hashed_password),
            )
            conn.commit()
            logger.info("Default admin created successfully")
    except Exception as e:
        logger.error(f"Error creating default admin: {e}")


def save_newsletter(subject, body, sent_by, recipient_count=0):
    """Save a newsletter to the database.

    Args:
        subject: Newsletter subject.
        body: Newsletter body.
        sent_by: Value stored in the ``sent_by`` column.
        recipient_count: Value stored in the ``recipient_count`` column.

    Returns:
        The new row's id, or None if the insert failed.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(
                """INSERT INTO newsletters (subject, body, sent_by, recipient_count)
                   VALUES (%s, %s, %s, %s) RETURNING id""",
                (subject, body, sent_by, recipient_count),
            )
            newsletter_id = cursor.fetchone()[0]
            conn.commit()
            return newsletter_id
    except Exception as e:
        logger.error(f"Error saving newsletter: {e}")
        return None


def update_newsletter_stats(newsletter_id, success_count, failure_count):
    """Update a newsletter's delivery statistics.

    Args:
        newsletter_id: Id of the newsletter row to update.
        success_count: New value for ``success_count``.
        failure_count: New value for ``failure_count``.

    Errors are logged, not raised.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(
                """UPDATE newsletters
                   SET success_count = %s, failure_count = %s
                   WHERE id = %s""",
                (success_count, failure_count, newsletter_id),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error updating newsletter stats: {e}")


def log_email_delivery(newsletter_id, email, status, error_message=None):
    """Log an individual email delivery attempt.

    Args:
        newsletter_id: Id of the newsletter the delivery belongs to.
        email: Recipient address.
        status: Delivery status; the table's CHECK constraint accepts
            ``'sent'``, ``'failed'`` or ``'bounced'``.
        error_message: Optional error text.

    Errors are logged, not raised.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(
                """INSERT INTO email_deliveries (newsletter_id, email, status, error_message)
                   VALUES (%s, %s, %s, %s)""",
                (newsletter_id, email, status, error_message),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging email delivery: {e}")


def get_recent_newsletters(limit=10):
    """Get the most recent newsletters with their statistics.

    Args:
        limit: Maximum number of newsletters to return.

    Returns:
        A list of dicts (newest first) with the keys ``id``, ``subject``,
        ``sent_at``, ``sent_by``, ``recipient_count``, ``success_count`` and
        ``failure_count``; an empty list on error.
    """
    columns = (
        'id',
        'subject',
        'sent_at',
        'sent_by',
        'recipient_count',
        'success_count',
        'failure_count',
    )
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(
                """SELECT id, subject, sent_at, sent_by, recipient_count,
                          success_count, failure_count
                   FROM newsletters
                   ORDER BY sent_at DESC
                   LIMIT %s""",
                (limit,),
            )
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"Error retrieving recent newsletters: {e}")
        return []