import os import psycopg2 from psycopg2 import IntegrityError, pool, OperationalError from dotenv import load_dotenv from werkzeug.security import generate_password_hash load_dotenv() try: DB_MIN_CONN = int(os.getenv("DB_MIN_CONN", 1)) DB_MAX_CONN = int(os.getenv("DB_MAX_CONN", 10)) conn_pool = pool.ThreadedConnectionPool( minconn=DB_MIN_CONN, maxconn=DB_MAX_CONN, host=os.getenv("PG_HOST"), port=os.getenv("PG_PORT"), dbname=os.getenv("PG_DATABASE"), user=os.getenv("PG_USER"), password=os.getenv("PG_PASSWORD"), connect_timeout=10, ) except OperationalError: raise except Exception: raise def get_connection(): """Get a connection from the connection pool.""" return conn_pool.getconn() def init_db(): """Initialize database tables with connection pool.""" conn = None cursor = None try: conn = get_connection() cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS subscribers ( id SERIAL PRIMARY KEY, email TEXT UNIQUE NOT NULL ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS admin_users ( id SERIAL PRIMARY KEY, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS newsletters ( id SERIAL PRIMARY KEY, subject TEXT NOT NULL, body TEXT NOT NULL, sent_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ) """ ) conn.commit() except Exception: if conn: conn.rollback() raise finally: if cursor: cursor.close() if conn: conn_pool.putconn(conn) def get_all_emails(): """Return a list of all subscriber emails.""" conn = None cursor = None try: conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT email FROM subscribers") results = cursor.fetchall() return [row[0] for row in results] except Exception: return [] finally: if cursor: cursor.close() if conn: conn_pool.putconn(conn) def add_email(email): """Insert an email into the subscribers table.""" conn = None cursor = None try: conn = get_connection() cursor = conn.cursor() cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,)) conn.commit() return True except IntegrityError: if conn: conn.rollback() return False except Exception: if conn: conn.rollback() return False finally: if cursor: cursor.close() if conn: conn_pool.putconn(conn) def remove_email(email): """Remove an email from the subscribers table.""" conn = None cursor = None try: conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,)) rowcount = cursor.rowcount conn.commit() return rowcount > 0 except Exception: if conn: conn.rollback() return False finally: if cursor: cursor.close() if conn: conn_pool.putconn(conn) def get_admin(username): """Retrieve admin credentials for a given username. Returns a tuple (username, password_hash) if found, otherwise None. """ conn = None cursor = None try: conn = get_connection() cursor = conn.cursor() cursor.execute( "SELECT username, password FROM admin_users WHERE username = %s", (username,), ) return cursor.fetchone() except Exception: return None finally: if cursor: cursor.close() if conn: conn_pool.putconn(conn) def create_default_admin(): """Create a default admin user if one doesn't already exist.""" default_username = os.getenv("ADMIN_USERNAME", "admin") default_password = os.getenv("ADMIN_PASSWORD", "changeme") hashed_password = generate_password_hash(default_password, method="pbkdf2:sha256") conn = None cursor = None try: conn = get_connection() cursor = conn.cursor() cursor.execute( "SELECT id FROM admin_users WHERE username = %s", (default_username,) ) exists = cursor.fetchone() if exists is None: cursor.execute( "INSERT INTO admin_users (username, password) VALUES (%s, %s)", (default_username, hashed_password), ) conn.commit() except Exception: if conn: conn.rollback() finally: if cursor: cursor.close() if conn: conn_pool.putconn(conn) def close_pool(): """Close the database connection pool.""" try: conn_pool.closeall() except Exception: pass class DatabaseContext: """Optional context manager for manual transactions.""" def __init__(self): self.conn = None self.cursor = None def __enter__(self): self.conn = get_connection() self.cursor = self.conn.cursor() return self.cursor def __exit__(self, exc_type, exc_val, exc_tb): try: if exc_type: self.conn.rollback() else: self.conn.commit() finally: if self.cursor: self.cursor.close() if self.conn: conn_pool.putconn(self.conn)