ソースを参照

new crypto api, blackbox/AEAD. also adds AES256-GCM.

includes:

- aes256-ctr-hmac-sha256 (attic/borg legacy, optional aad support)

- aes256-gcm (new, optional aad support)
  uses 96bits for iv, 128bit for auth tag.

- header support
  the caller-provided header will be just copied in front of the rest -
  this avoids expensive operations (memcpy, garbage collection) in Python.
  the first bytes in the header may be non-authenticated data if aad_offset > 0.
  this is to support legacy attic/borg envelope layout, where the type byte
  is not authenticated.

- aad support
  additional authenticated data - it just contributes to the computed mac,
  but is not encrypted). the current api assumes that aad starts at some
  aad_offset inside the given header and extends to the end of it.

- iv handling helpers, compute next iv based on amount of processed data

- unit tests

Note: the changes are intentionally kept isolated / not integrated into the
      rest of the code, so this has to be done later.
Thomas Waldmann 9 年 前
コミット
67567fc432
2 ファイル変更468 行追加101 行削除
  1. 351 87
      src/borg/crypto/low_level.pyx
  2. 117 14
      src/borg/testsuite/crypto.py

+ 351 - 87
src/borg/crypto/low_level.pyx

@@ -1,4 +1,49 @@
-"""A thin OpenSSL wrapper"""
+"""An AEAD style OpenSSL wrapper
+
+Note: AES-GCM mode needs OpenSSL >= 1.0.1d due to bug fixes in OpenSSL.
+
+API:
+
+    encrypt(data, header=b'', aad_offset=0) -> envelope
+    decrypt(envelope, header_len=0, aad_offset=0) -> data
+
+Envelope layout:
+
+|<--------------------------- envelope ------------------------------------------>|
+|<------------ header ----------->|<---------- ciphersuite specific ------------->|
+|<-- not auth data -->|<-- aad -->|<-- e.g.:  S(aad, iv, E(data)), iv, E(data) -->|
+
+|--- #aad_offset ---->|
+|------------- #header_len ------>|
+
+S means a cryptographic signature function (like HMAC or GMAC).
+E means a encryption function (like AES).
+iv is the initialization vector / nonce, if needed.
+
+The split of header into not authenticated data and aad (additional authenticated
+data) is done to support the legacy envelope layout as used in attic and early borg
+(where the TYPE byte was not authenticated) and avoid unneeded memcpy and string
+garbage.
+
+Newly designed envelope layouts can just authenticate the whole header.
+
+IV handling:
+
+    iv = ...  # just never repeat!
+    cs = CS(hmac_key, enc_key, iv=iv)
+    envelope = cs.encrypt(data, header, aad_offset)
+    iv = cs.next_iv(len(data))
+    (repeat)
+"""
+
+# TODO: get rid of small malloc
+# as @enkore mentioned on github:
+# "Since we do many small-object allocations here it is probably better to use
+# PyMem_Alloc/Free instead of malloc/free (PyMem has many optimizations for
+# small allocs)."
+#
+# Small mallocs currently happen if the total input file length is small, so
+# the 1 chunk's size is less than what the chunker would produce for big files.
 
 import hashlib
 import hmac
@@ -10,6 +55,8 @@ from cpython.bytes cimport PyBytes_FromStringAndSize
 
 API_VERSION = '1.1_02'
 
+cdef extern from "openssl/crypto.h":
+    int CRYPTO_memcmp(const void *a, const void *b, size_t len)
 
 cdef extern from "../algorithms/blake2-libselect.h":
     ctypedef struct blake2b_state:
@@ -29,9 +76,12 @@ cdef extern from "openssl/evp.h":
         pass
     ctypedef struct ENGINE:
         pass
+
     const EVP_CIPHER *EVP_aes_256_ctr()
-    EVP_CIPHER_CTX *EVP_CIPHER_CTX_new()
-    void EVP_CIPHER_CTX_free(EVP_CIPHER_CTX *a)
+    const EVP_CIPHER *EVP_aes_256_gcm()
+
+    void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
+    void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
 
     int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
                            const unsigned char *key, const unsigned char *iv)
@@ -44,10 +94,27 @@ cdef extern from "openssl/evp.h":
     int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
     int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
 
-    EVP_MD *EVP_sha256() nogil
+    int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, void *ptr)
+    int EVP_CTRL_GCM_GET_TAG
+    int EVP_CTRL_GCM_SET_TAG
+    int EVP_CTRL_GCM_SET_IVLEN
 
+    const EVP_MD *EVP_sha256() nogil
+
+    EVP_CIPHER_CTX *EVP_CIPHER_CTX_new()
+    void EVP_CIPHER_CTX_free(EVP_CIPHER_CTX *a)
 
 cdef extern from "openssl/hmac.h":
+    ctypedef struct HMAC_CTX:
+        pass
+
+    void HMAC_CTX_init(HMAC_CTX *ctx)
+    void HMAC_CTX_cleanup(HMAC_CTX *ctx)
+
+    int HMAC_Init_ex(HMAC_CTX *ctx, const void *key, int key_len, const EVP_MD *md, ENGINE *impl)
+    int HMAC_Update(HMAC_CTX *ctx, const unsigned char *data, int len)
+    int HMAC_Final(HMAC_CTX *ctx, unsigned char *md, unsigned int *len)
+
     unsigned char *HMAC(const EVP_MD *evp_md,
                     const void *key, int key_len,
                     const unsigned char *data, int data_len,
@@ -91,114 +158,311 @@ def increment_iv(iv, amount=1):
     return iv
 
 
-def num_aes_blocks(int length):
+def num_aes_blocks(length):
     """Return the number of AES blocks required to encrypt/decrypt *length* bytes of data.
        Note: this is only correct for modes without padding, like AES-CTR.
     """
     return (length + 15) // 16
 
 
+class CryptoError(Exception):
+    """Malfunction in the crypto module."""
+
+
+class IntegrityError(CryptoError):
+    """Integrity checks failed. Corrupted or tampered data."""
+
+
 cdef Py_buffer ro_buffer(object data) except *:
     cdef Py_buffer view
     PyObject_GetBuffer(data, &view, PyBUF_SIMPLE)
     return view
 
 
-cdef class AES:
-    """A thin wrapper around the OpenSSL EVP cipher API
-    """
+cdef class AES256_CTR_HMAC_SHA256:
+    # Layout: HEADER + HMAC 32 + IV 8 + CT (same as attic / borg < 1.2 IF HEADER = TYPE_BYTE, no AAD)
+
     cdef EVP_CIPHER_CTX *ctx
-    cdef int is_encrypt
-    cdef unsigned char iv_orig[16]
+    cdef HMAC_CTX hmac_ctx
+    cdef unsigned char *mac_key
+    cdef unsigned char *enc_key
+    cdef unsigned char iv[16]
     cdef long long blocks
 
-    def __cinit__(self, is_encrypt, key, iv=None):
+    def __init__(self, mac_key, enc_key, iv=None):
+        assert isinstance(mac_key, bytes) and len(mac_key) == 32
+        assert isinstance(enc_key, bytes) and len(enc_key) == 32
+        assert iv is None or isinstance(iv, bytes) and len(iv) == 16
+        self.mac_key = mac_key
+        self.enc_key = enc_key
+        if iv is not None:
+            self.set_iv(iv)
+
+    def __cinit__(self, mac_key, enc_key, iv=None):
         self.ctx = EVP_CIPHER_CTX_new()
-        self.is_encrypt = is_encrypt
-        # Set cipher type and mode
-        cipher_mode = EVP_aes_256_ctr()
-        if self.is_encrypt:
-            if not EVP_EncryptInit_ex(self.ctx, cipher_mode, NULL, NULL, NULL):
-                raise Exception('EVP_EncryptInit_ex failed')
-        else:  # decrypt
-            if not EVP_DecryptInit_ex(self.ctx, cipher_mode, NULL, NULL, NULL):
-                raise Exception('EVP_DecryptInit_ex failed')
-        self.reset(key, iv)
+        HMAC_CTX_init(&self.hmac_ctx) # XXX
 
     def __dealloc__(self):
         EVP_CIPHER_CTX_free(self.ctx)
+        HMAC_CTX_cleanup(&self.hmac_ctx) # XXX
+
+    def encrypt(self, data, header=b'', aad_offset=0):
+        """
+        encrypt data, compute mac over aad + iv + cdata, prepend header.
+        aad_offset is the offset into the header where aad starts.
+        """
+        cdef int ilen = len(data)
+        cdef int hlen = len(header)
+        cdef int aoffset = aad_offset
+        cdef int alen = hlen - aoffset
+        cdef unsigned char *odata = <unsigned char *>malloc(hlen + 32 + 8 + ilen + 16)
+        if not odata:
+            raise MemoryError
+        cdef int olen
+        cdef int offset
+        cdef Py_buffer idata = ro_buffer(data)
+        cdef Py_buffer hdata = ro_buffer(header)
+        try:
+            offset = 0
+            for i in range(hlen):
+                odata[offset+i] = header[i]
+            offset += hlen
+            offset += 32
+            self.store_iv(odata+offset, self.iv)
+            offset += 8
+            rc = EVP_EncryptInit_ex(self.ctx, EVP_aes_256_ctr(), NULL, self.enc_key, self.iv)
+            if not rc:
+                raise CryptoError('EVP_EncryptInit_ex failed')
+            rc = EVP_EncryptUpdate(self.ctx, odata+offset, &olen, <const unsigned char*> idata.buf, ilen)
+            if not rc:
+                raise CryptoError('EVP_EncryptUpdate failed')
+            offset += olen
+            rc = EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen)
+            if not rc:
+                raise CryptoError('EVP_EncryptFinal_ex failed')
+            offset += olen
+            if not HMAC_Init_ex(&self.hmac_ctx, self.mac_key, 32, EVP_sha256(), NULL):
+                raise CryptoError('HMAC_Init_ex failed')
+            if not HMAC_Update(&self.hmac_ctx, <const unsigned char *> hdata.buf+aoffset, alen):
+                raise CryptoError('HMAC_Update failed')
+            if not HMAC_Update(&self.hmac_ctx, odata+hlen+32, offset-hlen-32):
+                raise CryptoError('HMAC_Update failed')
+            if not HMAC_Final(&self.hmac_ctx, odata+hlen, NULL):
+                raise CryptoError('HMAC_Final failed')
+            self.blocks += num_aes_blocks(ilen)
+            return odata[:offset]
+        finally:
+            free(odata)
+            PyBuffer_Release(&hdata)
+            PyBuffer_Release(&idata)
+
+    def decrypt(self, envelope, header_len=0, aad_offset=0):
+        """
+        authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
+        """
+        cdef int ilen = len(envelope)
+        cdef int hlen = header_len
+        cdef int aoffset = aad_offset
+        cdef int alen = hlen - aoffset
+        cdef unsigned char *odata = <unsigned char *>malloc(ilen + 16)
+        if not odata:
+            raise MemoryError
+        cdef int olen
+        cdef int offset
+        cdef unsigned char hmac_buf[32]
+        cdef Py_buffer idata = ro_buffer(envelope)
+        try:
+            if not HMAC_Init_ex(&self.hmac_ctx, self.mac_key, 32, EVP_sha256(), NULL):
+                raise CryptoError('HMAC_Init_ex failed')
+            if not HMAC_Update(&self.hmac_ctx, <const unsigned char *> idata.buf+aoffset, alen):
+                raise CryptoError('HMAC_Update failed')
+            if not HMAC_Update(&self.hmac_ctx, <const unsigned char *> idata.buf+hlen+32, ilen-hlen-32):
+                raise CryptoError('HMAC_Update failed')
+            if not HMAC_Final(&self.hmac_ctx, hmac_buf, NULL):
+                raise CryptoError('HMAC_Final failed')
+            if CRYPTO_memcmp(hmac_buf, idata.buf+hlen, 32):
+                raise IntegrityError('HMAC Authentication failed')
+            iv = self.fetch_iv(<unsigned char *> idata.buf+hlen+32)
+            self.set_iv(iv)
+            if not EVP_DecryptInit_ex(self.ctx, EVP_aes_256_ctr(), NULL, self.enc_key, iv):
+                raise CryptoError('EVP_DecryptInit_ex failed')
+            offset = 0
+            rc = EVP_DecryptUpdate(self.ctx, odata+offset, &olen, <const unsigned char*> idata.buf+hlen+32+8, ilen-hlen-32-8)
+            if not rc:
+                raise CryptoError('EVP_DecryptUpdate failed')
+            offset += olen
+            rc = EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen)
+            if rc <= 0:
+                raise CryptoError('EVP_DecryptFinal_ex failed')
+            offset += olen
+            self.blocks += num_aes_blocks(offset)
+            return odata[:offset]
+        finally:
+            free(odata)
+            PyBuffer_Release(&idata)
+
+    def set_iv(self, iv):
+        self.blocks = 0  # how many AES blocks got encrypted with this IV?
+        for i in range(16):
+            self.iv[i] = iv[i]
+
+    def next_iv(self):
+        return increment_iv(self.iv[:16], self.blocks)
+
+    cdef fetch_iv(self, unsigned char * iv_in):
+        # fetch lower 8 bytes of iv and add upper 8 zero bytes
+        return b"\0" * 8 + iv_in[0:8]
+
+    cdef store_iv(self, unsigned char * iv_out, unsigned char * iv):
+        # store only lower 8 bytes, upper 8 bytes are assumed to be 0
+        cdef int i
+        for i in range(8):
+            iv_out[i] = iv[8+i]
+
+
+cdef class AES256_GCM:
+    # Layout: HEADER + GMAC 16 + IV 12 + CT
 
-    def reset(self, key=None, iv=None):
-        cdef const unsigned char *key2 = NULL
-        cdef const unsigned char *iv2 = NULL
-        if key:
-            key2 = key
-        if iv:
-            iv2 = iv
-            assert isinstance(iv, bytes) and len(iv) == 16
-            for i in range(16):
-                self.iv_orig[i] = iv[i]
-            self.blocks = 0  # number of AES blocks encrypted starting with iv_orig
-        # Initialise key and IV
-        if self.is_encrypt:
-            if not EVP_EncryptInit_ex(self.ctx, NULL, NULL, key2, iv2):
-                raise Exception('EVP_EncryptInit_ex failed')
-        else:  # decrypt
-            if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, key2, iv2):
-                raise Exception('EVP_DecryptInit_ex failed')
-
-    @property
-    def iv(self):
-        return increment_iv(self.iv_orig[:16], self.blocks)
-
-    def encrypt(self, data):
-        cdef Py_buffer data_buf = ro_buffer(data)
-        cdef int inl = len(data)
-        cdef int ctl = 0
-        cdef int outl = 0
-        # note: modes that use padding, need up to one extra AES block (16b)
-        cdef unsigned char *out = <unsigned char *>malloc(inl+16)
-        if not out:
+    cdef EVP_CIPHER_CTX *ctx
+    cdef unsigned char *enc_key
+    cdef unsigned char iv[12]
+    cdef long long blocks
+
+    def __init__(self, mac_key, enc_key, iv=None):
+        assert mac_key is None
+        assert isinstance(enc_key, bytes) and len(enc_key) == 32
+        assert iv is None or isinstance(iv, bytes) and len(iv) == 12
+        self.enc_key = enc_key
+        if iv is not None:
+            self.set_iv(iv)
+
+    def __cinit__(self, mac_key, enc_key, iv=None):
+        self.ctx = EVP_CIPHER_CTX_new()
+
+    def __dealloc__(self):
+        EVP_CIPHER_CTX_free(self.ctx)
+
+    def encrypt(self, data, header=b'', aad_offset=0):
+        """
+        encrypt data, compute mac over aad + iv + cdata, prepend header.
+        aad_offset is the offset into the header where aad starts.
+        """
+        cdef int ilen = len(data)
+        cdef int hlen = len(header)
+        cdef int aoffset = aad_offset
+        cdef int alen = hlen - aoffset
+        cdef unsigned char *odata = <unsigned char *>malloc(hlen + 16 + 12 + ilen + 16)
+        if not odata:
             raise MemoryError
+        cdef int olen
+        cdef int offset
+        cdef Py_buffer idata = ro_buffer(data)
+        cdef Py_buffer hdata = ro_buffer(header)
         try:
-            if not EVP_EncryptUpdate(self.ctx, out, &outl, <const unsigned char*> data_buf.buf, inl):
-                raise Exception('EVP_EncryptUpdate failed')
-            ctl = outl
-            if not EVP_EncryptFinal_ex(self.ctx, out+ctl, &outl):
-                raise Exception('EVP_EncryptFinal failed')
-            ctl += outl
-            self.blocks += num_aes_blocks(ctl)
-            return out[:ctl]
+            offset = 0
+            for i in range(hlen):
+                odata[offset+i] = header[i]
+            offset += hlen
+            offset += 16
+            self.store_iv(odata+offset, self.iv)
+            rc = EVP_EncryptInit_ex(self.ctx, EVP_aes_256_gcm(), NULL, NULL, NULL)
+            if not rc:
+                raise CryptoError('EVP_EncryptInit_ex failed')
+            if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_IVLEN, 12, NULL):
+                raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
+            rc = EVP_EncryptInit_ex(self.ctx, NULL, NULL, self.enc_key, self.iv)
+            if not rc:
+                raise CryptoError('EVP_EncryptInit_ex failed')
+            rc = EVP_EncryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> hdata.buf+aoffset, alen)
+            if not rc:
+                raise CryptoError('EVP_EncryptUpdate failed')
+            if not EVP_EncryptUpdate(self.ctx, NULL, &olen, odata+offset, 12):
+                raise CryptoError('EVP_EncryptUpdate failed')
+            offset += 12
+            rc = EVP_EncryptUpdate(self.ctx, odata+offset, &olen, <const unsigned char*> idata.buf, ilen)
+            if not rc:
+                raise CryptoError('EVP_EncryptUpdate failed')
+            offset += olen
+            rc = EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen)
+            if not rc:
+                raise CryptoError('EVP_EncryptFinal_ex failed')
+            offset += olen
+            if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_GET_TAG, 16, odata+hlen):
+                raise CryptoError('EVP_CIPHER_CTX_ctrl GET TAG failed')
+            self.blocks += num_aes_blocks(ilen)
+            return odata[:offset]
         finally:
-            free(out)
-            PyBuffer_Release(&data_buf)
-
-    def decrypt(self, data):
-        cdef Py_buffer data_buf = ro_buffer(data)
-        cdef int inl = len(data)
-        cdef int ptl = 0
-        cdef int outl = 0
-        # note: modes that use padding, need up to one extra AES block (16b).
-        # This is what the openssl docs say. I am not sure this is correct,
-        # but OTOH it will not cause any harm if our buffer is a little bigger.
-        cdef unsigned char *out = <unsigned char *>malloc(inl+16)
-        if not out:
+            free(odata)
+            PyBuffer_Release(&hdata)
+            PyBuffer_Release(&idata)
+
+    def decrypt(self, envelope, header_len=0, aad_offset=0):
+        """
+        authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
+        """
+        cdef int ilen = len(envelope)
+        cdef int hlen = header_len
+        cdef int aoffset = aad_offset
+        cdef int alen = hlen - aoffset
+        cdef unsigned char *odata = <unsigned char *>malloc(ilen + 16)
+        if not odata:
             raise MemoryError
+        cdef int olen
+        cdef int offset
+        cdef Py_buffer idata = ro_buffer(envelope)
         try:
-            if not EVP_DecryptUpdate(self.ctx, out, &outl, <const unsigned char*> data_buf.buf, inl):
-                raise Exception('EVP_DecryptUpdate failed')
-            ptl = outl
-            if EVP_DecryptFinal_ex(self.ctx, out+ptl, &outl) <= 0:
-                # this error check is very important for modes with padding or
-                # authentication. for them, a failure here means corrupted data.
-                # CTR mode does not use padding nor authentication.
-                raise Exception('EVP_DecryptFinal failed')
-            ptl += outl
-            self.blocks += num_aes_blocks(inl)
-            return out[:ptl]
+            if not EVP_DecryptInit_ex(self.ctx, EVP_aes_256_gcm(), NULL, NULL, NULL):
+                raise CryptoError('EVP_DecryptInit_ex failed')
+            iv = self.fetch_iv(<unsigned char *> idata.buf+hlen+16)
+            self.set_iv(iv)
+            if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_IVLEN, 12, NULL):
+                raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
+            if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, self.enc_key, iv):
+                raise CryptoError('EVP_DecryptInit_ex failed')
+            if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_TAG, 16, <void *> idata.buf+hlen):
+                raise CryptoError('EVP_CIPHER_CTX_ctrl SET TAG failed')
+            rc = EVP_DecryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> idata.buf+aoffset, alen)
+            if not rc:
+                raise CryptoError('EVP_DecryptUpdate failed')
+            if not EVP_DecryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> idata.buf+hlen+16, 12):
+                raise CryptoError('EVP_DecryptUpdate failed')
+            offset = 0
+            rc = EVP_DecryptUpdate(self.ctx, odata+offset, &olen, <const unsigned char*> idata.buf+hlen+16+12, ilen-hlen-16-12)
+            if not rc:
+                raise CryptoError('EVP_DecryptUpdate failed')
+            offset += olen
+            rc = EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen)
+            if rc <= 0:
+                # a failure here means corrupted or tampered tag (mac) or data.
+                raise IntegrityError('GCM Authentication / EVP_DecryptFinal_ex failed')
+            offset += olen
+            self.blocks += num_aes_blocks(offset)
+            return odata[:offset]
         finally:
-            free(out)
-            PyBuffer_Release(&data_buf)
+            free(odata)
+            PyBuffer_Release(&idata)
+
+    def set_iv(self, iv):
+        self.blocks = 0  # number of AES blocks encrypted with this IV
+        for i in range(12):
+            self.iv[i] = iv[i]
+
+    def next_iv(self):
+        assert self.blocks < 2**32
+        # we need 16 bytes for increment_iv:
+        last_iv = b'\0\0\0\0' + self.iv[:12]
+        # gcm mode is special: it appends a internal 32bit counter to the 96bit (12 byte) we provide, thus we only
+        # need to increment the 96bit counter by 1 (and we must not encrypt more than 2^32 AES blocks with same IV):
+        next_iv = increment_iv(last_iv, 1)
+        return next_iv[-12:]
+
+    cdef fetch_iv(self, unsigned char * iv_in):
+        return iv_in[0:12]
+
+    cdef store_iv(self, unsigned char * iv_out, unsigned char * iv):
+        cdef int i
+        for i in range(12):
+            iv_out[i] = iv[i]
 
 
 def hmac_sha256(key, data):
@@ -210,7 +474,7 @@ def hmac_sha256(key, data):
         with nogil:
             rc = HMAC(EVP_sha256(), key_ptr, key_len, <const unsigned char*> data_buf.buf, data_buf.len, md, NULL)
         if rc != md:
-            raise Exception('HMAC(EVP_sha256) failed')
+            raise CryptoError('HMAC(EVP_sha256) failed')
     finally:
         PyBuffer_Release(&data_buf)
     return PyBytes_FromStringAndSize(<char*> &md[0], 32)

+ 117 - 14
src/borg/testsuite/crypto.py

@@ -1,8 +1,9 @@
 from binascii import hexlify, unhexlify
 
-from ..crypto.low_level import AES, bytes_to_long, bytes_to_int, long_to_bytes, hmac_sha256, blake2b_256
-from ..crypto.low_level import increment_iv, bytes16_to_int, int_to_bytes16
+from ..crypto.low_level import AES256_CTR_HMAC_SHA256, AES256_GCM, IntegrityError, hmac_sha256, blake2b_256
+from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes, bytes16_to_int, int_to_bytes16, increment_iv
 from ..crypto.low_level import hkdf_hmac_sha512
+
 from . import BaseTestCase
 
 # Note: these tests are part of the self test, do not use or import py.test functionality here.
@@ -39,21 +40,123 @@ class CryptoTestCase(BaseTestCase):
         self.assert_equal(increment_iv(iva, 2), ivc)
         self.assert_equal(increment_iv(iv0, 2**64), ivb)
 
-    def test_aes(self):
-        key = b'X' * 32
+    def test_AES256_CTR_HMAC_SHA256(self):
+        # this tests the layout as in attic / borg < 1.2 (1 type byte, no aad)
+        mac_key = b'Y' * 32
+        enc_key = b'X' * 32
+        iv = b'\0' * 16
+        data = b'foo' * 10
+        header = b'\x42'
+        # encrypt-then-mac
+        cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv)
+        hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
+        hdr = hdr_mac_iv_cdata[0:1]
+        mac = hdr_mac_iv_cdata[1:33]
+        iv = hdr_mac_iv_cdata[33:41]
+        cdata = hdr_mac_iv_cdata[41:]
+        self.assert_equal(hexlify(hdr), b'42')
+        self.assert_equal(hexlify(mac), b'af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8')
+        self.assert_equal(hexlify(iv), b'0000000000000000')
+        self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
+        self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
+        # auth-then-decrypt
+        cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
+        pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
+        self.assert_equal(data, pdata)
+        self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
+        # auth-failure due to corruption (corrupted data)
+        cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
+        hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:41] + b'\0' + hdr_mac_iv_cdata[42:]
+        self.assert_raises(IntegrityError,
+                           lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
+
+    def test_AES256_CTR_HMAC_SHA256_aad(self):
+        mac_key = b'Y' * 32
+        enc_key = b'X' * 32
+        iv = b'\0' * 16
         data = b'foo' * 10
-        # encrypt
-        aes = AES(is_encrypt=True, key=key)
-        self.assert_equal(bytes_to_long(aes.iv, 8), 0)
-        cdata = aes.encrypt(data)
+        header = b'\x12\x34\x56'
+        # encrypt-then-mac
+        cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv)
+        hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
+        hdr = hdr_mac_iv_cdata[0:3]
+        mac = hdr_mac_iv_cdata[3:35]
+        iv = hdr_mac_iv_cdata[35:43]
+        cdata = hdr_mac_iv_cdata[43:]
+        self.assert_equal(hexlify(hdr), b'123456')
+        self.assert_equal(hexlify(mac), b'7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138')
+        self.assert_equal(hexlify(iv), b'0000000000000000')
         self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
-        self.assert_equal(bytes_to_long(aes.iv, 8), 2)
-        # decrypt
-        aes = AES(is_encrypt=False, key=key)
-        self.assert_equal(bytes_to_long(aes.iv, 8), 0)
-        pdata = aes.decrypt(cdata)
+        self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
+        # auth-then-decrypt
+        cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
+        pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
+        self.assert_equal(data, pdata)
+        self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
+        # auth-failure due to corruption (corrupted aad)
+        cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
+        hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
+        self.assert_raises(IntegrityError,
+                           lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
+
+    def test_AES_GCM_256_GMAC(self):
+        # gcm used in legacy-like layout (1 type byte, no aad)
+        mac_key = None
+        enc_key = b'X' * 32
+        iv = b'\0' * 12
+        data = b'foo' * 10
+        header = b'\x23'
+        # encrypt-then-mac
+        cs = AES256_GCM(mac_key, enc_key, iv)
+        hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
+        hdr = hdr_mac_iv_cdata[0:1]
+        mac = hdr_mac_iv_cdata[1:17]
+        iv = hdr_mac_iv_cdata[17:29]
+        cdata = hdr_mac_iv_cdata[29:]
+        self.assert_equal(hexlify(hdr), b'23')
+        self.assert_equal(hexlify(mac), b'66a438843aa41a087d6a7ed1dc1f3c4c')
+        self.assert_equal(hexlify(iv), b'000000000000000000000000')
+        self.assert_equal(hexlify(cdata), b'5bbb40be14e4bcbfc75715b77b1242d590d2bf9f7f8a8a910b4469888689')
+        self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
+        # auth-then-decrypt
+        cs = AES256_GCM(mac_key, enc_key)
+        pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
+        self.assert_equal(data, pdata)
+        self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
+        # auth-failure due to corruption (corrupted data)
+        cs = AES256_GCM(mac_key, enc_key)
+        hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:29] + b'\0' + hdr_mac_iv_cdata[30:]
+        self.assert_raises(IntegrityError,
+                           lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
+
+    def test_AES_GCM_256_GMAC_aad(self):
+        mac_key = None
+        enc_key = b'X' * 32
+        iv = b'\0' * 12
+        data = b'foo' * 10
+        header = b'\x12\x34\x56'
+        # encrypt-then-mac
+        cs = AES256_GCM(mac_key, enc_key, iv)
+        hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
+        hdr = hdr_mac_iv_cdata[0:3]
+        mac = hdr_mac_iv_cdata[3:19]
+        iv = hdr_mac_iv_cdata[19:31]
+        cdata = hdr_mac_iv_cdata[31:]
+        self.assert_equal(hexlify(hdr), b'123456')
+        self.assert_equal(hexlify(mac), b'4fb0e5b0a0bca57527352cc6240e7cca')
+        self.assert_equal(hexlify(iv), b'000000000000000000000000')
+        self.assert_equal(hexlify(cdata), b'5bbb40be14e4bcbfc75715b77b1242d590d2bf9f7f8a8a910b4469888689')
+        self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
+        # auth-then-decrypt
+        cs = AES256_GCM(mac_key, enc_key)
+        pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
         self.assert_equal(data, pdata)
-        self.assert_equal(bytes_to_long(aes.iv, 8), 2)
+        self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
+        # auth-failure due to corruption (corrupted aad)
+        cs = AES256_GCM(mac_key, enc_key)
+        hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
+        self.assert_raises(IntegrityError,
+                           lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
 
     def test_hmac_sha256(self):
         # RFC 4231 test vectors