"""TeleSec Windows agent.

Periodically reports the current user and foreground application of this
machine to the ``aulas_ordenadores`` table through ``TeleSecCouchDB`` and
powers the machine off once the stored ``ShutdownBeforeDate`` has been
reached according to the server clock.
"""

import argparse
import ctypes
import json
import os
import socket
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import psutil

try:
    from .telesec_couchdb import TeleSecCouchDB, TeleSecCouchDBError, ts_decrypt
except ImportError:
    from telesec_couchdb import TeleSecCouchDB, TeleSecCouchDBError, ts_decrypt


def utcnow_iso() -> str:
    """Return the current UTC time as ISO-8601 with milliseconds and a ``Z`` suffix."""
    return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")


def parse_iso(value: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp into a timezone-aware UTC ``datetime``.

    A trailing ``Z`` is accepted, and a timestamp without an offset is taken
    to be UTC. Returns ``None`` for an empty or unparseable value.
    """
    if not value:
        return None
    try:
        v = value.strip().replace("Z", "+00:00")
        dt = datetime.fromisoformat(v)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        # Deliberately lenient: any malformed value means "no usable date".
        return None


def get_current_username() -> str:
    """Return the name of the user running this process (best effort).

    Tries ``psutil``, then ``os.getlogin()``, then the ``USERNAME``
    environment variable; returns ``""`` if none of them yields a name.
    """
    try:
        return psutil.Process().username() or os.getlogin()
    except Exception:
        try:
            return os.getlogin()
        except Exception:
            return os.environ.get("USERNAME", "")


def _window_title(hwnd: int) -> str:
    """Return the title text of window ``hwnd``, or ``""`` when it has none."""
    buf_len = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
    if buf_len <= 0:
        return ""
    # One extra slot for the terminating NUL character.
    buf = ctypes.create_unicode_buffer(buf_len + 1)
    ctypes.windll.user32.GetWindowTextW(hwnd, buf, buf_len + 1)
    return buf.value or ""


def get_active_app() -> Dict[str, str]:
    """Describe the foreground window.

    Returns:
        A dict with ``"exe"`` (process name owning the foreground window)
        and ``"title"`` (window title). Either value is ``""`` when it
        cannot be determined.
    """
    exe = ""
    title = ""
    try:
        hwnd = ctypes.windll.user32.GetForegroundWindow()
        if hwnd:
            title = _window_title(hwnd)
            pid = ctypes.c_ulong()
            ctypes.windll.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
            if pid.value:
                try:
                    proc = psutil.Process(pid.value)
                    exe = proc.name() or ""
                except Exception:
                    exe = ""
    except Exception:
        # Best effort: any failure while querying user32 (including
        # ``ctypes.windll`` being unavailable) leaves the fields blank.
        pass
    return {"exe": exe, "title": title}


def build_payload(machine_id: str) -> Dict[str, Any]:
    """Build the status fields reported for ``machine_id``."""
    app = get_active_app()
    return {
        "Hostname": machine_id,
        "UsuarioActual": get_current_username(),
        "AppActualEjecutable": app.get("exe", ""),
        "AppActualTitulo": app.get("title", ""),
        # Local diagnostic field (not used for the remote decision).
        "AgentLocalSeenAt": utcnow_iso(),
    }


def should_shutdown(data: Dict[str, Any], server_now: datetime) -> bool:
    """Return ``True`` when ``data["ShutdownBeforeDate"]`` is set and ``server_now`` has reached it.

    A missing, empty or unparseable date never triggers a shutdown.
    """
    target = parse_iso(str(data.get("ShutdownBeforeDate", "") or ""))
    if not target:
        return False
    return server_now >= target


def execute_shutdown(dry_run: bool = False) -> None:
    """Power the machine off immediately; with ``dry_run`` only print the command."""
    if dry_run:
        print("[DRY-RUN] Ejecutaría: shutdown /s /t 0 /f")
        return
    subprocess.run(["shutdown", "/s", "/t", "0", "/f"], check=False)


def run_once(client: TeleSecCouchDB, machine_id: str, dry_run: bool = False) -> None:
    """Run one agent iteration: report status, then shut down if requested.

    Args:
        client: Connected TeleSec CouchDB client.
        machine_id: Item id of this machine in ``aulas_ordenadores``.
        dry_run: Forwarded to ``execute_shutdown``.
    """
    server_now = client.get_server_datetime()
    server_now_iso = server_now.isoformat(timespec="milliseconds").replace("+00:00", "Z")

    raw = client.get(table="aulas_ordenadores", item_id=machine_id, decrypt=False)
    current: Dict[str, Any] = {}
    if raw is not None:
        current = ts_decrypt(raw, client.secret)
        if not isinstance(current, dict):
            current = {}

    update = build_payload(machine_id)
    update["LastSeenAt"] = server_now_iso

    # Carry the shutdown request over from the stored item so that writing
    # the fresh status does not erase it.
    # NOTE(review): other stored fields are not copied; whether they survive
    # depends on how ``client.put`` writes the item - confirm.
    for key in ("ShutdownBeforeDate", "ShutdownRequestedAt", "ShutdownRequestedBy"):
        if key in current:
            update[key] = current.get(key)

    client.put(table="aulas_ordenadores", item_id=machine_id, data=update, encrypt=True)

    if should_shutdown(update, server_now):
        print(f"[{server_now_iso}] ShutdownBeforeDate alcanzado. Apagando {machine_id}...")
        execute_shutdown(dry_run=dry_run)
    else:
        print(
            f"[{server_now_iso}] Reportado {machine_id} user={update.get('UsuarioActual','')} "
            f"exe={update.get('AppActualEjecutable','')} title={update.get('AppActualTitulo','')}"
        )


def parse_args() -> argparse.Namespace:
    """Parse the command-line options.

    Value options default to "unset" (``""``, or ``None`` for
    ``--interval``) so that ``main`` can fall back to the JSON config and
    only then to the built-in defaults (db ``telesec``, interval 15).
    """
    parser = argparse.ArgumentParser(description="TeleSec Windows Agent")
    parser.add_argument("--server", default="", help="CouchDB server URL, ej. https://couch.example")
    # No concrete default here: a default of "telesec" would always win over
    # the value stored in the config file.
    parser.add_argument("--db", default="", help="Database name")
    parser.add_argument("--user", default="", help="CouchDB username")
    parser.add_argument("--password", default="", help="CouchDB password")
    parser.add_argument("--secret", default="", help="TeleSec secret para cifrado")
    parser.add_argument("--machine-id", default="", help="ID de máquina (default: hostname)")
    # Same reasoning as --db: None means "not given on the command line".
    parser.add_argument("--interval", type=int, default=None, help="Intervalo en segundos")
    parser.add_argument("--once", action="store_true", help="Ejecutar una sola iteración")
    parser.add_argument("--dry-run", action="store_true", help="No apagar realmente, solo log")
    parser.add_argument(
        "--config",
        default="",
        help="Ruta de config JSON (default: ~/.telesec/windows_agent.json)",
    )
    return parser.parse_args()


def _default_config_path() -> str:
    """Return ``~/.telesec/windows_agent.json`` with ``~`` expanded."""
    return os.path.join(os.path.expanduser("~"), ".telesec", "windows_agent.json")


def _ensure_parent_dir(path: str) -> None:
    """Create the directory that will contain ``path`` if it is missing."""
    parent = os.path.dirname(path)
    # A bare filename has no directory part and os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _load_or_init_config(path: str) -> Dict[str, Any]:
    """Load the JSON config at ``path``, creating a template if it is missing.

    Returns:
        The parsed config, the freshly written template, or ``{}`` when the
        file does not contain a JSON object.

    Raises:
        OSError: If the file or its directory cannot be read or created.
        ValueError: If the file is not valid JSON.
    """
    if not os.path.exists(path):
        _ensure_parent_dir(path)
        default_cfg = {
            "server": "https://tu-couchdb",
            "db": "telesec",
            "user": "",
            "password": "",
            "secret": "",
            "machine_id": "",
            "interval": 15,
        }
        with open(path, "w", encoding="utf-8") as f:
            json.dump(default_cfg, f, ensure_ascii=False, indent=2)
        return default_cfg

    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, dict):
        return data
    return {}


def _save_config(path: str, data: Dict[str, Any]) -> None:
    """Write ``data`` to ``path`` as indented UTF-8 JSON, creating its directory."""
    _ensure_parent_dir(path)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def _pick(cli_value: Any, cfg_value: Any, default_value: Any = None) -> Any:
    """Resolve a setting with precedence CLI > config file > default.

    ``cli_value`` counts as unset when it is ``None`` or a blank string;
    ``cfg_value`` counts as unset when it is ``None`` or ``""``.
    """
    cli_unset = cli_value is None or (isinstance(cli_value, str) and cli_value.strip() == "")
    if not cli_unset:
        return cli_value
    return cfg_value if cfg_value not in [None, ""] else default_value


def _coerce_interval(value: Any, fallback: int = 15) -> int:
    """Convert ``value`` to ``int``; warn on stderr and use ``fallback`` if impossible."""
    try:
        return int(value)
    except (TypeError, ValueError):
        print(f"Intervalo inválido {value!r}; usando {fallback}s", file=sys.stderr)
        return fallback


def main() -> int:
    """Entry point of the agent.

    Returns:
        ``0`` after a successful ``--once`` run, ``2`` when the CouchDB
        connection check fails, ``3`` when the config cannot be loaded or
        created, ``4`` when ``server`` or ``secret`` is missing. Without
        ``--once`` the function loops forever.
    """
    args = parse_args()
    config_path = args.config or _default_config_path()

    try:
        cfg = _load_or_init_config(config_path)
    except Exception as exc:
        print(f"No se pudo cargar/crear config en {config_path}: {exc}", file=sys.stderr)
        return 3

    server = _pick(args.server, cfg.get("server"), "")
    db = _pick(args.db, cfg.get("db"), "telesec")
    user = _pick(args.user, cfg.get("user"), "")
    password = _pick(args.password, cfg.get("password"), "")
    secret = _pick(args.secret, cfg.get("secret"), "")
    machine_id = _pick(args.machine_id, cfg.get("machine_id"), "")
    # Validate once so a bad config value cannot break the polling loop.
    interval = _coerce_interval(_pick(args.interval, cfg.get("interval"), 15))

    # Strip before falling back so a whitespace-only id does not end up as
    # an empty document id.
    machine_id = (
        str(machine_id or "").strip()
        or (socket.gethostname() or "").strip()
        or "unknown-host"
    )

    if not server or not secret:
        print(
            "Falta configuración obligatoria. Edita el JSON en: " + config_path,
            file=sys.stderr,
        )
        return 4

    # Persist effective parameters for next runs.
    # NOTE: this stores the password and secret in the JSON file as plain text.
    try:
        persistent_cfg = {
            "server": server,
            "db": db,
            "user": user,
            "password": password,
            "secret": secret,
            "machine_id": machine_id,
            "interval": interval,
        }
        _save_config(config_path, persistent_cfg)
    except Exception as exc:
        # Not fatal: the agent can still run with the in-memory settings.
        print(f"No se pudo guardar config en {config_path}: {exc}", file=sys.stderr)

    client = TeleSecCouchDB(
        server_url=server,
        dbname=db,
        secret=secret,
        username=user or None,
        password=password or None,
    )

    try:
        client.check_connection()
    except TeleSecCouchDBError as exc:
        print(f"Error de conexión CouchDB: {exc}", file=sys.stderr)
        return 2

    if args.once:
        run_once(client=client, machine_id=machine_id, dry_run=args.dry_run)
        return 0

    while True:
        try:
            run_once(client=client, machine_id=machine_id, dry_run=args.dry_run)
        except Exception as exc:
            # Keep the agent alive across transient failures.
            print(f"Error en iteración agente: {exc}", file=sys.stderr)
        # Never poll more often than every 5 seconds.
        time.sleep(max(5, interval))


if __name__ == "__main__":
    raise SystemExit(main())