Files
TeleSec/python_sdk/windows_agent.py

263 lines
8.5 KiB
Python

import argparse
import ctypes
import json
import os
import socket
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional
import psutil
try:
from telesec_couchdb import TeleSecCouchDB, TeleSecCouchDBError, ts_decrypt
except ImportError:
from .telesec_couchdb import TeleSecCouchDB, TeleSecCouchDBError, ts_decrypt
def utcnow_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
def parse_iso(value: str) -> Optional[datetime]:
if not value:
return None
try:
v = value.strip().replace("Z", "+00:00")
dt = datetime.fromisoformat(v)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except Exception:
return None
def get_current_username() -> str:
try:
return psutil.Process().username() or os.getlogin()
except Exception:
try:
return os.getlogin()
except Exception:
return os.environ.get("USERNAME", "")
def _window_title(hwnd: int) -> str:
buf_len = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
if buf_len <= 0:
return ""
buf = ctypes.create_unicode_buffer(buf_len + 1)
ctypes.windll.user32.GetWindowTextW(hwnd, buf, buf_len + 1)
return buf.value or ""
def get_active_app() -> Dict[str, str]:
exe = ""
title = ""
try:
hwnd = ctypes.windll.user32.GetForegroundWindow()
if hwnd:
title = _window_title(hwnd)
pid = ctypes.c_ulong()
ctypes.windll.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
if pid.value:
try:
proc = psutil.Process(pid.value)
exe = proc.name() or ""
except Exception:
exe = ""
except Exception:
pass
return {"exe": exe, "title": title}
def build_payload(machine_id: str) -> Dict[str, Any]:
app = get_active_app()
return {
"Hostname": machine_id,
"UsuarioActual": get_current_username(),
"AppActualEjecutable": app.get("exe", ""),
"AppActualTitulo": app.get("title", ""),
# campo local diagnóstico (no se usa para decisión remota)
"AgentLocalSeenAt": utcnow_iso(),
}
def should_shutdown(data: Dict[str, Any], server_now: datetime) -> bool:
target = parse_iso(str(data.get("ShutdownBeforeDate", "") or ""))
if not target:
return False
return server_now >= target
def execute_shutdown(dry_run: bool = False) -> None:
if dry_run:
print("[DRY-RUN] Ejecutaría: shutdown /s /t 0 /f")
return
subprocess.run(["shutdown", "/s", "/t", "0", "/f"], check=False)
def run_once(client: TeleSecCouchDB, machine_id: str, dry_run: bool = False) -> None:
server_now = client.get_server_datetime()
server_now_iso = server_now.isoformat(timespec="milliseconds").replace("+00:00", "Z")
raw = client.get(table="aulas_ordenadores", item_id=machine_id, decrypt=False)
current: Dict[str, Any] = {}
if raw is not None:
current = ts_decrypt(raw, client.secret)
if not isinstance(current, dict):
current = {}
update = build_payload(machine_id)
update["LastSeenAt"] = server_now_iso
for key in ["ShutdownBeforeDate", "ShutdownRequestedAt", "ShutdownRequestedBy"]:
if key in current:
update[key] = current.get(key)
client.put(table="aulas_ordenadores", item_id=machine_id, data=update, encrypt=True)
if should_shutdown(update, server_now):
print(f"[{server_now_iso}] ShutdownBeforeDate alcanzado. Apagando {machine_id}...")
execute_shutdown(dry_run=dry_run)
else:
print(
f"[{server_now_iso}] Reportado {machine_id} user={update.get('UsuarioActual','')} "
f"exe={update.get('AppActualEjecutable','')} title={update.get('AppActualTitulo','')}"
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="TeleSec Windows Agent")
parser.add_argument("--server", default="", help="CouchDB server URL, ej. https://couch.example")
parser.add_argument("--db", default="telesec", help="Database name")
parser.add_argument("--user", default="", help="CouchDB username")
parser.add_argument("--password", default="", help="CouchDB password")
parser.add_argument("--secret", default="", help="TeleSec secret para cifrado")
parser.add_argument("--machine-id", default="", help="ID de máquina (default: hostname)")
parser.add_argument("--interval", type=int, default=15, help="Intervalo en segundos")
parser.add_argument("--once", action="store_true", help="Ejecutar una sola iteración")
parser.add_argument("--dry-run", action="store_true", help="No apagar realmente, solo log")
parser.add_argument(
"--config",
default="",
help="Ruta de config JSON (default: ~/.telesec/windows_agent.json)",
)
return parser.parse_args()
def _default_config_path() -> str:
return os.path.join(os.path.expanduser("~"), ".telesec", "windows_agent.json")
def _load_or_init_config(path: str) -> Dict[str, Any]:
if not os.path.exists(path):
os.makedirs(os.path.dirname(path), exist_ok=True)
default_cfg = {
"server": "https://tu-couchdb",
"db": "telesec",
"user": "",
"password": "",
"secret": "",
"machine_id": "",
"interval": 15,
}
with open(path, "w", encoding="utf-8") as f:
json.dump(default_cfg, f, ensure_ascii=False, indent=2)
return default_cfg
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
return data
return {}
def _save_config(path: str, data: Dict[str, Any]) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _pick(cli_value: Any, cfg_value: Any, default_value: Any = None) -> Any:
if cli_value is None:
return cfg_value if cfg_value not in [None, ""] else default_value
if isinstance(cli_value, str):
if cli_value.strip() == "":
return cfg_value if cfg_value not in [None, ""] else default_value
return cli_value
return cli_value
def main() -> int:
args = parse_args()
config_path = args.config or _default_config_path()
try:
cfg = _load_or_init_config(config_path)
except Exception as exc:
print(f"No se pudo cargar/crear config en {config_path}: {exc}", file=sys.stderr)
return 3
server = _pick(args.server, cfg.get("server"), "")
db = _pick(args.db, cfg.get("db"), "telesec")
user = _pick(args.user, cfg.get("user"), "")
password = _pick(args.password, cfg.get("password"), "")
secret = _pick(args.secret, cfg.get("secret"), "")
machine_id = _pick(args.machine_id, cfg.get("machine_id"), "")
interval = _pick(args.interval, cfg.get("interval"), 15)
machine_id = (machine_id or socket.gethostname() or "unknown-host").strip()
if not server or not secret:
print(
"Falta configuración obligatoria. Edita el JSON en: " + config_path,
file=sys.stderr,
)
return 4
# Persist effective parameters for next runs
try:
persistent_cfg = {
"server": server,
"db": db,
"user": user,
"password": password,
"secret": secret,
"machine_id": machine_id,
"interval": int(interval),
}
_save_config(config_path, persistent_cfg)
except Exception as exc:
print(f"No se pudo guardar config en {config_path}: {exc}", file=sys.stderr)
client = TeleSecCouchDB(
server_url=server,
dbname=db,
secret=secret,
username=user or None,
password=password or None,
)
try:
client.check_connection()
except TeleSecCouchDBError as exc:
print(f"Error de conexión CouchDB: {exc}", file=sys.stderr)
return 2
if args.once:
run_once(client=client, machine_id=machine_id, dry_run=args.dry_run)
return 0
while True:
try:
run_once(client=client, machine_id=machine_id, dry_run=args.dry_run)
except Exception as exc:
print(f"Error en iteración agente: {exc}", file=sys.stderr)
time.sleep(max(5, int(interval)))
if __name__ == "__main__":
raise SystemExit(main())