Files
SIB/backend/app/api/auth/__init__.py

315 lines
11 KiB
Python
Raw Normal View History

2026-02-21 09:53:31 -05:00
import os
import json
2026-02-21 09:53:31 -05:00
import shutil
from uuid import uuid4
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status, Depends, UploadFile, File, Form
from sqlmodel import Session, select
from app.core.database import get_session
from app.core.security import verify_password, get_password_hash, create_access_token, get_token_payload
from app.models.user import User, DriverProfile, UserRole, VehicleType
from app.api.deps import oauth2_scheme
from app.schemas.user import PassengerCreate, Token, UserResponse, LoginRequest, GoogleLoginRequest
import firebase_admin
from firebase_admin import auth as firebase_auth, credentials
2026-02-21 09:53:31 -05:00
# Initialize Firebase Admin SDK (runs once, at import time of this module).
# Supports two methods:
# 1. FIREBASE_SERVICE_ACCOUNT_JSON env var with the full JSON as a string (recommended for Render/production)
# 2. GOOGLE_APPLICATION_CREDENTIALS env var pointing to a JSON file path (local dev)
# Any exception raised during initialization is caught and only reported via
# print, so importing this module does not fail when Firebase is misconfigured.
try:
    # Skip initialization when an app is already registered with the SDK
    # (for example when the module is imported more than once).
    if not firebase_admin._apps:
        sa_json = os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON")
        if sa_json:
            # Parse the JSON string directly from environment variable
            sa_dict = json.loads(sa_json)
            cred = credentials.Certificate(sa_dict)
            firebase_admin.initialize_app(cred)
            print("DEBUG: Firebase Admin initialized from FIREBASE_SERVICE_ACCOUNT_JSON env var")
        elif os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
            # Use file path from GOOGLE_APPLICATION_CREDENTIALS
            # (initialize_app() is called without explicit credentials here).
            firebase_admin.initialize_app()
            print("DEBUG: Firebase Admin initialized from GOOGLE_APPLICATION_CREDENTIALS file")
        else:
            # Neither variable is set: warn, then still attempt an
            # initialization without explicit credentials.
            print("WARNING: No Firebase credentials found. Set FIREBASE_SERVICE_ACCOUNT_JSON or GOOGLE_APPLICATION_CREDENTIALS.")
            firebase_admin.initialize_app()
except Exception as e:
    # Best effort: report the failure and continue without raising.
    print(f"WARNING: Firebase Admin could not be initialized: {e}")
2026-02-21 09:53:31 -05:00
# Router for every endpoint in this module; all paths are served under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Base directory (relative path) under which uploaded photos are written;
# the handlers below join it with "profiles" or "vehicles".
UPLOAD_DIR = "uploads"
@router.post("/login", response_model=Token)
async def login(
    data: LoginRequest,
    session: Session = Depends(get_session)
):
    """Authenticate a user by email and password and issue an access token.

    The token lifetime is 30 days when ``data.keep_session`` is truthy and
    1 day otherwise.

    Raises:
        HTTPException: 401 with the same detail message whether the email is
            unknown or the password does not match.
    """
    from datetime import timedelta

    print(f"DEBUG: Login attempt for email: {data.email}")
    account = session.exec(select(User).where(User.email == data.email)).first()

    # Work out why (if at all) the attempt must be rejected; both failure
    # modes share a single response so the client cannot tell them apart.
    if not account:
        rejection_log = f"DEBUG: User not found: {data.email}"
    elif not verify_password(data.password, account.hashed_password):
        rejection_log = f"DEBUG: Invalid password for: {data.email}"
    else:
        rejection_log = None

    if rejection_log is not None:
        print(rejection_log)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    print(f"DEBUG: Successful login for: {data.email} as {account.role}")

    # Token expiration can be extended if keep_session is true
    lifetime_days = 30 if data.keep_session else 1
    bearer = create_access_token(
        subject=account.id,
        role=account.role,
        full_name=account.full_name,
        expires_delta=timedelta(days=lifetime_days)
    )
    return {
        "access_token": bearer,
        "token_type": "bearer",
        "role": account.role,
        "full_name": account.full_name,
        "profile_photo_url": account.profile_photo_url
    }
@router.post("/google", response_model=Token)
async def google_login(
    data: GoogleLoginRequest,
    session: Session = Depends(get_session)
):
    """Sign in (or sign up) a user from a Firebase ID token sent by the frontend.

    The token is verified with the Firebase Admin SDK. The account is looked
    up by the token's ``email`` claim; when none exists, a new passenger
    account is created with a random, unusable password. A 30-day access
    token is returned.

    Raises:
        HTTPException: 401 when the token cannot be verified, or when it
            carries no email or an unverified one.

    Errors raised after verification (database failures, token creation) are
    deliberately NOT converted to 401: they propagate so they surface as
    server errors instead of being misreported as an invalid Google token.
    """
    import datetime

    # Only the verification step is guarded; anything that fails here is a
    # credential problem.
    try:
        # Verify the ID token sent by the frontend
        decoded_token = firebase_auth.verify_id_token(data.id_token)
    except Exception as exc:
        print(f"DEBUG: Google Login failed: {exc}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid Google Token: {str(exc)}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    email = decoded_token.get("email")
    full_name = decoded_token.get("name", "")
    profile_photo = decoded_token.get("picture", "")

    # Accounts are matched purely by email, so the claim must be present and
    # verified by the identity provider. Without this, a token lacking an
    # email would query/create a user with email=None, and an unverified
    # email could be used to sign in as an existing account.
    if not email or not decoded_token.get("email_verified", False):
        print("DEBUG: Google Login failed: token has no verified email")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Google Token: missing or unverified email",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Check if user exists
    user = session.exec(select(User).where(User.email == email)).first()
    if not user:
        # Create new user if it doesn't exist (Passenger as default)
        user = User(
            email=email,
            full_name=full_name,
            hashed_password=get_password_hash(str(uuid4())),  # Random pass, won't be used
            role=UserRole.PASSENGER,
            profile_photo_url=profile_photo,
            is_verified=True
        )
        session.add(user)
        session.commit()
        session.refresh(user)
        print(f"DEBUG: Created new user via Google: {email}")

    # Create access token
    expires = datetime.timedelta(days=30)
    access_token = create_access_token(
        subject=user.id,
        role=user.role,
        full_name=user.full_name,
        expires_delta=expires
    )
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "role": user.role,
        "full_name": user.full_name,
        "profile_photo_url": user.profile_photo_url
    }
2026-02-21 09:53:31 -05:00
@router.post("/register/passenger", response_model=UserResponse)
async def register_passenger(
    data: PassengerCreate,
    session: Session = Depends(get_session)
):
    """Create a passenger account from the submitted email, name and password.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    # Reject duplicates before doing any hashing work.
    same_email = select(User).where(User.email == data.email)
    if session.exec(same_email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    passenger = User(
        email=data.email,
        full_name=data.full_name,
        hashed_password=get_password_hash(data.password),
        role=UserRole.PASSENGER
    )
    session.add(passenger)
    session.commit()
    # Reload the row after the commit before handing it to the response model.
    session.refresh(passenger)
    return passenger
def _store_upload(upload: UploadFile, subdir: str) -> str:
    """Write *upload* under UPLOAD_DIR/<subdir> with a random name; return its URL path."""
    target_dir = os.path.join(UPLOAD_DIR, subdir)
    # The directory may not exist yet on a fresh deployment.
    os.makedirs(target_dir, exist_ok=True)
    # Only the extension of the client-supplied name is kept; the filename
    # may be missing, in which case the stored file gets no extension.
    ext = os.path.splitext(upload.filename or "")[1]
    filename = f"{uuid4()}{ext}"
    with open(os.path.join(target_dir, filename), "wb") as buffer:
        shutil.copyfileobj(upload.file, buffer)
    return f"/uploads/{subdir}/{filename}"


@router.post("/register/driver", response_model=UserResponse)
async def register_driver(
    full_name: str = Form(...),
    email: str = Form(...),
    phone_number: str = Form(...),
    password: str = Form(...),
    cedula: str = Form(...),
    vehicle_type: VehicleType = Form(...),
    license_plate: str = Form(...),
    cooperative_name: Optional[str] = Form(None),
    profile_photo: Optional[UploadFile] = File(None),
    vehicle_photo: UploadFile = File(...),
    shift: Optional[str] = Form(None),
    payment_methods: Optional[str] = Form(None),
    speaks_english: bool = Form(False),
    session: Session = Depends(get_session)
):
    """Register a driver: store the photos, then create the User and DriverProfile.

    The user and the driver profile are written in a single transaction, so a
    failure while creating the profile does not leave a driver account
    without a profile.

    Returns:
        The newly created ``User``.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    # NOTE(review): phone_number is a required form field but is not stored
    # anywhere in this handler - confirm whether it should be persisted.

    # Check if user exists
    existing_user = session.exec(select(User).where(User.email == email)).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Save photos (the profile photo is optional, the vehicle photo is required)
    profile_photo_url = _store_upload(profile_photo, "profiles") if profile_photo else None
    vehicle_photo_url = _store_upload(vehicle_photo, "vehicles")

    # Create User
    # NOTE(review): the original comment says drivers are auto-verified
    # because registration is done by an admin, but no authentication
    # dependency is visible on this endpoint - confirm it is protected.
    new_user = User(
        email=email,
        full_name=full_name,
        hashed_password=get_password_hash(password),
        role=UserRole.DRIVER,
        is_verified=True,  # Auto verify since it's admin registered now
        profile_photo_url=profile_photo_url
    )
    session.add(new_user)
    # Flush (not commit) so new_user.id is assigned while the transaction
    # stays open; the profile below is then committed together with the user.
    session.flush()

    # Create Driver Profile
    profile = DriverProfile(
        user_id=new_user.id,
        cedula=cedula,
        vehicle_type=vehicle_type,
        license_plate=license_plate,
        photo_url=profile_photo_url,
        vehicle_photo_url=vehicle_photo_url,
        cooperative_name=cooperative_name,
        shift=shift,
        payment_methods=payment_methods,
        speaks_english=speaks_english
    )
    session.add(profile)
    session.commit()
    session.refresh(new_user)
    return new_user
@router.get("/me")
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Session = Depends(get_session)
):
    """Return the account identified by the bearer token, with driver details if present.

    The user id is read from the ``sub`` entry of the token payload.
    ``driver_profile`` is ``None`` in the response when the user has no
    driver profile.

    Raises:
        HTTPException: 404 when no user matches the id in the token.
    """
    claims = get_token_payload(token)
    account = session.get(User, claims.get("sub"))
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    # Build the optional driver section first so the response can be
    # assembled in a single literal.
    driver = account.driver_profile
    driver_section = None
    if driver:
        driver_section = {
            "cedula": driver.cedula,
            "vehicle_type": driver.vehicle_type,
            "license_plate": driver.license_plate,
            "cooperative_name": driver.cooperative_name,
            "photo_url": driver.photo_url,
            "shift": driver.shift,
            "payment_methods": driver.payment_methods,
            "speaks_english": driver.speaks_english
        }

    return {
        "id": account.id,
        "email": account.email,
        "full_name": account.full_name,
        "role": account.role,
        "is_verified": account.is_verified,
        "profile_photo_url": account.profile_photo_url,
        "driver_profile": driver_section
    }
@router.patch("/me", response_model=UserResponse)
async def update_me(
    full_name: Optional[str] = Form(None),
    password: Optional[str] = Form(None),
    profile_photo: Optional[UploadFile] = File(None),
    token: Annotated[str, Depends(oauth2_scheme)] = None,
    session: Session = Depends(get_session)
):
    """Apply the submitted changes to the authenticated user's own account.

    Each of ``full_name``, ``password`` and ``profile_photo`` is optional and
    is only applied when a truthy value is sent. A new profile photo is
    written under ``UPLOAD_DIR/profiles`` with a random file name.

    Raises:
        HTTPException: 404 when no user matches the id in the token.
    """
    claims = get_token_payload(token)
    account = session.get(User, claims.get("sub"))
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    if full_name:
        account.full_name = full_name

    if password:
        account.hashed_password = get_password_hash(password)

    if profile_photo:
        # Make sure the destination folder exists before writing.
        photos_dir = os.path.join(UPLOAD_DIR, "profiles")
        os.makedirs(photos_dir, exist_ok=True)

        _, suffix = os.path.splitext(profile_photo.filename)
        stored_name = f"{uuid4()}{suffix}"
        with open(os.path.join(photos_dir, stored_name), "wb") as sink:
            shutil.copyfileobj(profile_photo.file, sink)
        account.profile_photo_url = f"/uploads/profiles/{stored_name}"

        # Drivers keep a copy of the photo URL on their profile; keep it in sync.
        linked_driver = account.driver_profile
        if linked_driver:
            linked_driver.photo_url = account.profile_photo_url
            session.add(linked_driver)

    session.add(account)
    session.commit()
    session.refresh(account)
    return account