Files
TeleSec/python_sdk/telesec_couchdb.py
Naiel f655a736b3 fix
2026-03-03 10:59:02 +01:00

274 lines
9.2 KiB
Python

import base64
import email.utils
import hashlib
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import quote
import requests
from Crypto.Cipher import AES
class TeleSecCryptoError(Exception):
    """Raised for invalid cryptographic input, e.g. malformed PKCS#7 padding."""
class TeleSecCouchDBError(Exception):
    """Raised when a CouchDB request fails or answers with an error status."""
def _pkcs7_pad(data: bytes, block_size: int = 16) -> bytes:
pad_len = block_size - (len(data) % block_size)
return data + bytes([pad_len]) * pad_len
def _pkcs7_unpad(data: bytes, block_size: int = 16) -> bytes:
if not data or len(data) % block_size != 0:
raise TeleSecCryptoError("Invalid padded data length")
pad_len = data[-1]
if pad_len < 1 or pad_len > block_size:
raise TeleSecCryptoError("Invalid PKCS7 padding")
if data[-pad_len:] != bytes([pad_len]) * pad_len:
raise TeleSecCryptoError("Invalid PKCS7 padding bytes")
return data[:-pad_len]
def _evp_bytes_to_key(passphrase: bytes, salt: bytes, key_len: int, iv_len: int) -> tuple[bytes, bytes]:
d = b""
prev = b""
while len(d) < key_len + iv_len:
prev = hashlib.md5(prev + passphrase + salt).digest()
d += prev
return d[:key_len], d[key_len : key_len + iv_len]
def _json_dumps_like_js(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def ts_encrypt(input_value: Any, secret: str) -> str:
    """
    Encrypt a value for storage in a TeleSec document.

    Compatible with JS: CryptoJS.AES.encrypt(payload, secret).toString()
    wrapped as RSA{<ciphertext>}.

    The payload is encrypted with AES-256-CBC; key and IV come from
    _evp_bytes_to_key(secret, random 8-byte salt). The output is
    base64("Salted__" + salt + ciphertext) inside the "RSA{...}" marker
    (the marker is only a tag; the cipher used here is AES).

    Args:
        input_value: A string (used as-is) or any other value, which is
            serialized to compact JSON first.
        secret: Passphrase. When None or empty, nothing is encrypted and the
            plain string / compact JSON text is returned instead.

    Returns:
        "RSA{<base64>}" when a secret is given, otherwise the plain payload.

    NOTE: this module previously defined ts_encrypt twice; the second
    definition shadowed this one, dropped the empty-secret passthrough and
    crashed on secret=None. This single definition replaces both.
    """
    if secret is None or secret == "":
        if isinstance(input_value, str):
            return input_value
        return _json_dumps_like_js(input_value)

    if isinstance(input_value, str):
        payload = input_value
    else:
        try:
            payload = _json_dumps_like_js(input_value)
        except (TypeError, ValueError, RecursionError):
            # Deliberate best-effort: values JSON cannot represent are
            # stored as their string form rather than failing the write.
            payload = str(input_value)

    salt = os.urandom(8)
    key, iv = _evp_bytes_to_key(secret.encode("utf-8"), salt, 32, 16)
    cipher = AES.new(key, AES.MODE_CBC, iv=iv)
    encrypted = cipher.encrypt(_pkcs7_pad(payload.encode("utf-8"), 16))

    # OpenSSL "salted" container: magic, salt, then the ciphertext.
    openssl_blob = b"Salted__" + salt + encrypted
    b64 = base64.b64encode(openssl_blob).decode("utf-8")
    return f"RSA{{{b64}}}"
@dataclass
class TeleSecDoc:
    """One row produced by TeleSecCouchDB.list()."""

    # Item id: the part of the CouchDB row id after the first ":" (the whole
    # row id when it contains no ":").
    id: str
    # The document's "data" field, passed through ts_decrypt when requested.
    data: Any
    # The document as returned in the row ({} when the row carried no doc).
    raw: Dict[str, Any]
class TeleSecCouchDB:
    """
    Direct CouchDB client for TeleSec docs (_id = "<table>:<id>").
    No local replication layer.

    Requests are sent to "<server_url>/<dbname>" through a requests session.
    When a secret is configured, put() stores values encrypted with
    ts_encrypt, and get()/list() pass stored values through ts_decrypt by
    default.
    """
def __init__(
self,
server_url: str,
dbname: str,
secret: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
timeout: int = 30,
session: Optional[requests.Session] = None,
) -> None:
self.server_url = server_url.rstrip("/")
self.dbname = dbname
self.secret = secret or ""
self.timeout = timeout
self.base_url = f"{self.server_url}/{quote(self.dbname, safe='')}"
self.session = session or requests.Session()
self.session.headers.update({"Accept": "application/json"})
if username is not None:
self.session.auth = (username, password or "")
def _iso_now(self) -> str:
return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
def _doc_id(self, table: str, item_id: str) -> str:
return f"{table}:{item_id}"
def _request(self, method: str, path: str = "", **kwargs) -> requests.Response:
url = self.base_url if not path else f"{self.base_url}/{path.lstrip('/')}"
kwargs.setdefault("timeout", self.timeout)
res = self.session.request(method=method, url=url, **kwargs)
return res
def get_server_datetime(self) -> datetime:
    """
    Returns server datetime using HTTP Date header from CouchDB.
    Avoids reliance on local machine clock.

    Probes, in order, HEAD and GET on the database URL and then HEAD and GET
    on the server root, and uses the first response whose Date header parses.

    Returns:
        A timezone-aware datetime in UTC. A parsed value without timezone
        information is taken to be UTC.

    Raises:
        TeleSecCouchDBError: if no probe yields a usable Date header. The
            last underlying exception (if any) is chained as __cause__ so
            the real failure is not lost.
    """
    candidates = (
        ("HEAD", self.base_url),
        ("GET", self.base_url),
        ("HEAD", self.server_url),
        ("GET", self.server_url),
    )
    last_error: Optional[Exception] = None
    for method, url in candidates:
        try:
            res = self.session.request(method=method, url=url, timeout=self.timeout)
            date_header = res.headers.get("Date")
            if not date_header:
                continue
            dt = email.utils.parsedate_to_datetime(date_header)
            if dt is None:
                continue
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.astimezone(timezone.utc)
        except Exception as exc:
            # Best-effort probing: remember why this candidate failed and
            # fall through to the next one instead of aborting.
            last_error = exc
            continue
    raise TeleSecCouchDBError(
        "Unable to retrieve server time from CouchDB Date header"
    ) from last_error
def iso_from_server_plus_minutes(self, minutes: int = 0) -> str:
    """Return the server time shifted by `minutes`, as ISO-8601 UTC with milliseconds and "Z"."""
    server_now = self.get_server_datetime()
    epoch = server_now.timestamp() + (minutes * 60)
    shifted = datetime.fromtimestamp(epoch, tz=timezone.utc)
    text = shifted.isoformat(timespec="milliseconds")
    return text.replace("+00:00", "Z")
def check_connection(self) -> Dict[str, Any]:
    """GET the database URL and return the decoded JSON body.

    Raises:
        TeleSecCouchDBError: if the response status is 400 or above.
    """
    response = self._request("GET")
    if response.status_code < 400:
        return response.json()
    raise TeleSecCouchDBError(f"CouchDB connection failed: {response.status_code} {response.text}")
def get_raw(self, doc_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a document by its full id.

    Returns:
        The decoded document, or None when the server answers 404.

    Raises:
        TeleSecCouchDBError: for any other status of 400 or above.
    """
    response = self._request("GET", quote(doc_id, safe=""))
    if response.status_code == 404:
        return None
    if response.status_code < 400:
        return response.json()
    raise TeleSecCouchDBError(f"GET doc failed: {response.status_code} {response.text}")
def put_raw(self, doc: Dict[str, Any]) -> Dict[str, Any]:
    """PUT a complete document, addressed by its "_id", and return the JSON reply.

    Raises:
        ValueError: if doc has no "_id" key.
        TeleSecCouchDBError: if the response status is 400 or above.
    """
    if "_id" not in doc:
        raise ValueError("Document must include _id")

    target = quote(doc["_id"], safe="")
    body = _json_dumps_like_js(doc).encode("utf-8")
    response = self._request(
        "PUT",
        target,
        headers={"Content-Type": "application/json"},
        data=body,
    )
    if response.status_code < 400:
        return response.json()
    raise TeleSecCouchDBError(f"PUT doc failed: {response.status_code} {response.text}")
def delete_raw(self, doc_id: str) -> bool:
    """Delete a document by its full id, using its current revision.

    Returns:
        False when the document does not exist, True after a successful delete.

    Raises:
        TeleSecCouchDBError: if the DELETE response status is 400 or above.
    """
    current = self.get_raw(doc_id)
    if not current:
        return False

    # The revision just read is sent as the "rev" query parameter.
    target = "{}?rev={}".format(quote(doc_id, safe=""), quote(current["_rev"], safe=""))
    response = self._request("DELETE", target)
    if response.status_code >= 400:
        raise TeleSecCouchDBError(f"DELETE doc failed: {response.status_code} {response.text}")
    return True
def put(self, table: str, item_id: str, data: Any, encrypt: bool = True) -> Dict[str, Any]:
    """Create or update the document "<table>:<item_id>".

    Passing data=None deletes the document instead. Otherwise the existing
    document (if any) is reused so its other fields are kept, and its
    "data", "table" and "ts" fields are overwritten. The value is encrypted
    with ts_encrypt when encrypt is true, a secret is configured, and the
    value is not already an "RSA{...}" string.

    Returns:
        The JSON reply of the PUT, or {"ok": True, "id": ..., "deleted": True}
        for the delete case.
    """
    doc_id = self._doc_id(table, item_id)
    if data is None:
        self.delete_raw(doc_id)
        return {"ok": True, "id": doc_id, "deleted": True}

    doc: Dict[str, Any] = self.get_raw(doc_id) or {"_id": doc_id}

    already_wrapped = isinstance(data, str) and data.startswith("RSA{") and data.endswith("}")
    if encrypt and self.secret and not already_wrapped:
        doc["data"] = ts_encrypt(data, self.secret)
    else:
        doc["data"] = data
    doc["table"] = table
    doc["ts"] = self._iso_now()
    return self.put_raw(doc)
def get(self, table: str, item_id: str, decrypt: bool = True) -> Optional[Any]:
    """Return the "data" field of document "<table>:<item_id>".

    Returns None when the document does not exist. With decrypt true the
    stored value is passed through ts_decrypt with the configured secret;
    otherwise it is returned exactly as stored.
    """
    doc = self.get_raw(self._doc_id(table, item_id))
    if not doc:
        return None
    stored = doc.get("data")
    return ts_decrypt(stored, self.secret) if decrypt else stored
def delete(self, table: str, item_id: str) -> bool:
    """Delete document "<table>:<item_id>"; returns what delete_raw returns."""
    doc_id = self._doc_id(table, item_id)
    return self.delete_raw(doc_id)
def list(self, table: str, decrypt: bool = True) -> List[TeleSecDoc]:
    """Return every document whose id starts with "<table>:".

    Queries _all_docs with include_docs over the key range
    ["<table>:", "<table>:\\uffff"].

    Args:
        table: Table name (the id prefix before the first ":").
        decrypt: When true, each "data" value is passed through ts_decrypt
            with the configured secret; otherwise it is returned as stored.

    Returns:
        One TeleSecDoc per row, with id set to the part of the row id after
        the first ":".

    Raises:
        TeleSecCouchDBError: if the response status is 400 or above.
    """
    prefix = f"{table}:"
    params = {
        "include_docs": "true",
        # startkey/endkey must be JSON-encoded strings. json.dumps escapes
        # quotes and backslashes in the table name, which hand-built
        # '"..."' quoting would turn into malformed JSON.
        "startkey": json.dumps(prefix, ensure_ascii=False),
        "endkey": json.dumps(prefix + "\uffff", ensure_ascii=False),
    }
    res = self._request("GET", "_all_docs", params=params)
    if res.status_code >= 400:
        raise TeleSecCouchDBError(f"LIST docs failed: {res.status_code} {res.text}")

    out: List[TeleSecDoc] = []
    for row in res.json().get("rows", []):
        doc = row.get("doc") or {}
        row_id = row.get("id", "")
        # Keep everything after the first ":"; ids without one are used whole.
        _, sep, rest = row_id.partition(":")
        item_id = rest if sep else row_id
        value = doc.get("data")
        if decrypt:
            value = ts_decrypt(value, self.secret)
        out.append(TeleSecDoc(id=item_id, data=value, raw=doc))
    return out