"""PostgreSQL access helpers for the newsletter subscriber store.

Connections come from a single, lazily created, process-wide
``ThreadedConnectionPool``. Every public helper borrows a connection,
does its work, and hands the connection back in a ``finally`` clause.
"""

import atexit
import logging
import os
import threading

import psycopg2
from psycopg2 import pool, IntegrityError
from dotenv import load_dotenv

load_dotenv()

# Global connection pool, created on first use by get_connection_pool().
_connection_pool = None
# Serialises creation/teardown of the pool so concurrent first callers
# cannot each build (and then leak) their own pool.
_pool_lock = threading.Lock()


def get_connection_pool():
    """Initialize (on first call) and return the shared connection pool.

    Connection settings are read from the ``PG_HOST``, ``PG_PORT``
    (default 5432), ``PG_DATABASE``, ``PG_USER`` and ``PG_PASSWORD``
    environment variables.

    Raises:
        Exception: whatever psycopg2 raises if the pool cannot be
            created; the error is logged and re-raised.
    """
    global _connection_pool
    if _connection_pool is None:
        with _pool_lock:
            # Re-check under the lock: another thread may have created
            # the pool while this one was waiting.
            if _connection_pool is None:
                try:
                    _connection_pool = psycopg2.pool.ThreadedConnectionPool(
                        minconn=2,
                        maxconn=20,
                        host=os.getenv("PG_HOST"),
                        port=os.getenv("PG_PORT", 5432),
                        dbname=os.getenv("PG_DATABASE"),
                        user=os.getenv("PG_USER"),
                        password=os.getenv("PG_PASSWORD"),
                        connect_timeout=5,
                    )
                    logging.info(
                        "Database connection pool created successfully"
                    )
                except Exception as e:
                    logging.error("Error creating connection pool: %s", e)
                    raise
    return _connection_pool


def get_connection():
    """Borrow a connection from the pool.

    If the pool hands out a connection that is already closed, it is
    discarded and a second one is requested.

    Raises:
        Exception: any error from pool creation or ``getconn``; the
            error is logged and re-raised.
    """
    try:
        # Named conn_pool (not pool) to avoid shadowing the imported
        # psycopg2.pool module.
        conn_pool = get_connection_pool()
        conn = conn_pool.getconn()
        if conn.closed:
            # Connection is closed, remove it and get a new one
            conn_pool.putconn(conn, close=True)
            conn = conn_pool.getconn()
        return conn
    except Exception as e:
        logging.error("Error getting connection from pool: %s", e)
        raise


def return_connection(conn):
    """Return a connection to the pool (best effort, never raises).

    If the pool has already been shut down, the connection is closed
    instead. Asking for the pool here would silently create a brand-new
    pool (and open new connections) just to reject this one.
    """
    try:
        active_pool = _connection_pool
        if active_pool is None:
            conn.close()
            return
        active_pool.putconn(conn)
    except Exception as e:
        logging.error("Error returning connection to pool: %s", e)


def close_all_connections():
    """Close every pooled connection and forget the pool."""
    global _connection_pool
    with _pool_lock:
        if _connection_pool is not None:
            try:
                _connection_pool.closeall()
            finally:
                # Drop the reference even if closeall() fails so a later
                # get_connection_pool() starts from a clean slate.
                _connection_pool = None
            logging.info("All database connections closed")


def _rollback_quietly(conn):
    """Roll back *conn*, logging (not raising) if the rollback fails.

    Used inside ``except`` handlers so that a broken connection cannot
    mask the original error or turn a "return False" path into a raise.
    """
    if conn is None:
        return
    try:
        conn.rollback()
    except Exception as e:
        logging.error("Error rolling back transaction: %s", e)


def column_exists(cursor, table_name, column_name):
    """Return True if *table_name* has a column called *column_name*.

    The lookup goes through ``information_schema.columns`` and is not
    restricted to a particular schema.
    """
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.columns
            WHERE table_name = %s AND column_name = %s
        )
        """,
        (table_name, column_name),
    )
    return cursor.fetchone()[0]


def index_exists(cursor, index_name):
    """Return True if an index named *index_name* exists in ``public``."""
    # relkind 'i' = index, 'I' = partitioned index. Without this filter
    # a table or sequence with the same name would be reported as an
    # existing index.
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT 1
            FROM pg_class c
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relname = %s
              AND n.nspname = 'public'
              AND c.relkind IN ('i', 'I')
        )
        """,
        (index_name,),
    )
    return cursor.fetchone()[0]


# (index name, DDL) pairs created by init_db() when missing.
# NOTE(review): idx_subscribers_email duplicates the index PostgreSQL
# already builds for the UNIQUE constraint on subscribers.email; kept
# for backward compatibility.
_INDEX_DEFINITIONS = (
    (
        "idx_newsletters_sent_at",
        "CREATE INDEX idx_newsletters_sent_at ON newsletters(sent_at DESC)",
    ),
    (
        "idx_subscribers_email",
        "CREATE INDEX idx_subscribers_email ON subscribers(email)",
    ),
    (
        "idx_subscribers_created_at",
        "CREATE INDEX idx_subscribers_created_at "
        "ON subscribers(created_at DESC)",
    ),
)


def init_db():
    """Create the tables, the ``created_at`` column and indexes if missing.

    All statements run in one transaction, committed at the end.

    Raises:
        Exception: any database error; the transaction is rolled back,
            the error is logged and re-raised.
    """
    conn = None
    try:
        conn = get_connection()
        # The context manager closes the cursor on the error path too.
        with conn.cursor() as cursor:
            # Create subscribers table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS subscribers (
                    id SERIAL PRIMARY KEY,
                    email TEXT UNIQUE NOT NULL
                )
                """
            )

            # Add created_at column if it doesn't exist
            if not column_exists(cursor, "subscribers", "created_at"):
                cursor.execute(
                    """
                    ALTER TABLE subscribers
                    ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    """
                )
                logging.info("Added created_at column to subscribers table")

            # Create newsletters table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS newsletters(
                    id SERIAL PRIMARY KEY,
                    subject TEXT NOT NULL,
                    body TEXT NOT NULL,
                    sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
                """
            )

            # Create indexes only if they don't exist
            for index_name, ddl in _INDEX_DEFINITIONS:
                if not index_exists(cursor, index_name):
                    cursor.execute(ddl)
                    logging.info("Created index %s", index_name)

            conn.commit()
        logging.info("Database tables and indexes initialized successfully")
    except Exception as e:
        logging.error("Error initializing database: %s", e)
        _rollback_quietly(conn)
        raise
    finally:
        if conn:
            return_connection(conn)


def add_email(email):
    """Insert *email* into ``subscribers``.

    Returns:
        True if the row was inserted; False if the address already
        exists or any other error occurred (the error is logged).
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO subscribers (email) VALUES (%s)", (email,)
            )
            conn.commit()
        logging.info("Email added successfully: %s", email)
        return True
    except IntegrityError:
        # Email already exists
        _rollback_quietly(conn)
        logging.info("Email already exists: %s", email)
        return False
    except Exception as e:
        _rollback_quietly(conn)
        logging.error("Error adding email %s: %s", email, e)
        return False
    finally:
        if conn:
            return_connection(conn)


def remove_email(email):
    """Delete *email* from ``subscribers``.

    Returns:
        True if at least one row was deleted; False if the address was
        not found or an error occurred (the error is logged).
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute(
                "DELETE FROM subscribers WHERE email = %s", (email,)
            )
            rows_affected = cursor.rowcount
            conn.commit()
        if rows_affected > 0:
            logging.info("Email removed successfully: %s", email)
            return True
        logging.info("Email not found for removal: %s", email)
        return False
    except Exception as e:
        _rollback_quietly(conn)
        logging.error("Error removing email %s: %s", email, e)
        return False
    finally:
        if conn:
            return_connection(conn)


def get_subscriber_count():
    """Return the number of rows in ``subscribers``, or 0 on any error."""
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM subscribers")
            return cursor.fetchone()[0]
    except Exception as e:
        logging.error("Error getting subscriber count: %s", e)
        return 0
    finally:
        if conn:
            return_connection(conn)


# Cleanup function for graceful shutdown
atexit.register(close_all_connections)