Files
Axia4/django_app/core/auth.py
2026-03-16 12:43:36 +00:00

302 lines
9.7 KiB
Python

import hashlib
import secrets
from dataclasses import dataclass
from typing import Optional
import bcrypt
from django.conf import settings
from django.contrib.auth.hashers import check_password, identify_hasher, make_password
from django.http import HttpRequest
from django.utils import timezone
from .models import Organization, User, UserOrg, UserSession
SESSION_USER_KEY = "axia4.user"
SESSION_ACTIVE_ORG_KEY = "axia4.active_org"
@dataclass
class AxiaUser:
record: User
active_org: str
active_org_name: str
organizations: list[dict]
permissions: list[str]
org_permissions: list[str]
permissions_display: str
aulas: list[str]
@property
def username(self) -> str:
return self.record.username
@property
def display_name(self) -> str:
return self.record.display_name or self.record.username
@property
def email(self) -> str:
return self.record.email
@property
def initials(self) -> str:
parts = [part for part in self.display_name.split() if part]
first = parts[0][0] if parts else "?"
second = parts[1][0] if len(parts) > 1 else ""
return (first + second).upper()
@property
def google_auth(self) -> bool:
return bool(self.record.google_auth)
def has_permission(self, permission: str) -> bool:
return permission in self.permissions
@property
def role(self) -> str:
# Alias legacy para plantillas/codigo existente.
return self.permissions_display
def _session_token(request: HttpRequest) -> Optional[str]:
session_key = request.session.session_key
if not session_key:
return None
return hashlib.sha256(session_key.encode("utf-8")).hexdigest()
def _remember_hash(raw_token: str) -> str:
return hashlib.sha256(raw_token.encode("utf-8")).hexdigest()
def _normalize_password_hash(password_hash: str) -> bytes:
normalized = password_hash or ""
if normalized.startswith("$2y$") or normalized.startswith("$2a$"):
normalized = "$2b$" + normalized[4:]
return normalized.encode("utf-8")
def _is_django_password_hash(password_hash: str) -> bool:
try:
identify_hasher(password_hash)
return True
except Exception:
return False
def get_user(identifier: str) -> Optional[User]:
identifier = (identifier or "").strip().lower()
if not identifier:
return None
if "@" in identifier:
return User.objects.filter(email__iexact=identifier).first()
return User.objects.filter(username__iexact=identifier).first()
def verify_password(user: User, raw_password: str) -> bool:
if not user or not user.password_hash:
return False
if _is_django_password_hash(user.password_hash):
return check_password(raw_password, user.password_hash)
try:
valid = bcrypt.checkpw(raw_password.encode("utf-8"), _normalize_password_hash(user.password_hash))
except ValueError:
return False
if valid:
user.password_hash = make_password(raw_password)
user.save(update_fields=["password_hash"])
return valid
def authenticate(identifier: str, raw_password: str) -> Optional[User]:
user = get_user(identifier)
if user and verify_password(user, raw_password):
return user
return None
def _organization_links(user: User) -> list[UserOrg]:
return list(
UserOrg.objects.filter(user=user)
.select_related("org")
.order_by("org__org_id")
)
def build_axia_user(user: User, request: Optional[HttpRequest] = None) -> AxiaUser:
links = _organization_links(user)
global_permissions = [str(value) for value in user.permissions_data if isinstance(value, str)]
organizations = [
{
"id": link.org.org_id,
"name": link.org.org_name or link.org.org_id,
"permissions": link.permissions_display,
"permissions_values": link.permissions_values,
"aulas": link.aulas,
}
for link in links
]
session_active = request.session.get(SESSION_ACTIVE_ORG_KEY) if request else None
org_ids = [org["id"] for org in organizations]
active_org = session_active if session_active in org_ids else (org_ids[0] if org_ids else "")
active_link = next((org for org in organizations if org["id"] == active_org), None)
active_org_permissions = [str(value) for value in (active_link.get("permissions_values", []) if active_link else []) if isinstance(value, str)]
merged_permissions = []
seen = set()
for value in global_permissions + active_org_permissions:
if value and value not in seen:
merged_permissions.append(value)
seen.add(value)
return AxiaUser(
record=user,
active_org=active_org,
active_org_name=active_link["name"] if active_link else active_org,
organizations=organizations,
permissions=merged_permissions,
org_permissions=active_org_permissions,
permissions_display=active_link["permissions"] if active_link else "",
aulas=active_link["aulas"] if active_link else [],
)
def _touch_session(token: Optional[str]) -> None:
if token:
UserSession.objects.filter(session_token=token).update(last_active=timezone.now())
def load_user_from_session(request: HttpRequest) -> Optional[AxiaUser]:
username = request.session.get(SESSION_USER_KEY)
if not username:
return None
token = _session_token(request)
if not token:
return None
valid = UserSession.objects.filter(session_token=token, username__iexact=username).exists()
if not valid:
request.session.flush()
request._axia_clear_auth_cookie = True
return None
user = get_user(username)
if not user:
request.session.flush()
request._axia_clear_auth_cookie = True
return None
_touch_session(token)
return build_axia_user(user, request)
def restore_user_from_cookie(request: HttpRequest) -> Optional[AxiaUser]:
raw_token = request.COOKIES.get(settings.AXIA4_AUTH_COOKIE)
if not raw_token:
return None
remember_hash = _remember_hash(raw_token)
session_row = UserSession.objects.filter(remember_token_hash=remember_hash).first()
if not session_row:
request._axia_clear_auth_cookie = True
return None
user = get_user(session_row.username)
if not user:
request._axia_clear_auth_cookie = True
return None
request.session.cycle_key()
request.session[SESSION_USER_KEY] = user.username
request.session.setdefault(SESSION_ACTIVE_ORG_KEY, request.session.get(SESSION_ACTIVE_ORG_KEY, ""))
request.session.save()
new_token = _session_token(request)
if new_token:
session_row.session_token = new_token
session_row.last_active = timezone.now()
session_row.save(update_fields=["session_token", "last_active"])
return build_axia_user(user, request)
def login_user(request: HttpRequest, user: User) -> AxiaUser:
request.session.flush()
request.session.cycle_key()
request.session[SESSION_USER_KEY] = user.username
axia_user = build_axia_user(user, request)
request.session[SESSION_ACTIVE_ORG_KEY] = axia_user.active_org
request.session.save()
token = _session_token(request)
raw_remember = secrets.token_hex(32)
remember_hash = _remember_hash(raw_remember)
if token:
ip_address = (request.META.get("HTTP_X_FORWARDED_FOR") or request.META.get("REMOTE_ADDR") or "").split(",")[0].strip()
user_agent = (request.META.get("HTTP_USER_AGENT") or "")[:512]
UserSession.objects.update_or_create(
session_token=token,
defaults={
"username": user.username.lower(),
"ip_address": ip_address,
"user_agent": user_agent,
"remember_token_hash": remember_hash,
},
)
request._axia_auth_cookie_value = raw_remember
return build_axia_user(user, request)
def logout_user(request: HttpRequest) -> None:
token = _session_token(request)
if token:
UserSession.objects.filter(session_token=token).delete()
request.session.flush()
request._axia_clear_auth_cookie = True
def set_active_organization(request: HttpRequest, organization_id: str) -> None:
request.session[SESSION_ACTIVE_ORG_KEY] = organization_id
def parse_user_agent(user_agent: str) -> dict:
ua = user_agent or ""
operating_system = "Desconocido"
if "Android" in ua:
operating_system = "Android"
elif "iPhone" in ua or "iPad" in ua:
operating_system = "iOS"
elif "Windows" in ua:
operating_system = "Windows"
elif "Macintosh" in ua or "Mac OS" in ua:
operating_system = "macOS"
elif "Linux" in ua:
operating_system = "Linux"
elif "CrOS" in ua:
operating_system = "ChromeOS"
browser = "Desconocido"
if ua.startswith("Axia4Auth/"):
browser = "Axia4 App"
elif "Edg/" in ua:
browser = "Edge"
elif "OPR/" in ua or "Opera" in ua:
browser = "Opera"
elif "Chrome" in ua:
browser = "Chrome"
elif "Firefox" in ua:
browser = "Firefox"
elif "Safari" in ua:
browser = "Safari"
icons = {
"Chrome": "🌐",
"Firefox": "🦊",
"Safari": "🧭",
"Edge": "🔷",
"Opera": "🔴",
"Axia4 App": "📱",
}
return {
"browser": browser,
"os": operating_system,
"icon": icons.get(browser, "💻"),
}
def list_user_sessions(username: str):
return UserSession.objects.filter(username__iexact=username).order_by("-last_active")
def organization_name_map() -> dict:
return {org.org_id: (org.org_name or org.org_id) for org in Organization.objects.all()}