"""
Authentication service - JWT token generation and validation
"""
import jwt
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import request, jsonify, current_app
from app.models.user import User, Customer
from app.models.domain import db


class AuthService:
    """Authentication service for JWT tokens"""

    @staticmethod
    def generate_token(user_id, role='customer', expires_in=24):
        """
        Generate JWT token

        Args:
            user_id: User ID
            role: User role (customer/admin)
            expires_in: Token expiration in hours (default 24)

        Returns:
            JWT token string
        """
        # Take a single timestamp so 'iat' and 'exp' are derived from the
        # same instant; timezone-aware UTC avoids the deprecated utcnow().
        issued_at = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'role': role,
            'exp': issued_at + timedelta(hours=expires_in),
            'iat': issued_at,
        }
        token = jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )
        return token

    @staticmethod
    def verify_token(token):
        """
        Verify JWT token

        Args:
            token: JWT token string

        Returns:
            dict: Decoded payload or None if invalid
        """
        try:
            return jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256']
            )
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass of InvalidTokenError, so
            # expired tokens are handled by this clause as well.
            return None

    @staticmethod
    def register_user(email, password, full_name, company_name=None):
        """
        Register new user

        Creates the User row and its Customer profile in one transaction.

        Args:
            email: User email
            password: User password
            full_name: User full name
            company_name: Optional company name

        Returns:
            tuple: (user, customer, error) - on success error is None; when
            the email is already taken user and customer are None.

        Raises:
            Exception: Any error raised while flushing/committing is
            re-raised after the session has been rolled back.
        """
        # Check if user exists
        existing_user = User.query.filter_by(email=email).first()
        if existing_user:
            return None, None, "Email already registered"

        # Create user
        user = User(
            email=email,
            full_name=full_name,
            role='customer',
            is_active=True,
            is_verified=False,
            verification_token=secrets.token_urlsafe(32)
        )
        user.set_password(password)

        try:
            db.session.add(user)
            db.session.flush()  # Get user.id

            # Create customer profile
            customer = Customer(
                user_id=user.id,
                company_name=company_name,
                subscription_plan='free',
                subscription_status='active',
                max_domains=5,
                max_containers=3
            )
            db.session.add(customer)
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller (e.g. when a
            # concurrent registration wins the race for the same email).
            db.session.rollback()
            raise

        return user, customer, None

    @staticmethod
    def login_user(email, password):
        """
        Login user

        Args:
            email: User email
            password: User password

        Returns:
            tuple: (user, token, error) - on success error is None; on
            failure user and token are None.
        """
        user = User.query.filter_by(email=email).first()

        # Same message for unknown email and wrong password.
        if not user:
            return None, None, "Invalid email or password"

        if not user.check_password(password):
            return None, None, "Invalid email or password"

        if not user.is_active:
            return None, None, "Account is deactivated"

        # Update last login; stored as a naive UTC value, exactly what
        # datetime.utcnow() produced, without using the deprecated call.
        user.last_login = datetime.now(timezone.utc).replace(tzinfo=None)
        db.session.commit()

        # Generate token
        token = AuthService.generate_token(user.id, user.role)

        return user, token, None

    @staticmethod
    def get_current_user(token):
        """
        Get current user from token

        Args:
            token: JWT token

        Returns:
            User object or None (invalid/expired token, token without a
            'user_id' claim, or no matching user)
        """
        payload = AuthService.verify_token(token)
        if not payload:
            return None

        user_id = payload.get('user_id')
        if user_id is None:
            return None

        return User.query.get(user_id)


def _authenticate_request(require_admin=False):
    """
    Authenticate the current Flask request from its Authorization header

    Shared by token_required and admin_required so both apply the same
    checks in the same order.

    Args:
        require_admin: When True, additionally require the admin role both
            in the token payload and on the stored user record.

    Returns:
        tuple: (user, None) on success, or (None, (response, status)) where
        the second item can be returned directly from a Flask view.
    """
    token = None

    # Get token from header ("<scheme> <token>")
    if 'Authorization' in request.headers:
        auth_header = request.headers['Authorization']
        try:
            token = auth_header.split(' ')[1]
        except IndexError:
            return None, (jsonify({'error': 'Invalid token format'}), 401)

    if not token:
        return None, (jsonify({'error': 'Token is missing'}), 401)

    # Verify token
    payload = AuthService.verify_token(token)
    if not payload:
        return None, (jsonify({'error': 'Token is invalid or expired'}), 401)

    # Check admin role claim before touching the database
    if require_admin and payload.get('role') != 'admin':
        return None, (jsonify({'error': 'Admin access required'}), 403)

    # Get user; a signed token without 'user_id' is treated as unknown user
    user_id = payload.get('user_id')
    current_user = User.query.get(user_id) if user_id is not None else None
    if not current_user or not current_user.is_active:
        return None, (jsonify({'error': 'User not found or inactive'}), 401)

    # The role claim is frozen at issue time; re-check the stored role so a
    # demoted user cannot keep admin access until the token expires.
    if require_admin and current_user.role != 'admin':
        return None, (jsonify({'error': 'Admin access required'}), 403)

    return current_user, None


# Decorators for route protection
def token_required(f):
    """
    Decorator to require valid JWT token

    The wrapped view receives the authenticated user as its first
    positional argument. Responds with 401 when the token is missing,
    malformed, invalid/expired, or the user is unknown or inactive.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        current_user, error = _authenticate_request()
        if error is not None:
            return error
        return f(current_user, *args, **kwargs)

    return decorated


def admin_required(f):
    """
    Decorator to require admin role

    Same checks as token_required, plus a 403 response unless both the
    token's role claim and the stored user role are 'admin'. The wrapped
    view receives the authenticated user as its first positional argument.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        current_user, error = _authenticate_request(require_admin=True)
        if error is not None:
            return error
        return f(current_user, *args, **kwargs)

    return decorated