From 66011bcd0f3651da6c1674d11f1b3e532be58dec Mon Sep 17 00:00:00 2001 From: Cipher Vance Date: Mon, 25 Aug 2025 14:01:08 -0500 Subject: [PATCH] feat: implement connection pooling and automatic schema migration for performance --- database.py | 246 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 201 insertions(+), 45 deletions(-) diff --git a/database.py b/database.py index 0569f18..d8059fe 100644 --- a/database.py +++ b/database.py @@ -1,66 +1,222 @@ import os import psycopg2 -from psycopg2 import IntegrityError +from psycopg2 import pool, IntegrityError from dotenv import load_dotenv +import logging load_dotenv() +# Global connection pool +_connection_pool = None + +def get_connection_pool(): + """Initialize and return the connection pool""" + global _connection_pool + if _connection_pool is None: + try: + _connection_pool = psycopg2.pool.ThreadedConnectionPool( + minconn=2, + maxconn=20, + host=os.getenv("PG_HOST"), + port=os.getenv("PG_PORT", 5432), + dbname=os.getenv("PG_DATABASE"), + user=os.getenv("PG_USER"), + password=os.getenv("PG_PASSWORD"), + connect_timeout=5 + ) + logging.info("Database connection pool created successfully") + except Exception as e: + logging.error(f"Error creating connection pool: {e}") + raise + return _connection_pool + def get_connection(): - """Return a database connection.""" - return psycopg2.connect( - host=os.getenv("PG_HOST"), - port=os.getenv("PG_PORT"), - dbname=os.getenv("PG_DATABASE"), - user=os.getenv("PG_USER"), - password=os.getenv("PG_PASSWORD"), - connect_timeout=10 - ) + """Get a connection from the pool""" + try: + pool = get_connection_pool() + conn = pool.getconn() + if conn.closed: + # Connection is closed, remove it and get a new one + pool.putconn(conn, close=True) + conn = pool.getconn() + return conn + except Exception as e: + logging.error(f"Error getting connection from pool: {e}") + raise + +def return_connection(conn): + """Return a connection to the pool""" + try: + pool = get_connection_pool() + pool.putconn(conn) + except Exception as e: + logging.error(f"Error returning connection to pool: {e}") + +def close_all_connections(): + """Close all connections in the pool""" + global _connection_pool + if _connection_pool: + _connection_pool.closeall() + _connection_pool = None + logging.info("All database connections closed") + +def column_exists(cursor, table_name, column_name): + """Check if a column exists in a table""" + cursor.execute(""" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_name = %s AND column_name = %s + ) + """, (table_name, column_name)) + return cursor.fetchone()[0] + +def index_exists(cursor, index_name): + """Check if an index exists""" + cursor.execute(""" + SELECT EXISTS ( + SELECT 1 FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname = %s AND n.nspname = 'public' + ) + """, (index_name,)) + return cursor.fetchone()[0] def init_db(): - conn = get_connection() - cursor = conn.cursor() - cursor.execute(""" - CREATE TABLE IF NOT EXISTS subscribers ( - id SERIAL PRIMARY KEY, - email TEXT UNIQUE NOT NULL - ) - """) - - cursor.execute(""" - CREATE TABLE IF NOT EXISTS newsletters( - id SERIAL PRIMARY KEY, - subject TEXT NOT NULL, - body TEXT NOT NULL, - sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) + """Initialize database tables and indexes""" + conn = None + try: + conn = get_connection() + cursor = conn.cursor() - conn.commit() - cursor.close() - conn.close() + # Create subscribers table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS subscribers ( + id SERIAL PRIMARY KEY, + email TEXT UNIQUE NOT NULL + ) + """) + + # Add created_at column if it doesn't exist + if not column_exists(cursor, 'subscribers', 'created_at'): + cursor.execute(""" + ALTER TABLE subscribers + ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + """) + logging.info("Added created_at column to subscribers table") + + # Create newsletters table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS newsletters( + id SERIAL PRIMARY KEY, + subject TEXT NOT NULL, + body TEXT NOT NULL, + sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create indexes only if they don't exist + if not index_exists(cursor, 'idx_newsletters_sent_at'): + cursor.execute("CREATE INDEX idx_newsletters_sent_at ON newsletters(sent_at DESC)") + logging.info("Created index idx_newsletters_sent_at") + + if not index_exists(cursor, 'idx_subscribers_email'): + cursor.execute("CREATE INDEX idx_subscribers_email ON subscribers(email)") + logging.info("Created index idx_subscribers_email") + + if not index_exists(cursor, 'idx_subscribers_created_at'): + cursor.execute("CREATE INDEX idx_subscribers_created_at ON subscribers(created_at DESC)") + logging.info("Created index idx_subscribers_created_at") + + conn.commit() + cursor.close() + logging.info("Database tables and indexes initialized successfully") + + except Exception as e: + logging.error(f"Error initializing database: {e}") + if conn: + conn.rollback() + raise + finally: + if conn: + return_connection(conn) def add_email(email): + """Add email to subscribers with connection pooling""" + conn = None try: - with get_connection() as conn: - with conn.cursor() as cursor: - cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,)) - conn.commit() + conn = get_connection() + cursor = conn.cursor() + cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,)) + conn.commit() + cursor.close() + logging.info(f"Email added successfully: {email}") return True + except IntegrityError: + # Email already exists + if conn: + conn.rollback() + logging.info(f"Email already exists: {email}") return False - except psycopg2.OperationalError as e: - print(f"Error: {e}") + + except Exception as e: + if conn: + conn.rollback() + logging.error(f"Error adding email {email}: {e}") return False + + finally: + if conn: + return_connection(conn) def remove_email(email): + """Remove email from subscribers with connection pooling""" + conn = None try: - with get_connection() as conn: - with conn.cursor() as cursor: - cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,)) - conn.commit() - if cursor.rowcount > 0: - return True - return False + conn = get_connection() + cursor = conn.cursor() + cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,)) + conn.commit() + rows_affected = cursor.rowcount + cursor.close() + + if rows_affected > 0: + logging.info(f"Email removed successfully: {email}") + return True + else: + logging.info(f"Email not found for removal: {email}") + return False + except Exception as e: - print(f"Error removing email: {e}") - return False \ No newline at end of file + if conn: + conn.rollback() + logging.error(f"Error removing email {email}: {e}") + return False + + finally: + if conn: + return_connection(conn) + +def get_subscriber_count(): + """Get total number of subscribers""" + conn = None + try: + conn = get_connection() + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM subscribers") + count = cursor.fetchone()[0] + cursor.close() + return count + + except Exception as e: + logging.error(f"Error getting subscriber count: {e}") + return 0 + + finally: + if conn: + return_connection(conn) + +# Cleanup function for graceful shutdown +import atexit +atexit.register(close_all_connections) \ No newline at end of file