浏览代码

reintegrate AEAD cipher made from AES CTR + HMAC-SHA256

Thomas Waldmann 10 年之前
父节点
当前提交
6aca9383d7
共有 3 个文件被更改,包括 82 次插入25 次删除
  1. 28 12
      attic/crypto.pyx
  2. 36 9
      attic/key.py
  3. 18 4
      attic/testsuite/crypto.py

+ 28 - 12
attic/crypto.pyx

@@ -7,6 +7,9 @@ from libc.stdlib cimport malloc, free
 
 
 API_VERSION = 2
 API_VERSION = 2
 
 
+AES_CTR_MODE = 1
+AES_GCM_MODE = 2
+
 TAG_SIZE = 16  # bytes; 128 bits is the maximum allowed value. see "hack" below.
 TAG_SIZE = 16  # bytes; 128 bits is the maximum allowed value. see "hack" below.
 IV_SIZE = 16  # bytes; 128 bits
 IV_SIZE = 16  # bytes; 128 bits
 
 
@@ -25,6 +28,7 @@ cdef extern from "openssl/evp.h":
     ctypedef struct ENGINE:
     ctypedef struct ENGINE:
         pass
         pass
     const EVP_MD *EVP_sha256()
     const EVP_MD *EVP_sha256()
+    const EVP_CIPHER *EVP_aes_256_ctr()
     const EVP_CIPHER *EVP_aes_256_gcm()
     const EVP_CIPHER *EVP_aes_256_gcm()
     void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
     void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
     void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
     void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
@@ -99,12 +103,19 @@ cdef class AES:
     """
     """
     cdef EVP_CIPHER_CTX ctx
     cdef EVP_CIPHER_CTX ctx
     cdef int is_encrypt
     cdef int is_encrypt
+    cdef int mode
 
 
-    def __cinit__(self, is_encrypt, key, iv=None):
+    def __cinit__(self, mode, is_encrypt, key, iv=None):
         EVP_CIPHER_CTX_init(&self.ctx)
         EVP_CIPHER_CTX_init(&self.ctx)
+        self.mode = mode
         self.is_encrypt = is_encrypt
         self.is_encrypt = is_encrypt
         # Set cipher type and mode
         # Set cipher type and mode
-        cipher_mode = EVP_aes_256_gcm()
+        if mode == AES_CTR_MODE:
+            cipher_mode = EVP_aes_256_ctr()
+        elif mode == AES_GCM_MODE:
+            cipher_mode = EVP_aes_256_gcm()
+        else:
+            raise Exception('unknown mode')
         if self.is_encrypt:
         if self.is_encrypt:
             if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
             if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
                 raise Exception('EVP_EncryptInit_ex failed')
                 raise Exception('EVP_EncryptInit_ex failed')
@@ -123,9 +134,10 @@ cdef class AES:
             key2 = key
             key2 = key
         if iv:
         if iv:
             iv2 = iv
             iv2 = iv
-        # Set IV length (bytes)
-        if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_IVLEN, IV_SIZE, NULL):
-            raise Exception('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
+        if self.mode == AES_GCM_MODE:
+            # Set IV length (bytes)
+            if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_IVLEN, IV_SIZE, NULL):
+                raise Exception('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
         # Initialise key and IV
         # Initialise key and IV
         if self.is_encrypt:
         if self.is_encrypt:
             if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
             if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
@@ -137,6 +149,8 @@ cdef class AES:
     def add(self, aad):
     def add(self, aad):
         cdef int aadl = len(aad)
         cdef int aadl = len(aad)
         cdef int outl
         cdef int outl
+        if self.mode != AES_GCM_MODE:
+            raise Exception('additional data only supported for AES GCM mode')
         # Zero or more calls to specify any AAD
         # Zero or more calls to specify any AAD
         if self.is_encrypt:
         if self.is_encrypt:
             if not EVP_EncryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
             if not EVP_EncryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
@@ -161,9 +175,10 @@ cdef class AES:
             if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
             if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
                 raise Exception('EVP_EncryptFinal failed')
                 raise Exception('EVP_EncryptFinal failed')
             ctl += outl
             ctl += outl
-            # Get tag
-            if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_GET_TAG, TAG_SIZE, tag):
-                raise Exception('EVP_CIPHER_CTX_ctrl GET TAG failed')
+            if self.mode == AES_GCM_MODE:
+                # Get tag (only GCM mode. for CTR, the returned tag is undefined)
+                if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_GET_TAG, TAG_SIZE, tag):
+                    raise Exception('EVP_CIPHER_CTX_ctrl GET TAG failed')
             # hack: caller wants 32B tags (256b), so we give back that amount
             # hack: caller wants 32B tags (256b), so we give back that amount
             return (tag[:TAG_SIZE] + b'\x00'*16), out[:ctl]
             return (tag[:TAG_SIZE] + b'\x00'*16), out[:ctl]
         finally:
         finally:
@@ -184,11 +199,12 @@ cdef class AES:
             if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl):
             if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl):
                 raise Exception('EVP_DecryptUpdate failed')
                 raise Exception('EVP_DecryptUpdate failed')
             ptl = outl
             ptl = outl
-            # Set expected tag value.
-            if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_TAG, TAG_SIZE, tag):
-                raise Exception('EVP_CIPHER_CTX_ctrl SET TAG failed')
+            if self.mode == AES_GCM_MODE:
+                # Set expected tag value.
+                if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_TAG, TAG_SIZE, tag):
+                    raise Exception('EVP_CIPHER_CTX_ctrl SET TAG failed')
             if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
             if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
-                # a failure here means corrupted / tampered tag or data
+                # for GCM mode, a failure here means corrupted / tampered tag or data
                 raise Exception('EVP_DecryptFinal failed')
                 raise Exception('EVP_DecryptFinal failed')
             ptl += outl
             ptl += outl
             return out[:ptl]
             return out[:ptl]

+ 36 - 9
attic/key.py

@@ -16,7 +16,8 @@ except ImportError:
     except ImportError:
     except ImportError:
         lzma = None
         lzma = None
 
 
-from attic.crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks
+from attic.crypto import pbkdf2_sha256, get_random_bytes, AES, AES_CTR_MODE, AES_GCM_MODE, \
+    bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks
 from attic.helpers import IntegrityError, get_keys_dir, Error
 from attic.helpers import IntegrityError, get_keys_dir, Error
 
 
 # we do not store the full IV on disk, as the upper 8 bytes are expected to be
 # we do not store the full IV on disk, as the upper 8 bytes are expected to be
@@ -124,7 +125,7 @@ class GMAC:
         self.data = data
         self.data = data
 
 
     def digest(self):
     def digest(self):
-        mac_cipher = AES(is_encrypt=True, key=self.key, iv=b'\0' * 16)
+        mac_cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=self.key, iv=b'\0' * 16)
         # GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data
         # GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data
         mac_cipher.add(bytes(self.data))
         mac_cipher.add(bytes(self.data))
         tag, _ = mac_cipher.compute_tag_and_encrypt(b'')
         tag, _ = mac_cipher.compute_tag_and_encrypt(b'')
@@ -187,7 +188,30 @@ class PLAIN:
 
 
 class AES_CTR_HMAC:
 class AES_CTR_HMAC:
     TYPE = 1
     TYPE = 1
-    # TODO
+
+    def __init__(self, enc_key=b'\0' * 32, enc_iv=b'\0' * 16, enc_hmac_key=b'\0' * 32, **kw):
+        self.hmac_key = enc_hmac_key
+        self.enc_iv = enc_iv
+        self.enc_cipher = AES(mode=AES_CTR_MODE, is_encrypt=True, key=enc_key, iv=enc_iv)
+        self.dec_cipher = AES(mode=AES_CTR_MODE, is_encrypt=False, key=enc_key)
+
+    def compute_tag_and_encrypt(self, data):
+        self.enc_cipher.reset(iv=self.enc_iv)
+        iv_last8 = self.enc_iv[8:]
+        _, data = self.enc_cipher.compute_tag_and_encrypt(data)
+        # increase the IV (counter) value so same value is never used twice
+        current_iv = bytes_to_long(iv_last8)
+        self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data)))
+        tag = HMAC(self.hmac_key, iv_last8 + data, sha256).digest()  # XXX mac / hash flexibility
+        return tag, iv_last8, data
+
+    def check_tag_and_decrypt(self, tag, iv_last8, data):
+        iv = PREFIX + iv_last8
+        if HMAC(self.hmac_key, iv_last8 + data, sha256).digest() != tag:
+            raise IntegrityError('Encryption envelope checksum mismatch')
+        self.dec_cipher.reset(iv=iv)
+        data = self.dec_cipher.check_tag_and_decrypt(None, data)
+        return data
 
 
 
 
 class AES_GCM:
 class AES_GCM:
@@ -196,8 +220,8 @@ class AES_GCM:
     def __init__(self, enc_key=b'\0' * 32, enc_iv=b'\0' * 16, **kw):
     def __init__(self, enc_key=b'\0' * 32, enc_iv=b'\0' * 16, **kw):
         # note: hmac_key is not used for aes-gcm, it does aes+gmac in 1 pass
         # note: hmac_key is not used for aes-gcm, it does aes+gmac in 1 pass
         self.enc_iv = enc_iv
         self.enc_iv = enc_iv
-        self.enc_cipher = AES(is_encrypt=True, key=enc_key, iv=enc_iv)
-        self.dec_cipher = AES(is_encrypt=False, key=enc_key)
+        self.enc_cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=enc_key, iv=enc_iv)
+        self.dec_cipher = AES(mode=AES_GCM_MODE, is_encrypt=False, key=enc_key)
 
 
     def compute_tag_and_encrypt(self, data):
     def compute_tag_and_encrypt(self, data):
         self.enc_cipher.reset(iv=self.enc_iv)
         self.enc_cipher.reset(iv=self.enc_iv)
@@ -292,9 +316,10 @@ class PlaintextKey(KeyBase):
 class AESKeyBase(KeyBase):
 class AESKeyBase(KeyBase):
     """Common base class shared by KeyfileKey and PassphraseKey
     """Common base class shared by KeyfileKey and PassphraseKey
 
 
-    Chunks are encrypted using 256bit AES in Galois Counter Mode (GCM)
+    Chunks are encrypted using 256bit AES in CTR or GCM mode.
+    Chunks are authenticated by a GCM GMAC or a HMAC.
 
 
-    Payload layout: TYPE(1) + TAG(32) + NONCE(8) + CIPHERTEXT
+    Payload layout: TYPE(1) + MAC(32) + NONCE(8) + CIPHERTEXT
 
 
     To reduce payload size only 8 bytes of the 16 bytes nonce is saved
     To reduce payload size only 8 bytes of the 16 bytes nonce is saved
     in the payload, the first 8 bytes are always zeros. This does not
     in the payload, the first 8 bytes are always zeros. This does not
@@ -433,7 +458,8 @@ class KeyfileKey(AESKeyBase):
         assert d[b'algorithm'] == b'gmac'
         assert d[b'algorithm'] == b'gmac'
         key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
         key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
         try:
         try:
-            data = AES(is_encrypt=False, key=key, iv=b'\0'*16).check_tag_and_decrypt(d[b'hash'], d[b'data'])
+            cipher = AES(mode=AES_GCM_MODE, is_encrypt=False, key=key, iv=b'\0'*16)
+            data = cipher.check_tag_and_decrypt(d[b'hash'], d[b'data'])
             return data
             return data
         except Exception:
         except Exception:
             return None
             return None
@@ -442,7 +468,8 @@ class KeyfileKey(AESKeyBase):
         salt = get_random_bytes(32)
         salt = get_random_bytes(32)
         iterations = 100000
         iterations = 100000
         key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
         key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
-        tag, cdata = AES(is_encrypt=True, key=key, iv=b'\0'*16).compute_tag_and_encrypt(data)
+        cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=key, iv=b'\0'*16)
+        tag, cdata = cipher.compute_tag_and_encrypt(data)
         d = {
         d = {
             'version': 1,
             'version': 1,
             'salt': salt,
             'salt': salt,

+ 18 - 4
attic/testsuite/crypto.py

@@ -1,6 +1,7 @@
 from binascii import hexlify
 from binascii import hexlify
 from attic.testsuite import AtticTestCase
 from attic.testsuite import AtticTestCase
-from attic.crypto import AES, bytes_to_long, bytes_to_int, long_to_bytes, pbkdf2_sha256, get_random_bytes
+from attic.crypto import pbkdf2_sha256, get_random_bytes, AES, AES_GCM_MODE, AES_CTR_MODE, \
+    bytes_to_long, bytes_to_int, long_to_bytes
 
 
 
 
 class CryptoTestCase(AtticTestCase):
 class CryptoTestCase(AtticTestCase):
@@ -27,20 +28,33 @@ class CryptoTestCase(AtticTestCase):
         self.assert_equal(len(bytes2), 10)
         self.assert_equal(len(bytes2), 10)
         self.assert_not_equal(bytes, bytes2)
         self.assert_not_equal(bytes, bytes2)
 
 
+    def test_aes_ctr(self):
+        key = b'X' * 32
+        iv = b'\0' * 16
+        data = b'foo' * 10
+        # encrypt
+        aes = AES(mode=AES_CTR_MODE, is_encrypt=True, key=key, iv=iv)
+        _, cdata = aes.compute_tag_and_encrypt(data)
+        self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
+        # decrypt (correct tag/cdata)
+        aes = AES(mode=AES_CTR_MODE, is_encrypt=False, key=key, iv=iv)
+        pdata = aes.check_tag_and_decrypt(None, cdata)
+        self.assert_equal(data, pdata)
+
     def test_aes_gcm(self):
     def test_aes_gcm(self):
         key = b'X' * 32
         key = b'X' * 32
         iv = b'A' * 16
         iv = b'A' * 16
         data = b'foo' * 10
         data = b'foo' * 10
         # encrypt
         # encrypt
-        aes = AES(is_encrypt=True, key=key, iv=iv)
+        aes = AES(mode=AES_GCM_MODE, is_encrypt=True, key=key, iv=iv)
         tag, cdata = aes.compute_tag_and_encrypt(data)
         tag, cdata = aes.compute_tag_and_encrypt(data)
         self.assert_equal(hexlify(tag), b'c98aa10eb6b7031bcc2160878d9438fb00000000000000000000000000000000')
         self.assert_equal(hexlify(tag), b'c98aa10eb6b7031bcc2160878d9438fb00000000000000000000000000000000')
         self.assert_equal(hexlify(cdata), b'841bcce405df769d22ee9f7f012edf5dc7fb2594d924c7400ffd050f2741')
         self.assert_equal(hexlify(cdata), b'841bcce405df769d22ee9f7f012edf5dc7fb2594d924c7400ffd050f2741')
         # decrypt (correct tag/cdata)
         # decrypt (correct tag/cdata)
-        aes = AES(is_encrypt=False, key=key, iv=iv)
+        aes = AES(mode=AES_GCM_MODE, is_encrypt=False, key=key, iv=iv)
         pdata = aes.check_tag_and_decrypt(tag, cdata)
         pdata = aes.check_tag_and_decrypt(tag, cdata)
         self.assert_equal(data, pdata)
         self.assert_equal(data, pdata)
         # decrypt (incorrect tag/cdata)
         # decrypt (incorrect tag/cdata)
-        aes = AES(is_encrypt=False, key=key, iv=iv)
+        aes = AES(mode=AES_GCM_MODE, is_encrypt=False, key=key, iv=iv)
         cdata = b'x' + cdata[1:]  # corrupt cdata
         cdata = b'x' + cdata[1:]  # corrupt cdata
         self.assertRaises(Exception, aes.check_tag_and_decrypt, tag, cdata)
         self.assertRaises(Exception, aes.check_tag_and_decrypt, tag, cdata)