import base64

from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Hash import SHA256
from Crypto.Protocol.KDF import HKDF

# Passed to HKDF as its ``context`` argument for every key derivation.
HKDF_INFO = b"espilon-c2-v1"


class CryptoContext:
    """AEAD helper bound to a single device.

    On construction the 32-byte master key is run through HKDF (SHA256 hash
    module, ``device_id`` bytes as salt, ``HKDF_INFO`` as context) and the
    resulting 32-byte key is kept in ``derived_key`` for all later
    encrypt/decrypt calls.
    """

    def __init__(self, master_key: bytes, device_id: str):
        """Derive and store the per-device key.

        Args:
            master_key: Key material; must be exactly 32 bytes long.
            device_id: Device identifier; its encoded form is the HKDF salt.

        Raises:
            ValueError: If ``master_key`` is not 32 bytes long.
        """
        if len(master_key) != 32:
            raise ValueError(f"master_key must be 32 bytes, got {len(master_key)}")

        salt = device_id.encode()
        self.derived_key = HKDF(
            master=master_key,
            key_len=32,
            salt=salt,
            hashmod=SHA256,
            context=HKDF_INFO,
        )

    # =========================
    # ChaCha20-Poly1305 AEAD
    # =========================
    def encrypt(self, data: bytes) -> bytes:
        """Seal ``data`` and return ``nonce[12] || ciphertext || tag[16]``.

        No nonce is passed to the cipher constructor, so the nonce that ends
        up in the output is whatever the library assigned to the new cipher
        object (exposed as ``.nonce``).
        """
        aead = ChaCha20_Poly1305.new(key=self.derived_key)
        ciphertext, mac = aead.encrypt_and_digest(data)
        return b"".join((aead.nonce, ciphertext, mac))

    def decrypt(self, data: bytes) -> bytes:
        """Open a payload laid out as ``nonce[12] || ciphertext || tag[16]``.

        Returns:
            The plaintext produced by ``decrypt_and_verify``.

        Raises:
            ValueError: If ``data`` is shorter than 28 bytes (nonce + tag),
                or on authentication failure.
        """
        nonce_len = 12
        tag_len = 16
        if len(data) < nonce_len + tag_len:
            raise ValueError(f"Encrypted payload too short ({len(data)} bytes)")

        # Peel the nonce off the front, then split the remainder into the
        # ciphertext and the trailing tag.
        nonce, sealed = data[:nonce_len], data[nonce_len:]
        ciphertext, mac = sealed[:-tag_len], sealed[-tag_len:]

        aead = ChaCha20_Poly1305.new(key=self.derived_key, nonce=nonce)
        return aead.decrypt_and_verify(ciphertext, mac)

    # =========================
    # Base64
    # =========================
    @staticmethod
    def b64_encode(data: bytes) -> bytes:
        """Return the standard Base64 encoding of ``data``."""
        encoded = base64.b64encode(data)
        return encoded

    @staticmethod
    def b64_decode(data: bytes) -> bytes:
        """Return the bytes obtained by Base64-decoding ``data``."""
        decoded = base64.b64decode(data)
        return decoded