3 Commits

Author SHA1 Message Date
Naiel
f655a736b3 fix 2026-03-03 10:59:02 +01:00
Naiel
89a68f27da fix 2026-03-03 10:50:05 +01:00
Naiel
4d322e5696 fix 2026-03-03 10:36:42 +01:00
2 changed files with 40 additions and 80 deletions

View File

@@ -76,52 +76,32 @@ def ts_encrypt(input_value: Any, secret: str) -> str:
return f"RSA{{{b64}}}"
def ts_decrypt(input_value: Any, secret: str) -> Any:
"""
Compatible with JS TS_decrypt behavior:
- If not string: return as-is.
- If RSA{...}: decrypt AES(CryptoJS passphrase mode), parse JSON when possible.
- If plain string JSON: parse JSON.
- Else: return raw string.
"""
def ts_encrypt(input_value: Any, secret: str) -> str:
if not isinstance(input_value, str):
return input_value
payload = json.dumps(input_value, separators=(",", ":"), ensure_ascii=False)
else:
payload = input_value
is_wrapped = input_value.startswith("RSA{") and input_value.endswith("}")
if is_wrapped:
if not secret:
raise TeleSecCryptoError("Secret is required to decrypt RSA payload")
b64 = input_value[4:-1]
try:
raw = base64.b64decode(b64)
except Exception as exc:
raise TeleSecCryptoError("Invalid base64 payload") from exc
payload_bytes = payload.encode("utf-8")
salt = os.urandom(8)
if len(raw) < 16 or not raw.startswith(b"Salted__"):
raise TeleSecCryptoError("Unsupported encrypted payload format")
# OpenSSL EVP_BytesToKey (MD5)
dx = b""
salted = b""
while len(salted) < 48: # 32 key + 16 iv
dx = hashlib.md5(dx + secret.encode() + salt).digest()
salted += dx
salt = raw[8:16]
ciphertext = raw[16:]
key, iv = _evp_bytes_to_key(secret.encode("utf-8"), salt, 32, 16)
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted = cipher.decrypt(ciphertext)
decrypted = _pkcs7_unpad(decrypted, 16)
key = salted[:32]
iv = salted[32:48]
try:
text = decrypted.decode("utf-8")
except UnicodeDecodeError:
text = decrypted.decode("latin-1")
cipher = AES.new(key, AES.MODE_CBC, iv)
encrypted = cipher.encrypt(_pkcs7_pad(payload_bytes, 16))
try:
return json.loads(text)
except Exception:
return text
try:
return json.loads(input_value)
except Exception:
return input_value
openssl_blob = b"Salted__" + salt + encrypted
b64 = base64.b64encode(openssl_blob).decode("utf-8")
return f"RSA{{{b64}}}"
@dataclass
class TeleSecDoc:

View File

@@ -7,7 +7,7 @@ import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
import psutil
import base64
@@ -84,52 +84,32 @@ def ts_encrypt(input_value: Any, secret: str) -> str:
return f"RSA{{{b64}}}"
def ts_decrypt(input_value: Any, secret: str) -> Any:
"""
Compatible with JS TS_decrypt behavior:
- If not string: return as-is.
- If RSA{...}: decrypt AES(CryptoJS passphrase mode), parse JSON when possible.
- If plain string JSON: parse JSON.
- Else: return raw string.
"""
def ts_encrypt(input_value: Any, secret: str) -> str:
if not isinstance(input_value, str):
return input_value
payload = json.dumps(input_value, separators=(",", ":"), ensure_ascii=False)
else:
payload = input_value
is_wrapped = input_value.startswith("RSA{") and input_value.endswith("}")
if is_wrapped:
if not secret:
raise TeleSecCryptoError("Secret is required to decrypt RSA payload")
b64 = input_value[4:-1]
try:
raw = base64.b64decode(b64)
except Exception as exc:
raise TeleSecCryptoError("Invalid base64 payload") from exc
payload_bytes = payload.encode("utf-8")
salt = os.urandom(8)
if len(raw) < 16 or not raw.startswith(b"Salted__"):
raise TeleSecCryptoError("Unsupported encrypted payload format")
# OpenSSL EVP_BytesToKey (MD5)
dx = b""
salted = b""
while len(salted) < 48: # 32 key + 16 iv
dx = hashlib.md5(dx + secret.encode() + salt).digest()
salted += dx
salt = raw[8:16]
ciphertext = raw[16:]
key, iv = _evp_bytes_to_key(secret.encode("utf-8"), salt, 32, 16)
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted = cipher.decrypt(ciphertext)
decrypted = _pkcs7_unpad(decrypted, 16)
key = salted[:32]
iv = salted[32:48]
try:
text = decrypted.decode("utf-8")
except UnicodeDecodeError:
text = decrypted.decode("latin-1")
cipher = AES.new(key, AES.MODE_CBC, iv)
encrypted = cipher.encrypt(_pkcs7_pad(payload_bytes, 16))
try:
return json.loads(text)
except Exception:
return text
try:
return json.loads(input_value)
except Exception:
return input_value
openssl_blob = b"Salted__" + salt + encrypted
b64 = base64.b64encode(openssl_blob).decode("utf-8")
return f"RSA{{{b64}}}"
@dataclass
class TeleSecDoc:
@@ -415,7 +395,7 @@ def run_once(client: TeleSecCouchDB, machine_id: str, dry_run: bool = False) ->
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="TeleSec Windows Agent")
parser.add_argument("--server", default="", help="CouchDB server URL, ej. https://couch.example")
parser.add_argument("--db", default="telesec", help="Database name")
parser.add_argument("--db", default="", help="Database name")
parser.add_argument("--user", default="", help="CouchDB username")
parser.add_argument("--password", default="", help="CouchDB password")
parser.add_argument("--secret", default="", help="TeleSec secret para cifrado")