222 lines
No EOL
6.8 KiB
Python
222 lines
No EOL
6.8 KiB
Python
import os
|
|
import psycopg2
|
|
from psycopg2 import pool, IntegrityError
|
|
from dotenv import load_dotenv
|
|
import logging
|
|
|
|
load_dotenv()
|
|
|
|
# Global connection pool
|
|
_connection_pool = None
|
|
|
|
def get_connection_pool():
|
|
"""Initialize and return the connection pool"""
|
|
global _connection_pool
|
|
if _connection_pool is None:
|
|
try:
|
|
_connection_pool = psycopg2.pool.ThreadedConnectionPool(
|
|
minconn=2,
|
|
maxconn=20,
|
|
host=os.getenv("PG_HOST"),
|
|
port=os.getenv("PG_PORT", 5432),
|
|
dbname=os.getenv("PG_DATABASE"),
|
|
user=os.getenv("PG_USER"),
|
|
password=os.getenv("PG_PASSWORD"),
|
|
connect_timeout=5
|
|
)
|
|
logging.info("Database connection pool created successfully")
|
|
except Exception as e:
|
|
logging.error(f"Error creating connection pool: {e}")
|
|
raise
|
|
return _connection_pool
|
|
|
|
def get_connection():
|
|
"""Get a connection from the pool"""
|
|
try:
|
|
pool = get_connection_pool()
|
|
conn = pool.getconn()
|
|
if conn.closed:
|
|
# Connection is closed, remove it and get a new one
|
|
pool.putconn(conn, close=True)
|
|
conn = pool.getconn()
|
|
return conn
|
|
except Exception as e:
|
|
logging.error(f"Error getting connection from pool: {e}")
|
|
raise
|
|
|
|
def return_connection(conn):
|
|
"""Return a connection to the pool"""
|
|
try:
|
|
pool = get_connection_pool()
|
|
pool.putconn(conn)
|
|
except Exception as e:
|
|
logging.error(f"Error returning connection to pool: {e}")
|
|
|
|
def close_all_connections():
|
|
"""Close all connections in the pool"""
|
|
global _connection_pool
|
|
if _connection_pool:
|
|
_connection_pool.closeall()
|
|
_connection_pool = None
|
|
logging.info("All database connections closed")
|
|
|
|
def column_exists(cursor, table_name, column_name):
|
|
"""Check if a column exists in a table"""
|
|
cursor.execute("""
|
|
SELECT EXISTS (
|
|
SELECT 1
|
|
FROM information_schema.columns
|
|
WHERE table_name = %s AND column_name = %s
|
|
)
|
|
""", (table_name, column_name))
|
|
return cursor.fetchone()[0]
|
|
|
|
def index_exists(cursor, index_name):
|
|
"""Check if an index exists"""
|
|
cursor.execute("""
|
|
SELECT EXISTS (
|
|
SELECT 1 FROM pg_class c
|
|
JOIN pg_namespace n ON n.oid = c.relnamespace
|
|
WHERE c.relname = %s AND n.nspname = 'public'
|
|
)
|
|
""", (index_name,))
|
|
return cursor.fetchone()[0]
|
|
|
|
def init_db():
|
|
"""Initialize database tables and indexes"""
|
|
conn = None
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Create subscribers table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS subscribers (
|
|
id SERIAL PRIMARY KEY,
|
|
email TEXT UNIQUE NOT NULL
|
|
)
|
|
""")
|
|
|
|
# Add created_at column if it doesn't exist
|
|
if not column_exists(cursor, 'subscribers', 'created_at'):
|
|
cursor.execute("""
|
|
ALTER TABLE subscribers
|
|
ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
""")
|
|
logging.info("Added created_at column to subscribers table")
|
|
|
|
# Create newsletters table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS newsletters(
|
|
id SERIAL PRIMARY KEY,
|
|
subject TEXT NOT NULL,
|
|
body TEXT NOT NULL,
|
|
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
# Create indexes only if they don't exist
|
|
if not index_exists(cursor, 'idx_newsletters_sent_at'):
|
|
cursor.execute("CREATE INDEX idx_newsletters_sent_at ON newsletters(sent_at DESC)")
|
|
logging.info("Created index idx_newsletters_sent_at")
|
|
|
|
if not index_exists(cursor, 'idx_subscribers_email'):
|
|
cursor.execute("CREATE INDEX idx_subscribers_email ON subscribers(email)")
|
|
logging.info("Created index idx_subscribers_email")
|
|
|
|
if not index_exists(cursor, 'idx_subscribers_created_at'):
|
|
cursor.execute("CREATE INDEX idx_subscribers_created_at ON subscribers(created_at DESC)")
|
|
logging.info("Created index idx_subscribers_created_at")
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
logging.info("Database tables and indexes initialized successfully")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error initializing database: {e}")
|
|
if conn:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
if conn:
|
|
return_connection(conn)
|
|
|
|
def add_email(email):
|
|
"""Add email to subscribers with connection pooling"""
|
|
conn = None
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,))
|
|
conn.commit()
|
|
cursor.close()
|
|
logging.info(f"Email added successfully: {email}")
|
|
return True
|
|
|
|
except IntegrityError:
|
|
# Email already exists
|
|
if conn:
|
|
conn.rollback()
|
|
logging.info(f"Email already exists: {email}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
if conn:
|
|
conn.rollback()
|
|
logging.error(f"Error adding email {email}: {e}")
|
|
return False
|
|
|
|
finally:
|
|
if conn:
|
|
return_connection(conn)
|
|
|
|
def remove_email(email):
|
|
"""Remove email from subscribers with connection pooling"""
|
|
conn = None
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,))
|
|
conn.commit()
|
|
rows_affected = cursor.rowcount
|
|
cursor.close()
|
|
|
|
if rows_affected > 0:
|
|
logging.info(f"Email removed successfully: {email}")
|
|
return True
|
|
else:
|
|
logging.info(f"Email not found for removal: {email}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
if conn:
|
|
conn.rollback()
|
|
logging.error(f"Error removing email {email}: {e}")
|
|
return False
|
|
|
|
finally:
|
|
if conn:
|
|
return_connection(conn)
|
|
|
|
def get_subscriber_count():
|
|
"""Get total number of subscribers"""
|
|
conn = None
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM subscribers")
|
|
count = cursor.fetchone()[0]
|
|
cursor.close()
|
|
return count
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error getting subscriber count: {e}")
|
|
return 0
|
|
|
|
finally:
|
|
if conn:
|
|
return_connection(conn)
|
|
|
|
# Cleanup function for graceful shutdown
|
|
import atexit
|
|
atexit.register(close_all_connections) |