fix
This commit is contained in:
@@ -76,52 +76,32 @@ def ts_encrypt(input_value: Any, secret: str) -> str:
|
|||||||
return f"RSA{{{b64}}}"
|
return f"RSA{{{b64}}}"
|
||||||
|
|
||||||
|
|
||||||
def ts_decrypt(input_value: Any, secret: str) -> Any:
|
def ts_encrypt(input_value: Any, secret: str) -> str:
|
||||||
"""
|
|
||||||
Compatible with JS TS_decrypt behavior:
|
|
||||||
- If not string: return as-is.
|
|
||||||
- If RSA{...}: decrypt AES(CryptoJS passphrase mode), parse JSON when possible.
|
|
||||||
- If plain string JSON: parse JSON.
|
|
||||||
- Else: return raw string.
|
|
||||||
"""
|
|
||||||
if not isinstance(input_value, str):
|
if not isinstance(input_value, str):
|
||||||
return input_value
|
payload = json.dumps(input_value, separators=(",", ":"), ensure_ascii=False)
|
||||||
|
else:
|
||||||
|
payload = input_value
|
||||||
|
|
||||||
is_wrapped = input_value.startswith("RSA{") and input_value.endswith("}")
|
payload_bytes = payload.encode("utf-8")
|
||||||
if is_wrapped:
|
salt = os.urandom(8)
|
||||||
if not secret:
|
|
||||||
raise TeleSecCryptoError("Secret is required to decrypt RSA payload")
|
|
||||||
b64 = input_value[4:-1]
|
|
||||||
try:
|
|
||||||
raw = base64.b64decode(b64)
|
|
||||||
except Exception as exc:
|
|
||||||
raise TeleSecCryptoError("Invalid base64 payload") from exc
|
|
||||||
|
|
||||||
if len(raw) < 16 or not raw.startswith(b"Salted__"):
|
# OpenSSL EVP_BytesToKey (MD5)
|
||||||
raise TeleSecCryptoError("Unsupported encrypted payload format")
|
dx = b""
|
||||||
|
salted = b""
|
||||||
|
while len(salted) < 48: # 32 key + 16 iv
|
||||||
|
dx = hashlib.md5(dx + secret.encode() + salt).digest()
|
||||||
|
salted += dx
|
||||||
|
|
||||||
salt = raw[8:16]
|
key = salted[:32]
|
||||||
ciphertext = raw[16:]
|
iv = salted[32:48]
|
||||||
key, iv = _evp_bytes_to_key(secret.encode("utf-8"), salt, 32, 16)
|
|
||||||
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
|
|
||||||
decrypted = cipher.decrypt(ciphertext)
|
|
||||||
decrypted = _pkcs7_unpad(decrypted, 16)
|
|
||||||
|
|
||||||
try:
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||||
text = decrypted.decode("utf-8")
|
encrypted = cipher.encrypt(_pkcs7_pad(payload_bytes, 16))
|
||||||
except UnicodeDecodeError:
|
|
||||||
text = decrypted.decode("latin-1")
|
|
||||||
|
|
||||||
try:
|
openssl_blob = b"Salted__" + salt + encrypted
|
||||||
return json.loads(text)
|
b64 = base64.b64encode(openssl_blob).decode("utf-8")
|
||||||
except Exception:
|
|
||||||
return text
|
|
||||||
|
|
||||||
try:
|
|
||||||
return json.loads(input_value)
|
|
||||||
except Exception:
|
|
||||||
return input_value
|
|
||||||
|
|
||||||
|
return f"RSA{{{b64}}}"
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TeleSecDoc:
|
class TeleSecDoc:
|
||||||
|
|||||||
@@ -84,52 +84,32 @@ def ts_encrypt(input_value: Any, secret: str) -> str:
|
|||||||
return f"RSA{{{b64}}}"
|
return f"RSA{{{b64}}}"
|
||||||
|
|
||||||
|
|
||||||
def ts_decrypt(input_value: Any, secret: str) -> Any:
|
def ts_encrypt(input_value: Any, secret: str) -> str:
|
||||||
"""
|
|
||||||
Compatible with JS TS_decrypt behavior:
|
|
||||||
- If not string: return as-is.
|
|
||||||
- If RSA{...}: decrypt AES(CryptoJS passphrase mode), parse JSON when possible.
|
|
||||||
- If plain string JSON: parse JSON.
|
|
||||||
- Else: return raw string.
|
|
||||||
"""
|
|
||||||
if not isinstance(input_value, str):
|
if not isinstance(input_value, str):
|
||||||
return input_value
|
payload = json.dumps(input_value, separators=(",", ":"), ensure_ascii=False)
|
||||||
|
else:
|
||||||
|
payload = input_value
|
||||||
|
|
||||||
is_wrapped = input_value.startswith("RSA{") and input_value.endswith("}")
|
payload_bytes = payload.encode("utf-8")
|
||||||
if is_wrapped:
|
salt = os.urandom(8)
|
||||||
if not secret:
|
|
||||||
raise TeleSecCryptoError("Secret is required to decrypt RSA payload")
|
|
||||||
b64 = input_value[4:-1]
|
|
||||||
try:
|
|
||||||
raw = base64.b64decode(b64)
|
|
||||||
except Exception as exc:
|
|
||||||
raise TeleSecCryptoError("Invalid base64 payload") from exc
|
|
||||||
|
|
||||||
if len(raw) < 16 or not raw.startswith(b"Salted__"):
|
# OpenSSL EVP_BytesToKey (MD5)
|
||||||
raise TeleSecCryptoError("Unsupported encrypted payload format")
|
dx = b""
|
||||||
|
salted = b""
|
||||||
|
while len(salted) < 48: # 32 key + 16 iv
|
||||||
|
dx = hashlib.md5(dx + secret.encode() + salt).digest()
|
||||||
|
salted += dx
|
||||||
|
|
||||||
salt = raw[8:16]
|
key = salted[:32]
|
||||||
ciphertext = raw[16:]
|
iv = salted[32:48]
|
||||||
key, iv = _evp_bytes_to_key(secret.encode("utf-8"), salt, 32, 16)
|
|
||||||
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
|
|
||||||
decrypted = cipher.decrypt(ciphertext)
|
|
||||||
decrypted = _pkcs7_unpad(decrypted, 16)
|
|
||||||
|
|
||||||
try:
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||||
text = decrypted.decode("utf-8")
|
encrypted = cipher.encrypt(_pkcs7_pad(payload_bytes, 16))
|
||||||
except UnicodeDecodeError:
|
|
||||||
text = decrypted.decode("latin-1")
|
|
||||||
|
|
||||||
try:
|
openssl_blob = b"Salted__" + salt + encrypted
|
||||||
return json.loads(text)
|
b64 = base64.b64encode(openssl_blob).decode("utf-8")
|
||||||
except Exception:
|
|
||||||
return text
|
|
||||||
|
|
||||||
try:
|
|
||||||
return json.loads(input_value)
|
|
||||||
except Exception:
|
|
||||||
return input_value
|
|
||||||
|
|
||||||
|
return f"RSA{{{b64}}}"
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TeleSecDoc:
|
class TeleSecDoc:
|
||||||
|
|||||||
Reference in New Issue
Block a user