234 lines
6.4 KiB
Python
234 lines
6.4 KiB
Python
"""
|
|
Authentication service - JWT token generation and validation
|
|
"""
|
|
import jwt
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from flask import request, jsonify, current_app
|
|
from app.models.user import User, Customer
|
|
from app.models.domain import db
|
|
|
|
|
|
class AuthService:
|
|
"""Authentication service for JWT tokens"""
|
|
|
|
@staticmethod
|
|
def generate_token(user_id, role='customer', expires_in=24):
|
|
"""
|
|
Generate JWT token
|
|
|
|
Args:
|
|
user_id: User ID
|
|
role: User role (customer/admin)
|
|
expires_in: Token expiration in hours (default 24)
|
|
|
|
Returns:
|
|
JWT token string
|
|
"""
|
|
payload = {
|
|
'user_id': user_id,
|
|
'role': role,
|
|
'exp': datetime.utcnow() + timedelta(hours=expires_in),
|
|
'iat': datetime.utcnow()
|
|
}
|
|
|
|
token = jwt.encode(
|
|
payload,
|
|
current_app.config['SECRET_KEY'],
|
|
algorithm='HS256'
|
|
)
|
|
|
|
return token
|
|
|
|
@staticmethod
|
|
def verify_token(token):
|
|
"""
|
|
Verify JWT token
|
|
|
|
Args:
|
|
token: JWT token string
|
|
|
|
Returns:
|
|
dict: Decoded payload or None if invalid
|
|
"""
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
current_app.config['SECRET_KEY'],
|
|
algorithms=['HS256']
|
|
)
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
return None
|
|
except jwt.InvalidTokenError:
|
|
return None
|
|
|
|
@staticmethod
|
|
def register_user(email, password, full_name, company_name=None):
|
|
"""
|
|
Register new user
|
|
|
|
Args:
|
|
email: User email
|
|
password: User password
|
|
full_name: User full name
|
|
company_name: Optional company name
|
|
|
|
Returns:
|
|
tuple: (user, customer, error)
|
|
"""
|
|
# Check if user exists
|
|
existing_user = User.query.filter_by(email=email).first()
|
|
if existing_user:
|
|
return None, None, "Email already registered"
|
|
|
|
# Create user
|
|
user = User(
|
|
email=email,
|
|
full_name=full_name,
|
|
role='customer',
|
|
is_active=True,
|
|
is_verified=False,
|
|
verification_token=secrets.token_urlsafe(32)
|
|
)
|
|
user.set_password(password)
|
|
|
|
db.session.add(user)
|
|
db.session.flush() # Get user.id
|
|
|
|
# Create customer profile
|
|
customer = Customer(
|
|
user_id=user.id,
|
|
company_name=company_name,
|
|
subscription_plan='free',
|
|
subscription_status='active',
|
|
max_domains=5,
|
|
max_containers=3
|
|
)
|
|
|
|
db.session.add(customer)
|
|
db.session.commit()
|
|
|
|
return user, customer, None
|
|
|
|
@staticmethod
|
|
def login_user(email, password):
|
|
"""
|
|
Login user
|
|
|
|
Args:
|
|
email: User email
|
|
password: User password
|
|
|
|
Returns:
|
|
tuple: (user, token, error)
|
|
"""
|
|
user = User.query.filter_by(email=email).first()
|
|
|
|
if not user:
|
|
return None, None, "Invalid email or password"
|
|
|
|
if not user.check_password(password):
|
|
return None, None, "Invalid email or password"
|
|
|
|
if not user.is_active:
|
|
return None, None, "Account is deactivated"
|
|
|
|
# Update last login
|
|
user.last_login = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
# Generate token
|
|
token = AuthService.generate_token(user.id, user.role)
|
|
|
|
return user, token, None
|
|
|
|
@staticmethod
|
|
def get_current_user(token):
|
|
"""
|
|
Get current user from token
|
|
|
|
Args:
|
|
token: JWT token
|
|
|
|
Returns:
|
|
User object or None
|
|
"""
|
|
payload = AuthService.verify_token(token)
|
|
if not payload:
|
|
return None
|
|
|
|
user = User.query.get(payload['user_id'])
|
|
return user
|
|
|
|
|
|
# Decorators for route protection
|
|
def token_required(f):
|
|
"""Decorator to require valid JWT token"""
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
token = None
|
|
|
|
# Get token from header
|
|
if 'Authorization' in request.headers:
|
|
auth_header = request.headers['Authorization']
|
|
try:
|
|
token = auth_header.split(' ')[1] # Bearer <token>
|
|
except IndexError:
|
|
return jsonify({'error': 'Invalid token format'}), 401
|
|
|
|
if not token:
|
|
return jsonify({'error': 'Token is missing'}), 401
|
|
|
|
# Verify token
|
|
payload = AuthService.verify_token(token)
|
|
if not payload:
|
|
return jsonify({'error': 'Token is invalid or expired'}), 401
|
|
|
|
# Get user
|
|
current_user = User.query.get(payload['user_id'])
|
|
if not current_user or not current_user.is_active:
|
|
return jsonify({'error': 'User not found or inactive'}), 401
|
|
|
|
return f(current_user, *args, **kwargs)
|
|
|
|
return decorated
|
|
|
|
|
|
def admin_required(f):
|
|
"""Decorator to require admin role"""
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
token = None
|
|
|
|
# Get token from header
|
|
if 'Authorization' in request.headers:
|
|
auth_header = request.headers['Authorization']
|
|
try:
|
|
token = auth_header.split(' ')[1]
|
|
except IndexError:
|
|
return jsonify({'error': 'Invalid token format'}), 401
|
|
|
|
if not token:
|
|
return jsonify({'error': 'Token is missing'}), 401
|
|
|
|
# Verify token
|
|
payload = AuthService.verify_token(token)
|
|
if not payload:
|
|
return jsonify({'error': 'Token is invalid or expired'}), 401
|
|
|
|
# Check admin role
|
|
if payload.get('role') != 'admin':
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
# Get user
|
|
current_user = User.query.get(payload['user_id'])
|
|
if not current_user or not current_user.is_active:
|
|
return jsonify({'error': 'User not found or inactive'}), 401
|
|
|
|
return f(current_user, *args, **kwargs)
|
|
|
|
return decorated
|
|
|