소스 검색

Crypto code cleanup and test improvements.

Jonas Borgström 11 년 전
부모
커밋
b92a620600
4개의 변경된 파일50개의 추가작업 그리고 15개의 파일을 삭제
  1. 8 0
      attic/crypto.py
  2. 22 6
      attic/key.py
  3. 3 2
      attic/testsuite/archiver.py
  4. 17 7
      attic/testsuite/key.py

+ 8 - 0
attic/crypto.py

@@ -1,3 +1,5 @@
+"""A thin ctypes based wrapper for OpenSSL 1.0
+"""
 import sys
 from ctypes import cdll, c_char_p, c_int, c_uint, c_void_p, POINTER, create_string_buffer
 from ctypes.util import find_library
@@ -7,8 +9,10 @@ libcrypto = cdll.LoadLibrary(find_library('crypto'))
 # Default libcrypto on OS X is too old, try the brew version
 if not hasattr(libcrypto, 'PKCS5_PBKDF2_HMAC') and sys.platform == 'darwin':
     libcrypto = cdll.LoadLibrary('/usr/local/opt/openssl/lib/libcrypto.dylib')
+# Default libcrypto on FreeBSD is too old, try the ports version
 if not hasattr(libcrypto, 'PKCS5_PBKDF2_HMAC') and sys.platform.startswith('freebsd'):
     libcrypto = cdll.LoadLibrary('/usr/local/lib/libcrypto.so')
+
 libcrypto.PKCS5_PBKDF2_HMAC.argtypes = (c_char_p, c_int, c_char_p, c_int, c_int, c_void_p, c_int, c_char_p)
 libcrypto.EVP_sha256.restype = c_void_p
 libcrypto.AES_set_encrypt_key.argtypes = (c_char_p, c_int, c_char_p)
@@ -29,6 +33,8 @@ def num_aes_blocks(length):
 
 
 def pbkdf2_sha256(password, salt, iterations, size):
+    """Password based key derivation function 2 (RFC2898)
+    """
     key = create_string_buffer(size)
     rv = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), iterations, libcrypto.EVP_sha256(), size, key)
     if not rv:
@@ -46,6 +52,8 @@ def get_random_bytes(n):
 
 
 class AES:
+    """A thin wrapper around the OpenSSL AES CTR_MODE cipher
+    """
     def __init__(self, key, iv=None):
         self.key = create_string_buffer(2000)
         self.iv = create_string_buffer(16)

+ 22 - 6
attic/key.py

@@ -7,14 +7,15 @@ import hmac
 from hashlib import sha256
 import zlib
 
-from .crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int
-from .helpers import IntegrityError, get_keys_dir
+from attic.crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks
+from attic.helpers import IntegrityError, get_keys_dir
 
 PREFIX = b'\0' * 8
 
 
 class HMAC(hmac.HMAC):
-
+    """Workaround a bug in Python < 3.4 Where HMAC does not accept memoryviews
+    """
     def update(self, msg):
         self.inner.update(msg)
 
@@ -85,6 +86,19 @@ class PlaintextKey(KeyBase):
 
 
 class AESKeyBase(KeyBase):
+    """Common base class shared by KeyfileKey and PassphraseKey
+
+    Chunks are encrypted using 256bit AES in Counter Mode (CTR)
+
+    Payload layout: TYPE(1) + HMAC(32) + NONCE(8) + CIPHERTEXT
+
+    To reduce payload size only 8 bytes of the 16 bytes nonce is saved
+    in the payload, the first 8 bytes are always zeros. This does not
+    affect security but limits the maximum repository capacity to
+    only 295 exabytes!
+    """
+
+    PAYLOAD_OVERHEAD = 1 + 32 + 8  # TYPE + HMAC + NONCE
 
     def id_hash(self, data):
         """Return HMAC hash using the "id" HMAC key
@@ -110,7 +124,7 @@ class AESKeyBase(KeyBase):
             raise IntegrityError('Chunk id verification failed')
         return data
 
-    def extract_iv(self, payload):
+    def extract_nonce(self, payload):
         if payload[0] != self.TYPE:
             raise IntegrityError('Invalid encryption envelope')
         nonce = bytes_to_long(payload[33:41])
@@ -166,7 +180,8 @@ class PassphraseKey(AESKeyBase):
             key.init(repository, passphrase)
             try:
                 key.decrypt(None, manifest_data)
-                key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000))
+                num_blocks = num_aes_blocks(len(manifest_data) - 41)
+                key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks))
                 return key
             except IntegrityError:
                 passphrase = getpass(prompt)
@@ -188,7 +203,8 @@ class KeyfileKey(AESKeyBase):
         passphrase = os.environ.get('ATTIC_PASSPHRASE', '')
         while not key.load(path, passphrase):
             passphrase = getpass(prompt)
-        key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000))
+        num_blocks = num_aes_blocks(len(manifest_data) - 41)
+        key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks))
         return key
 
     @classmethod

+ 3 - 2
attic/testsuite/archiver.py

@@ -267,8 +267,8 @@ class ArchiverTestCase(AtticTestCase):
                 if not hash in seen:
                     seen.add(hash)
                     num_blocks = num_aes_blocks(len(data) - 41)
-                    start = bytes_to_long(data[33:41])
-                    for counter in range(start, start + num_blocks):
+                    nonce = bytes_to_long(data[33:41])
+                    for counter in range(nonce, nonce + num_blocks):
                         self.assert_not_in(counter, used)
                         used.add(counter)
 
@@ -282,6 +282,7 @@ class ArchiverTestCase(AtticTestCase):
         verify_uniqueness()
         self.attic('delete', self.repository_location + '::test.2')
         verify_uniqueness()
+        self.assert_equal(used, set(range(len(used))))
 
     def test_aes_counter_uniqueness_keyfile(self):
         self.verify_aes_counter_uniqueness('keyfile')

+ 17 - 7
attic/testsuite/key.py

@@ -3,7 +3,7 @@ import re
 import shutil
 import tempfile
 from binascii import hexlify
-from attic.crypto import bytes_to_long
+from attic.crypto import bytes_to_long, num_aes_blocks
 from attic.testsuite import AtticTestCase
 from attic.key import PlaintextKey, PassphraseKey, KeyfileKey
 from attic.helpers import Location, unhexlify
@@ -54,10 +54,15 @@ class KeyTestCase(AtticTestCase):
         os.environ['ATTIC_PASSPHRASE'] = 'test'
         key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
         self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0)
-        manifest = key.encrypt(b'')
-        iv = key.extract_iv(manifest)
+        manifest = key.encrypt(b'XXX')
+        self.assert_equal(key.extract_nonce(manifest), 0)
+        manifest2 = key.encrypt(b'XXX')
+        self.assert_not_equal(manifest, manifest2)
+        self.assert_equal(key.decrypt(None, manifest), key.decrypt(None, manifest2))
+        self.assert_equal(key.extract_nonce(manifest2), 1)
+        iv = key.extract_nonce(manifest)
         key2 = KeyfileKey.detect(self.MockRepository(), manifest)
-        self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000)
+        self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD))
         # Key data sanity check
         self.assert_equal(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3)
         self.assert_equal(key2.chunk_seed == 0, False)
@@ -79,10 +84,15 @@ class KeyTestCase(AtticTestCase):
         self.assert_equal(hexlify(key.enc_hmac_key), b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901')
         self.assert_equal(hexlify(key.enc_key), b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a')
         self.assert_equal(key.chunk_seed, -775740477)
-        manifest = key.encrypt(b'')
-        iv = key.extract_iv(manifest)
+        manifest = key.encrypt(b'XXX')
+        self.assert_equal(key.extract_nonce(manifest), 0)
+        manifest2 = key.encrypt(b'XXX')
+        self.assert_not_equal(manifest, manifest2)
+        self.assert_equal(key.decrypt(None, manifest), key.decrypt(None, manifest2))
+        self.assert_equal(key.extract_nonce(manifest2), 1)
+        iv = key.extract_nonce(manifest)
         key2 = PassphraseKey.detect(self.MockRepository(), manifest)
-        self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000)
+        self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD))
         self.assert_equal(key.id_key, key2.id_key)
         self.assert_equal(key.enc_hmac_key, key2.enc_hmac_key)
         self.assert_equal(key.enc_key, key2.enc_key)