# File viewer metadata: 522 lines · No EOL · 19 KiB · Python
| import os
 | |
| import logging
 | |
| import psycopg2
 | |
| from psycopg2 import IntegrityError, pool
 | |
| from dotenv import load_dotenv
 | |
| from werkzeug.security import generate_password_hash
 | |
| from contextlib import contextmanager
 | |
| from datetime import datetime, timezone
 | |
| 
 | |
# Load .env-provided settings before any os.getenv() lookup in this module.
load_dotenv()

# Process-wide logging configuration: INFO and above, timestamped records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared connection pool; remains None until init_connection_pool() sets it.
connection_pool = None


def init_connection_pool():
    """Build the module-wide threaded connection pool.

    Connection settings come from the PG_HOST, PG_PORT, PG_DATABASE,
    PG_USER and PG_PASSWORD environment variables. The pool is stored in
    the module-level ``connection_pool``.

    Raises:
        Exception: any error raised while creating the pool; it is logged
            and then re-raised unchanged.
    """
    global connection_pool

    try:
        settings = {
            "host": os.getenv("PG_HOST"),
            "port": os.getenv("PG_PORT"),
            "dbname": os.getenv("PG_DATABASE"),
            "user": os.getenv("PG_USER"),
            "password": os.getenv("PG_PASSWORD"),
        }
        # Positional arguments: minimum 1 and maximum 20 pooled connections.
        connection_pool = psycopg2.pool.ThreadedConnectionPool(1, 20, **settings)
        logger.info("Connection pool created successfully")
    except Exception as e:
        logger.error(f"Connection pool creation error: {e}")
        raise


@contextmanager
def get_db_connection():
    """Yield a pooled database connection and always hand it back.

    The pool is created on first use. This manager never commits; callers
    commit their own work. If the managed block (or acquiring the
    connection) raises, the error is logged, a rollback is attempted, and
    the original exception is re-raised.

    Yields:
        A connection obtained from ``connection_pool.getconn()``.

    Raises:
        Exception: whatever the managed block or ``getconn()`` raised.
    """
    if connection_pool is None:
        init_connection_pool()

    conn = None
    try:
        conn = connection_pool.getconn()
        yield conn
    except Exception as e:
        # Log first so the root cause is recorded even if rollback fails.
        logger.error(f"Database operation error: {e}")
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A broken connection can fail to roll back; that must not
                # replace the exception the caller actually needs to see.
                logger.error(f"Rollback failed: {rollback_error}")
        raise
    finally:
        if conn is not None:
            connection_pool.putconn(conn)


def check_column_exists(cursor, table_name, column_name):
    """Return whether ``column_name`` exists on ``table_name``.

    The lookup is limited to schemas on the connection's current search
    path, so a table with the same name in an unrelated schema cannot
    produce a false positive.

    Args:
        cursor: open database cursor used to run the lookup.
        table_name: unqualified table name.
        column_name: column to look for.

    Returns:
        The boolean produced by the ``SELECT EXISTS`` query.
    """
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.columns
            WHERE table_schema::text = ANY (current_schemas(false)::text[])
              AND table_name = %s
              AND column_name = %s
        )
        """,
        (table_name, column_name),
    )
    return cursor.fetchone()[0]


def migrate_database(conn):
    """Add any columns the current schema is missing, then commit once.

    Each entry below names a table, a column, and the statements to run
    when ``check_column_exists`` reports that column as absent. Entries are
    processed in order on a single cursor.

    Args:
        conn: open database connection providing ``cursor()`` and
            ``commit()``.
    """
    cur = conn.cursor()

    # (table, column, statements executed only when the column is missing)
    column_migrations = (
        # Migration 1: subscribers
        ("subscribers", "subscribed_at", (
            "ALTER TABLE subscribers ADD COLUMN subscribed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
        )),
        ("subscribers", "status", (
            "ALTER TABLE subscribers ADD COLUMN status TEXT DEFAULT 'active'",
            # Constrain the allowed values, then fill any rows left NULL.
            "ALTER TABLE subscribers ADD CONSTRAINT subscribers_status_check CHECK (status IN ('active', 'unsubscribed'))",
            "UPDATE subscribers SET status = 'active' WHERE status IS NULL",
        )),
        ("subscribers", "source", (
            "ALTER TABLE subscribers ADD COLUMN source TEXT DEFAULT 'manual'",
        )),
        # Migration 2: admin_users
        ("admin_users", "created_at", (
            "ALTER TABLE admin_users ADD COLUMN created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
        )),
        ("admin_users", "last_login", (
            "ALTER TABLE admin_users ADD COLUMN last_login TIMESTAMP WITH TIME ZONE",
        )),
        ("admin_users", "is_active", (
            "ALTER TABLE admin_users ADD COLUMN is_active BOOLEAN DEFAULT TRUE",
        )),
        # Migration 3: newsletters
        ("newsletters", "sent_by", (
            "ALTER TABLE newsletters ADD COLUMN sent_by TEXT",
        )),
        ("newsletters", "recipient_count", (
            "ALTER TABLE newsletters ADD COLUMN recipient_count INTEGER DEFAULT 0",
        )),
        ("newsletters", "success_count", (
            "ALTER TABLE newsletters ADD COLUMN success_count INTEGER DEFAULT 0",
        )),
        ("newsletters", "failure_count", (
            "ALTER TABLE newsletters ADD COLUMN failure_count INTEGER DEFAULT 0",
        )),
    )

    for table, column, statements in column_migrations:
        if check_column_exists(cur, table, column):
            continue
        logger.info(f"Adding {column} column to {table} table")
        for statement in statements:
            cur.execute(statement)

    conn.commit()
    logger.info("Database migrations completed successfully")


def init_db():
    """Create the base tables, run migrations, then create indexes.

    Steps, all on one pooled connection:
      1. create the four tables if they do not exist and commit;
      2. call ``migrate_database`` to add newer columns;
      3. create indexes; a failure here is logged as a warning and ignored.

    Raises:
        Exception: any error outside the index step, after it is logged.
    """
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()

            table_ddl = (
                # Minimal subscribers table; newer columns come from migrations.
                """
                CREATE TABLE IF NOT EXISTS subscribers (
                    id SERIAL PRIMARY KEY,
                    email TEXT UNIQUE NOT NULL
                )
            """,
                # Minimal admin_users table; newer columns come from migrations.
                """
                CREATE TABLE IF NOT EXISTS admin_users (
                    id SERIAL PRIMARY KEY,
                    username TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL
                )
            """,
                # Minimal newsletters table; newer columns come from migrations.
                """
                CREATE TABLE IF NOT EXISTS newsletters (
                    id SERIAL PRIMARY KEY,
                    subject TEXT NOT NULL,
                    body TEXT NOT NULL,
                    sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
            """,
                # Per-recipient delivery log.
                """
                CREATE TABLE IF NOT EXISTS email_deliveries (
                    id SERIAL PRIMARY KEY,
                    newsletter_id INTEGER REFERENCES newsletters(id),
                    email TEXT NOT NULL,
                    status TEXT CHECK (status IN ('sent', 'failed', 'bounced')),
                    sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    error_message TEXT
                )
            """,
            )
            for ddl in table_ddl:
                cur.execute(ddl)

            conn.commit()
            logger.info("Basic database tables created successfully")

            # Columns referenced by the indexes below are added here.
            migrate_database(conn)

            index_ddl = (
                "CREATE INDEX IF NOT EXISTS idx_subscribers_email ON subscribers(email)",
                "CREATE INDEX IF NOT EXISTS idx_subscribers_status ON subscribers(status)",
                "CREATE INDEX IF NOT EXISTS idx_newsletters_sent_at ON newsletters(sent_at)",
            )
            try:
                for ddl in index_ddl:
                    cur.execute(ddl)
                conn.commit()
                logger.info("Database indexes created successfully")
            except Exception as e:
                # Indexes are treated as optional: warn and carry on.
                logger.warning(f"Some indexes may not have been created: {e}")

            logger.info("Database initialization completed successfully")

    except Exception as e:
        logger.error(f"Database initialization error: {e}")
        raise


def get_subscriber_stats():
    """Collect subscriber and newsletter counts.

    Adapts to the schema: when ``subscribers.status`` is missing every row
    counts as active, and when ``subscribers.subscribed_at`` is missing the
    recent-signup figure stays 0.

    Returns:
        dict with ``total_active``, ``total_unsubscribed``,
        ``recent_signups`` (last 30 days) and ``newsletters_sent``. On any
        error the error is logged and all four values are 0.
    """
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()

            def count(sql):
                # Run a single-value COUNT query and return that value.
                cur.execute(sql)
                return cur.fetchone()[0]

            unsubscribed = 0
            recent = 0

            if not check_column_exists(cur, 'subscribers', 'status'):
                # Old schema: no status column, so count every row.
                active = count("SELECT COUNT(*) FROM subscribers")
            else:
                active = count("SELECT COUNT(*) FROM subscribers WHERE status = 'active'")
                unsubscribed = count("SELECT COUNT(*) FROM subscribers WHERE status = 'unsubscribed'")

                if check_column_exists(cur, 'subscribers', 'subscribed_at'):
                    recent = count("""
                        SELECT COUNT(*) FROM subscribers 
                        WHERE subscribed_at >= NOW() - INTERVAL '30 days' AND status = 'active'
                    """)

            sent = count("SELECT COUNT(*) FROM newsletters")

            return {
                'total_active': active,
                'total_unsubscribed': unsubscribed,
                'recent_signups': recent,
                'newsletters_sent': sent
            }
    except Exception as e:
        logger.error(f"Error retrieving subscriber stats: {e}")
        return {'total_active': 0, 'total_unsubscribed': 0, 'recent_signups': 0, 'newsletters_sent': 0}


def get_all_emails(page=1, per_page=50, search=''):
    """Return one page of subscribers, optionally filtered by email.

    Adapts to the schema: filters on ``status = 'active'`` only when that
    column exists, and selects ``subscribed_at`` / ``source`` only when
    they exist.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size; values below 1 are treated as 1.
        search: case-insensitive substring matched against the email.

    Returns:
        dict with ``subscribers`` (list of dicts with ``id``, ``email``,
        ``subscribed_at``, ``source``), ``total_count``, ``page``,
        ``per_page`` and ``total_pages``. On any error the error is logged
        and an empty first page is returned.
    """
    try:
        # Clamp so OFFSET is never negative and the page count never
        # divides by zero.
        page = max(1, int(page))
        per_page = max(1, int(per_page))
        offset = (page - 1) * per_page

        with get_db_connection() as conn:
            cursor = conn.cursor()

            has_status = check_column_exists(cursor, 'subscribers', 'status')
            has_subscribed_at = check_column_exists(cursor, 'subscribers', 'subscribed_at')
            has_source = check_column_exists(cursor, 'subscribers', 'source')

            # The SQL text is assembled from fixed fragments only; every
            # caller-supplied value travels as a bound parameter.
            where = "WHERE status = 'active'" if has_status else "WHERE 1=1"
            params = []
            if search:
                where += " AND email ILIKE %s"
                params.append(f"%{search}%")

            cursor.execute(f"SELECT COUNT(*) FROM subscribers {where}", params)
            total_count = cursor.fetchone()[0]

            columns = ["id", "email"]
            if has_subscribed_at:
                columns.append("subscribed_at")
            if has_source:
                columns.append("source")

            # id is a tie-breaker: rows sharing a subscribed_at value would
            # otherwise be ordered arbitrarily and could shift between pages.
            order_by = "subscribed_at DESC, id DESC" if has_subscribed_at else "id DESC"

            cursor.execute(
                f"SELECT {', '.join(columns)} FROM subscribers {where} "
                f"ORDER BY {order_by} LIMIT %s OFFSET %s",
                params + [per_page, offset],
            )

            subscribers = []
            for row in cursor.fetchall():
                # Map by column name: the position of `source` depends on
                # whether `subscribed_at` was selected.
                record = dict(zip(columns, row))
                subscribers.append({
                    'id': record['id'],
                    'email': record['email'],
                    'subscribed_at': record.get('subscribed_at'),
                    'source': record.get('source', 'manual'),
                })

            return {
                'subscribers': subscribers,
                'total_count': total_count,
                'page': page,
                'per_page': per_page,
                'total_pages': (total_count + per_page - 1) // per_page
            }
    except Exception as e:
        logger.error(f"Error retrieving emails: {e}")
        return {'subscribers': [], 'total_count': 0, 'page': 1, 'per_page': per_page, 'total_pages': 0}


def add_email(email, source='manual'):
    """Insert a subscriber row for ``email``.

    The address is lower-cased and stripped before it is stored. ``source``
    is written only when the ``subscribers.source`` column exists.

    Args:
        email: address to subscribe.
        source: origin label stored with the row when supported.

    Returns:
        True when the row was inserted and committed; False on a duplicate
        (IntegrityError) or on any other error, both of which are logged.
    """
    # NOTE(review): remove_email() keeps the row when a status column
    # exists, so re-adding an unsubscribed address hits the UNIQUE
    # constraint and returns False here - confirm that is intended.
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()

            if check_column_exists(cur, 'subscribers', 'source'):
                statement = "INSERT INTO subscribers (email, source) VALUES (%s, %s)"
                values = (email.lower().strip(), source)
            else:
                # Older schema without a source column.
                statement = "INSERT INTO subscribers (email) VALUES (%s)"
                values = (email.lower().strip(),)
            cur.execute(statement, values)

            conn.commit()
            logger.info(f"Email {email} added successfully.")
            return True
    except IntegrityError:
        logger.warning(f"Attempted to add duplicate email: {email}")
        return False
    except Exception as e:
        logger.error(f"Error adding email {email}: {e}")
        return False


def remove_email(email):
    """Unsubscribe ``email``, or delete it on schemas without a status column.

    The address is matched both in the normalised form that ``add_email``
    stores (lower-cased, stripped) and exactly as given, so rows written
    before normalisation are still found.

    Args:
        email: address to unsubscribe or remove.

    Returns:
        True when at least one row was updated or deleted; False when no
        row matched or when any error occurred (the error is logged).
    """
    # Bound before the try block so the error handler can always build its
    # message, even when the failure happens before the schema is probed.
    has_status = False
    try:
        normalized = email.lower().strip()

        with get_db_connection() as conn:
            cursor = conn.cursor()

            has_status = check_column_exists(cursor, 'subscribers', 'status')

            if has_status:
                # Soft delete: keep the row, flip its status.
                cursor.execute(
                    "UPDATE subscribers SET status = 'unsubscribed' WHERE email IN (%s, %s)",
                    (normalized, email)
                )
            else:
                # Old schema: no status column, so remove the row.
                cursor.execute(
                    "DELETE FROM subscribers WHERE email IN (%s, %s)",
                    (normalized, email)
                )

            rowcount = cursor.rowcount
            conn.commit()

        if rowcount > 0:
            logger.info(f"Email {email} {'unsubscribed' if has_status else 'removed'} successfully.")
        else:
            logger.info(f"Email {email} not found; nothing to {'unsubscribe' if has_status else 'remove'}.")
        return rowcount > 0
    except Exception as e:
        logger.error(f"Error {'unsubscribing' if has_status else 'removing'} email {email}: {e}")
        return False


def get_admin(username):
    """Look up an active admin and stamp its ``last_login``.

    Args:
        username: admin username to look up.

    Returns:
        The row ``(id, username, password, is_active)`` for an active admin
        with that username, or None when there is no match or an error
        occurred (the error is logged).
    """
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                """SELECT id, username, password, is_active 
                   FROM admin_users 
                   WHERE username = %s AND is_active = TRUE""",
                (username,),
            )
            admin_row = cur.fetchone()

            if not admin_row:
                return admin_row

            # A matching admin was found: record the lookup time in UTC.
            login_time = datetime.now(timezone.utc)
            cur.execute(
                "UPDATE admin_users SET last_login = %s WHERE id = %s",
                (login_time, admin_row[0])
            )
            conn.commit()

            return admin_row
    except Exception as e:
        logger.error(f"Error retrieving admin: {e}")
        return None


def create_default_admin():
    """Create a default admin account when no active admin exists.

    The username comes from ADMIN_USERNAME (default ``"admin"``) and the
    password from ADMIN_PASSWORD (default ``"changeme"``). The password is
    hashed with PBKDF2-SHA256 only when an account is actually created.
    Errors are logged and not re-raised.
    """
    default_username = os.getenv("ADMIN_USERNAME", "admin")
    default_password = os.getenv("ADMIN_PASSWORD", "changeme")

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Only active admins count; the default is created when none exist.
            cursor.execute("SELECT COUNT(*) FROM admin_users WHERE is_active = TRUE")
            admin_count = cursor.fetchone()[0]

            if admin_count > 0:
                logger.info("Admin users already exist")
                return

            if os.getenv("ADMIN_PASSWORD") is None:
                # Make the insecure fallback visible instead of silent.
                logger.warning(
                    "ADMIN_PASSWORD is not set; creating the default admin with "
                    "the built-in fallback password. Change it immediately."
                )

            # Hashing is deliberately slow, so it is done only on this path.
            hashed_password = generate_password_hash(default_password, method="pbkdf2:sha256")
            cursor.execute(
                "INSERT INTO admin_users (username, password) VALUES (%s, %s)",
                (default_username, hashed_password),
            )
            conn.commit()
            logger.info("Default admin created successfully")
    except Exception as e:
        logger.error(f"Error creating default admin: {e}")


def save_newsletter(subject, body, sent_by, recipient_count=0):
    """Insert a newsletter row and return its generated id.

    Args:
        subject: newsletter subject line.
        body: newsletter body.
        sent_by: value stored in the ``sent_by`` column.
        recipient_count: number of intended recipients.

    Returns:
        The new row's ``id``, or None when an error occurred (it is logged).
    """
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()
            row_values = (subject, body, sent_by, recipient_count)
            cur.execute(
                """INSERT INTO newsletters (subject, body, sent_by, recipient_count) 
                   VALUES (%s, %s, %s, %s) RETURNING id""",
                row_values
            )
            # RETURNING id yields a single-column row holding the new id.
            created_id = cur.fetchone()[0]
            conn.commit()
            return created_id
    except Exception as e:
        logger.error(f"Error saving newsletter: {e}")
        return None


def update_newsletter_stats(newsletter_id, success_count, failure_count):
    """Store delivery totals on a newsletter row.

    Args:
        newsletter_id: id of the newsletter to update.
        success_count: value written to ``success_count``.
        failure_count: value written to ``failure_count``.

    Errors are logged and not re-raised; nothing is returned.
    """
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()
            # Parameter order follows the placeholders: both counts, then id.
            bound = (success_count, failure_count, newsletter_id)
            cur.execute(
                """UPDATE newsletters 
                   SET success_count = %s, failure_count = %s 
                   WHERE id = %s""",
                bound
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error updating newsletter stats: {e}")


def log_email_delivery(newsletter_id, email, status, error_message=None):
    """Record one delivery attempt in ``email_deliveries``.

    Args:
        newsletter_id: id of the newsletter that was sent.
        email: recipient address.
        status: delivery outcome stored in the ``status`` column.
        error_message: optional failure detail.

    Errors are logged and not re-raised; nothing is returned.
    """
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()
            delivery = (newsletter_id, email, status, error_message)
            cur.execute(
                """INSERT INTO email_deliveries (newsletter_id, email, status, error_message) 
                   VALUES (%s, %s, %s, %s)""",
                delivery
            )
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging email delivery: {e}")


def get_recent_newsletters(limit=10):
    """Return the most recently sent newsletters with their statistics.

    Args:
        limit: maximum number of newsletters to return.

    Returns:
        List of dicts, newest ``sent_at`` first, with keys ``id``,
        ``subject``, ``sent_at``, ``sent_by``, ``recipient_count``,
        ``success_count`` and ``failure_count``. An empty list is returned
        when an error occurred (it is logged).
    """
    # Result keys, in the same order as the SELECT column list below.
    keys = (
        'id',
        'subject',
        'sent_at',
        'sent_by',
        'recipient_count',
        'success_count',
        'failure_count',
    )
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                """SELECT id, subject, sent_at, sent_by, recipient_count, success_count, failure_count
                   FROM newsletters 
                   ORDER BY sent_at DESC 
                   LIMIT %s""",
                (limit,)
            )
            rows = cur.fetchall()
            return [dict(zip(keys, row)) for row in rows]
    except Exception as e:
        logger.error(f"Error retrieving recent newsletters: {e}")
        return []
