admin-panel/backend/app/routes/auth.py

119 lines
3.4 KiB
Python

"""
Admin Authentication Routes
"""
from flask import Blueprint, request, jsonify
from app.models import db, AdminUser, AuditLog
from datetime import datetime, timedelta
import jwt
from app.config import Config
from functools import wraps
auth_bp = Blueprint('auth', __name__)
def token_required(f):
"""Decorator to require valid JWT token"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'error': 'Token is missing'}), 401
try:
if token.startswith('Bearer '):
token = token[7:]
data = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=['HS256'])
current_admin = AdminUser.query.get(data['admin_id'])
if not current_admin or not current_admin.is_active:
return jsonify({'error': 'Invalid token'}), 401
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token has expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
return f(current_admin, *args, **kwargs)
return decorated
@auth_bp.route('/login', methods=['POST'])
def login():
"""Admin login"""
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password required'}), 400
admin = AdminUser.query.filter_by(username=username).first()
if not admin or not admin.check_password(password):
return jsonify({'error': 'Invalid credentials'}), 401
if not admin.is_active:
return jsonify({'error': 'Account is disabled'}), 403
# Update last login
admin.last_login = datetime.utcnow()
db.session.commit()
# Create JWT token
token = jwt.encode({
'admin_id': admin.id,
'username': admin.username,
'role': admin.role,
'exp': datetime.utcnow() + timedelta(hours=Config.JWT_EXPIRATION_HOURS),
'iat': datetime.utcnow()
}, Config.JWT_SECRET_KEY, algorithm='HS256')
# Log action
log = AuditLog(
admin_id=admin.id,
action='login',
ip_address=request.remote_addr
)
db.session.add(log)
db.session.commit()
return jsonify({
'status': 'success',
'message': 'Login successful',
'token': token,
'admin': admin.to_dict()
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@auth_bp.route('/me', methods=['GET'])
@token_required
def get_current_admin(current_admin):
"""Get current admin info"""
return jsonify({
'status': 'success',
'admin': current_admin.to_dict()
}), 200
@auth_bp.route('/logout', methods=['POST'])
@token_required
def logout(current_admin):
"""Admin logout"""
# Log action
log = AuditLog(
admin_id=current_admin.id,
action='logout',
ip_address=request.remote_addr
)
db.session.add(log)
db.session.commit()
return jsonify({
'status': 'success',
'message': 'Logged out successfully'
}), 200