admin-panel/backend/app/routes/auth.py

202 lines
6.0 KiB
Python

"""
Admin Authentication Routes
"""
from flask import Blueprint, request, jsonify
from app.models import db, AdminUser, AuditLog
from datetime import datetime, timedelta
import jwt
from app.config import Config
from functools import wraps
auth_bp = Blueprint('auth', __name__)
def token_required(f):
"""Decorator to require valid JWT token"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'error': 'Token is missing'}), 401
try:
if token.startswith('Bearer '):
token = token[7:]
data = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=['HS256'])
current_admin = AdminUser.query.get(data['admin_id'])
if not current_admin or not current_admin.is_active:
return jsonify({'error': 'Invalid token'}), 401
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token has expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
return f(current_admin, *args, **kwargs)
return decorated
@auth_bp.route('/login', methods=['POST'])
def login():
"""Admin login"""
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password required'}), 400
admin = AdminUser.query.filter_by(username=username).first()
if not admin or not admin.check_password(password):
return jsonify({'error': 'Invalid credentials'}), 401
if not admin.is_active:
return jsonify({'error': 'Account is disabled'}), 403
# Update last login
admin.last_login = datetime.utcnow()
db.session.commit()
# Create JWT token
token = jwt.encode({
'admin_id': admin.id,
'username': admin.username,
'role': admin.role,
'exp': datetime.utcnow() + timedelta(hours=Config.JWT_EXPIRATION_HOURS),
'iat': datetime.utcnow()
}, Config.JWT_SECRET_KEY, algorithm='HS256')
# Log action
log = AuditLog(
admin_id=admin.id,
action='login',
ip_address=request.remote_addr
)
db.session.add(log)
db.session.commit()
return jsonify({
'status': 'success',
'message': 'Login successful',
'token': token,
'admin': admin.to_dict()
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@auth_bp.route('/me', methods=['GET'])
@token_required
def get_current_admin(current_admin):
"""Get current admin info"""
return jsonify({
'status': 'success',
'admin': current_admin.to_dict()
}), 200
@auth_bp.route('/logout', methods=['POST'])
@token_required
def logout(current_admin):
"""Admin logout"""
# Log action
log = AuditLog(
admin_id=current_admin.id,
action='logout',
ip_address=request.remote_addr
)
db.session.add(log)
db.session.commit()
return jsonify({
'status': 'success',
'message': 'Logged out successfully'
}), 200
@auth_bp.route('/profile', methods=['PUT'])
@token_required
def update_profile(current_admin):
"""Update admin profile"""
try:
data = request.get_json()
# Update allowed fields
if 'full_name' in data:
current_admin.full_name = data['full_name']
if 'email' in data:
# Check if email is already taken by another admin
existing = AdminUser.query.filter_by(email=data['email']).first()
if existing and existing.id != current_admin.id:
return jsonify({'error': 'Email already in use'}), 400
current_admin.email = data['email']
db.session.commit()
# Log action
log = AuditLog(
admin_id=current_admin.id,
action='update_profile',
details=f"Updated profile information",
ip_address=request.remote_addr
)
db.session.add(log)
db.session.commit()
return jsonify({
'status': 'success',
'message': 'Profile updated successfully',
'admin': current_admin.to_dict()
}), 200
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
@auth_bp.route('/change-password', methods=['POST'])
@token_required
def change_password(current_admin):
"""Change admin password"""
try:
data = request.get_json()
current_password = data.get('current_password')
new_password = data.get('new_password')
if not current_password or not new_password:
return jsonify({'error': 'Current and new password required'}), 400
# Verify current password
if not current_admin.check_password(current_password):
return jsonify({'error': 'Current password is incorrect'}), 401
# Validate new password
if len(new_password) < 8:
return jsonify({'error': 'New password must be at least 8 characters'}), 400
# Update password
current_admin.set_password(new_password)
db.session.commit()
# Log action
log = AuditLog(
admin_id=current_admin.id,
action='change_password',
details='Password changed successfully',
ip_address=request.remote_addr
)
db.session.add(log)
db.session.commit()
return jsonify({
'status': 'success',
'message': 'Password changed successfully'
}), 200
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500