import os import logging import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import concurrent.futures from threading import Lock from flask import ( Flask, render_template, request, redirect, url_for, flash, session, jsonify, abort ) from dotenv import load_dotenv from werkzeug.security import check_password_hash from functools import wraps import re from database import ( get_db_connection, init_db, get_all_emails, get_admin, create_default_admin, get_subscriber_stats, add_email, remove_email, save_newsletter, update_newsletter_stats, log_email_delivery, get_recent_newsletters ) load_dotenv() app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY") base_url = os.getenv("BASE_URL") # SMTP settings SMTP_SERVER = os.getenv("SMTP_SERVER") SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) SMTP_USER = os.getenv("SMTP_USER") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") SENDER_EMAIL = os.getenv("SENDER_EMAIL", SMTP_USER) SENDER_NAME = os.getenv("SENDER_NAME", "Newsletter Admin") # Email sending configuration MAX_EMAIL_WORKERS = 5 email_send_lock = Lock() # Logging setup logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Initialize the database and create default admin user init_db() create_default_admin() # Security decorators def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if "username" not in session: if request.is_json: return jsonify({"error": "Authentication required"}), 401 flash("Please log in to access this page.", "warning") return redirect(url_for("login")) return f(*args, **kwargs) return decorated_function def validate_email(email): """Validate email format.""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def send_single_email(email_data): """Send a single email (for thread pool execution).""" email, subject, body, newsletter_id = email_data try: server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=15) server.login(SMTP_USER, SMTP_PASSWORD) # Create message msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = f"{SENDER_NAME} <{SENDER_EMAIL}>" msg['To'] = email # Create unsubscribe link unsub_link = f"https://{base_url}/unsubscribe?email={email}" # HTML body with unsubscribe link html_body = f""" {body}

If you wish to unsubscribe, please click here

""" # Attach HTML part html_part = MIMEText(html_body, 'html') msg.attach(html_part) server.sendmail(SENDER_EMAIL, email, msg.as_string()) server.quit() # Log successful delivery if newsletter_id: log_email_delivery(newsletter_id, email, 'sent') logger.info(f"Email sent successfully to: {email}") return True, email, None except Exception as e: error_msg = str(e) logger.error(f"Failed to send email to {email}: {error_msg}") # Log failed delivery if newsletter_id: log_email_delivery(newsletter_id, email, 'failed', error_msg) return False, email, error_msg def send_newsletter_batch(subject, body, email_list, newsletter_id=None): """Send newsletter to multiple recipients using thread pool.""" success_count = 0 failure_count = 0 failed_emails = [] # Prepare email data for thread pool email_data_list = [(email, subject, body, newsletter_id) for email in email_list] with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_EMAIL_WORKERS) as executor: future_to_email = { executor.submit(send_single_email, email_data): email_data[0] for email_data in email_data_list } for future in concurrent.futures.as_completed(future_to_email): success, email, error = future.result() if success: success_count += 1 else: failure_count += 1 failed_emails.append({'email': email, 'error': error}) return success_count, failure_count, failed_emails @app.route("/") @login_required def index(): """Dashboard with subscriber statistics and recent activity.""" page = request.args.get('page', 1, type=int) search = request.args.get('search', '') per_page = 25 stats = get_subscriber_stats() subscribers_data = get_all_emails(page=page, per_page=per_page, search=search) recent_newsletters = get_recent_newsletters(limit=5) return render_template( "admin_index.html", stats=stats, subscribers=subscribers_data['subscribers'], pagination={ 'page': subscribers_data['page'], 'per_page': subscribers_data['per_page'], 'total_pages': subscribers_data['total_pages'], 'total_count': subscribers_data['total_count'] }, search=search, recent_newsletters=recent_newsletters ) @app.route("/subscribers") @login_required def subscribers(): """Detailed subscriber management page.""" page = request.args.get('page', 1, type=int) search = request.args.get('search', '') per_page = 50 subscribers_data = get_all_emails(page=page, per_page=per_page, search=search) return render_template( "subscribers.html", subscribers=subscribers_data['subscribers'], pagination={ 'page': subscribers_data['page'], 'per_page': subscribers_data['per_page'], 'total_pages': subscribers_data['total_pages'], 'total_count': subscribers_data['total_count'] }, search=search ) @app.route("/add_subscriber", methods=["POST"]) @login_required def add_subscriber(): """Add a new subscriber via AJAX.""" try: data = request.get_json() email = data.get('email', '').strip().lower() if not email or not validate_email(email): return jsonify({"success": False, "message": "Invalid email format"}), 400 success = add_email(email, source='admin_manual') if success: return jsonify({"success": True, "message": f"Successfully added {email}"}) else: return jsonify({"success": False, "message": "Email already exists or failed to add"}), 400 except Exception as e: logger.error(f"Error adding subscriber: {e}") return jsonify({"success": False, "message": "Server error occurred"}), 500 @app.route("/remove_subscriber", methods=["POST"]) @login_required def remove_subscriber(): """Remove/unsubscribe a subscriber via AJAX.""" try: data = request.get_json() email = data.get('email', '').strip().lower() if not email: return jsonify({"success": False, "message": "Email is required"}), 400 success = remove_email(email) if success: return jsonify({"success": True, "message": f"Successfully unsubscribed {email}"}) else: return jsonify({"success": False, "message": "Email not found"}), 404 except Exception as e: logger.error(f"Error removing subscriber: {e}") return jsonify({"success": False, "message": "Server error occurred"}), 500 @app.route("/send_newsletter", methods=["GET", "POST"]) @login_required def send_newsletter(): """Enhanced newsletter sending with preview and batch processing.""" if request.method == "POST": action = request.form.get('action', 'send') subject = request.form.get('subject', '').strip() body = request.form.get('body', '').strip() if not subject or not body: flash("Subject and body are required", "error") return redirect(url_for('send_newsletter')) if action == 'preview': # Return preview preview_html = f"""

Subject: {subject}

{body}

Unsubscribe link will be automatically added to all emails

""" return render_template("send_newsletter.html", preview=preview_html, subject=subject, body=body) elif action == 'send': # Get all active subscribers (backwards compatible) try: with get_db_connection() as conn: cursor = conn.cursor() # Check if status column exists for backwards compatibility cursor.execute( """ SELECT EXISTS ( SELECT FROM information_schema.columns WHERE table_name = 'subscribers' AND column_name = 'status' ) """ ) has_status = cursor.fetchone()[0] if has_status: cursor.execute("SELECT email FROM subscribers WHERE status = 'active'") else: cursor.execute("SELECT email FROM subscribers") email_list = [row[0] for row in cursor.fetchall()] except Exception as e: logger.error(f"Error fetching subscriber emails: {e}") flash("Error retrieving subscriber list", "error") return redirect(url_for('send_newsletter')) if not email_list: flash("No active subscribers found", "warning") return redirect(url_for('send_newsletter')) # Save newsletter to database newsletter_id = save_newsletter(subject, body, session['username'], len(email_list)) # Send emails in batches try: success_count, failure_count, failed_emails = send_newsletter_batch( subject, body, email_list, newsletter_id ) # Update newsletter statistics if newsletter_id: update_newsletter_stats(newsletter_id, success_count, failure_count) # Flash results if success_count > 0: flash(f"Newsletter sent successfully to {success_count} subscribers!", "success") if failure_count > 0: flash(f"Failed to send to {failure_count} subscribers", "error") for failed in failed_emails[:5]: # Show first 5 failures flash(f"Failed: {failed['email']} - {failed['error'][:100]}", "warning") except Exception as e: logger.error(f"Error sending newsletter: {e}") flash(f"Error sending newsletter: {str(e)}", "error") return redirect(url_for('send_newsletter')) return render_template("send_newsletter.html") @app.route("/newsletter_history") @login_required def newsletter_history(): """View newsletter sending history.""" newsletters = get_recent_newsletters(limit=50) return render_template("newsletter_history.html", newsletters=newsletters) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "") if not username or not password: flash("Username and password are required", "error") return redirect(url_for("login")) admin = get_admin(username) if admin and len(admin) >= 3 and check_password_hash(admin[2], password): if len(admin) >= 4 and not admin[3]: # Check is_active flash("Account is disabled", "error") return redirect(url_for("login")) session["username"] = username session["admin_id"] = admin[0] flash("Logged in successfully!", "success") # Redirect to intended page or dashboard next_page = request.args.get('next') if next_page: return redirect(next_page) return redirect(url_for("index")) else: flash("Invalid username or password", "error") return redirect(url_for("login")) return render_template("login.html") @app.route("/logout") def logout(): session.clear() flash("You have been logged out successfully", "info") return redirect(url_for("login")) # Public unsubscribe endpoint @app.route("/unsubscribe") def unsubscribe(): """Public unsubscribe endpoint.""" email = request.args.get('email', '').strip().lower() if not email or not validate_email(email): return render_template("unsubscribe.html", error="Invalid email address") success = remove_email(email) if success: return render_template("unsubscribe.html", success=True, email=email) else: return render_template("unsubscribe.html", error="Email not found or already unsubscribed") # API endpoints for AJAX requests @app.route("/api/stats") @login_required def api_stats(): """API endpoint for dashboard statistics.""" stats = get_subscriber_stats() return jsonify(stats) # Error handlers @app.errorhandler(404) def not_found(error): return render_template("error.html", error="Page not found", code=404), 404 @app.errorhandler(500) def internal_error(error): logger.error(f"Internal error: {error}") return render_template("error.html", error="Internal server error", code=500), 500 # Context processors @app.context_processor def inject_user(): return dict(current_user=session.get('username')) if __name__ == "__main__": app.run(host="0.0.0.0", port=5001, debug=True)