浏览代码

use parser to completely analyze given format into meta and data, use generator to create format from meta and data

Thomas Waldmann 10 年之前
父节点
当前提交
7cecee0b29
共有 1 个文件被更改,包括 78 次插入44 次删除
  1. 78 44
      attic/key.py

+ 78 - 44
attic/key.py

@@ -3,6 +3,7 @@ from getpass import getpass
 import os
 import msgpack
 import textwrap
+from collections import namedtuple
 import hmac
 from hashlib import sha256, sha512
 import zlib
@@ -143,6 +144,9 @@ class LzmaCompressor(object):  # uses 10..19 in the mapping
 COMPR_DEFAULT = ZlibCompressor.TYPE + 6  # zlib level 6
 
 
+Meta = namedtuple('Meta', 'compr_type, crypt_type, mac_type, hmac, iv, stored_iv')
+
+
 class KeyBase(object):
     TYPE = 0x00  # override in derived classes
 
@@ -175,21 +179,23 @@ class PlaintextKey(KeyBase):
 
     @classmethod
     def detect(cls, repository, manifest_data):
-        offset, compressor, crypter, maccer = parser(manifest_data)
+        meta, data, compressor, crypter, maccer = parser(manifest_data)
         return cls(compressor, maccer)
 
     def id_hash(self, data):
         return self.maccer(None, data).digest()
 
     def encrypt(self, data):
-        header = make_header(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE)
-        return b''.join([header, self.compressor.compress(data)])
+        meta = Meta(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE,
+                    hmac=None, iv=None, stored_iv=None)
+        data = self.compressor.compress(data)
+        return generate(meta, data)
 
     def decrypt(self, id, data):
-        offset, compressor, crypter, maccer = parser(data)
+        meta, data, compressor, crypter, maccer = parser(data)
         assert isinstance(self, crypter)
         assert self.maccer is maccer
-        data = self.compressor.decompress(memoryview(data)[offset:])
+        data = self.compressor.decompress(data)
         if id and self.id_hash(data) != id:
             raise IntegrityError('Chunk id verification failed')
         return data
@@ -207,7 +213,7 @@ class AESKeyBase(KeyBase):
     affect security but limits the maximum repository capacity to
     only 295 exabytes!
     """
-    PAYLOAD_OVERHEAD = 4 + 32 + 8  # HEADER + HMAC + NONCE
+    PAYLOAD_OVERHEAD = 4 + 32 + 8  # HEADER + HMAC + NONCE, TODO: get rid of this
 
     def id_hash(self, data):
         """Return HMAC hash using the "id" HMAC key
@@ -217,29 +223,31 @@ class AESKeyBase(KeyBase):
     def encrypt(self, data):
         data = self.compressor.compress(data)
         self.enc_cipher.reset()
-        data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
-        hmac = self.maccer(self.enc_hmac_key, data).digest()
-        header = make_header(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE)
-        return b''.join((header, hmac, data))
+        stored_iv = self.enc_cipher.iv[8:]
+        iv = PREFIX + stored_iv
+        data = self.enc_cipher.encrypt(data)
+        hmac = self.maccer(self.enc_hmac_key, stored_iv + data).digest()
+        meta = Meta(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE,
+                    hmac=hmac, iv=iv, stored_iv=stored_iv)
+        return generate(meta, data)
 
     def decrypt(self, id, data):
-        offset, compressor, crypter, maccer = parser(data)
+        meta, data, compressor, crypter, maccer = parser(data)
         assert isinstance(self, crypter)
         assert self.maccer is maccer
-        hmac = memoryview(data)[offset:offset+32]
-        computed_hmac = memoryview(self.maccer(self.enc_hmac_key, memoryview(data)[offset+32:]).digest())
-        if computed_hmac != hmac:
+        computed_hmac = self.maccer(self.enc_hmac_key, meta.stored_iv + data).digest()
+        if computed_hmac != meta.hmac:
             raise IntegrityError('Encryption envelope checksum mismatch')
-        self.dec_cipher.reset(iv=PREFIX + data[offset+32:offset+40])
-        data = self.compressor.decompress(self.dec_cipher.decrypt(data[offset+40:]))  # should use memoryview
+        self.dec_cipher.reset(iv=meta.iv)
+        data = self.compressor.decompress(self.dec_cipher.decrypt(data))
         if id and self.id_hash(data) != id:
             raise IntegrityError('Chunk id verification failed')
         return data
 
     def extract_nonce(self, payload):
-        offset, compressor, crypter, maccer = parser(payload)
+        meta, data, compressor, crypter, maccer = parser(payload)
         assert isinstance(self, crypter)
-        nonce = bytes_to_long(payload[offset+32:offset+40])
+        nonce = bytes_to_long(meta.stored_iv)
         return nonce
 
     def init_from_random_data(self, data):
@@ -286,7 +294,7 @@ class PassphraseKey(AESKeyBase):
     @classmethod
     def detect(cls, repository, manifest_data):
         prompt = 'Enter passphrase for %s: ' % repository._location.orig
-        offset, compressor, crypter, maccer = parser(manifest_data)
+        meta, data, compressor, crypter, maccer = parser(manifest_data)
         key = cls(compressor, maccer)
         passphrase = os.environ.get('ATTIC_PASSPHRASE')
         if passphrase is None:
@@ -295,7 +303,7 @@ class PassphraseKey(AESKeyBase):
             key.init(repository, passphrase)
             try:
                 key.decrypt(None, manifest_data)
-                num_blocks = num_aes_blocks(len(manifest_data) - offset - 40)
+                num_blocks = num_aes_blocks(len(data))
                 key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks))
                 return key
             except IntegrityError:
@@ -312,14 +320,14 @@ class KeyfileKey(AESKeyBase):
 
     @classmethod
     def detect(cls, repository, manifest_data):
-        offset, compressor, crypter, maccer = parser(manifest_data)
+        meta, data, compressor, crypter, maccer = parser(manifest_data)
         key = cls(compressor, maccer)
         path = cls.find_key_file(repository)
         prompt = 'Enter passphrase for key file %s: ' % path
         passphrase = os.environ.get('ATTIC_PASSPHRASE', '')
         while not key.load(path, passphrase):
             passphrase = getpass(prompt)
-        num_blocks = num_aes_blocks(len(manifest_data) - offset - 40)
+        num_blocks = num_aes_blocks(len(data))
         key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks))
         return key
 
@@ -460,10 +468,6 @@ maccer_mapping = {
 }
 
 
-from collections import namedtuple
-Meta = namedtuple('Meta', 'compr_type, crypt_type, mac_type')
-
-
 def get_implementations(meta):
     try:
         compressor = compressor_mapping[meta.compr_type]
@@ -475,31 +479,55 @@ def get_implementations(meta):
     return compressor, crypter, maccer
 
 
-def parser00(data):  # legacy, hardcoded
+def parser00(all_data):  # legacy, hardcoded
     offset = 1
-    meta = Meta(compr_type=6, crypt_type=KeyfileKey.TYPE, mac_type=HMAC_SHA256.TYPE)
+    hmac = all_data[offset:offset+32]
+    stored_iv = all_data[offset+32:offset+40]
+    iv = PREFIX + stored_iv
+    data = all_data[offset+40:]
+    meta = Meta(compr_type=6, crypt_type=KeyfileKey.TYPE, mac_type=HMAC_SHA256.TYPE,
+                hmac=hmac, iv=iv, stored_iv=stored_iv)
     compressor, crypter, maccer = get_implementations(meta)
-    return offset, compressor, crypter, maccer
+    return meta, data, compressor, crypter, maccer
 
-def parser01(data):  # legacy, hardcoded
+def parser01(all_data):  # legacy, hardcoded
     offset = 1
-    meta = Meta(compr_type=6, crypt_type=PassphraseKey.TYPE, mac_type=HMAC_SHA256.TYPE)
+    hmac = all_data[offset:offset+32]
+    stored_iv = all_data[offset+32:offset+40]
+    iv = PREFIX + stored_iv
+    data = all_data[offset+40:]
+    meta = Meta(compr_type=6, crypt_type=PassphraseKey.TYPE, mac_type=HMAC_SHA256.TYPE,
+                hmac=hmac, iv=iv, stored_iv=stored_iv)
     compressor, crypter, maccer = get_implementations(meta)
-    return offset, compressor, crypter, maccer
+    return meta, data, compressor, crypter, maccer
 
-def parser02(data):  # legacy, hardcoded
+def parser02(all_data):  # legacy, hardcoded
     offset = 1
-    meta = Meta(compr_type=6, crypt_type=PlaintextKey.TYPE, mac_type=SHA256.TYPE)
+    hmac = None
+    iv = stored_iv = None
+    data = all_data[offset:]
+    meta = Meta(compr_type=6, crypt_type=PlaintextKey.TYPE, mac_type=SHA256.TYPE,
+                hmac=hmac, iv=iv, stored_iv=stored_iv)
     compressor, crypter, maccer = get_implementations(meta)
-    return offset, compressor, crypter, maccer
+    return meta, data, compressor, crypter, maccer
 
 
-def parser03(data):  # new & flexible
+def parser03(all_data):  # new & flexible
     offset = 4
-    compr_type, crypt_type, mac_type = data[1:offset]
-    meta = Meta(compr_type=compr_type, crypt_type=crypt_type, mac_type=mac_type)
+    compr_type, crypt_type, mac_type = all_data[1:offset]
+    if crypt_type == PlaintextKey.TYPE:
+        hmac = None
+        iv = stored_iv = None
+        data = all_data[offset:]
+    else:
+        hmac = all_data[offset:offset+32]
+        stored_iv = all_data[offset+32:offset+40]
+        iv = PREFIX + stored_iv
+        data = all_data[offset+40:]
+    meta = Meta(compr_type=compr_type, crypt_type=crypt_type, mac_type=mac_type,
+                hmac=hmac, iv=iv, stored_iv=stored_iv)
     compressor, crypter, maccer = get_implementations(meta)
-    return offset, compressor, crypter, maccer
+    return meta, data, compressor, crypter, maccer
 
 
 def parser(data):
@@ -515,14 +543,20 @@ def parser(data):
 
 
 def key_factory(repository, manifest_data):
-    offset, compressor, crypter, maccer = parser(manifest_data)
+    meta, data, compressor, crypter, maccer = parser(manifest_data)
     return crypter.detect(repository, manifest_data)
 
 
-def make_header(compr_type, crypt_type, mac_type):
-    # always create new-style 0x03 headers
-    return bytes([0x03, compr_type, crypt_type, mac_type])
-
+def generate(meta, data):
+    # always create new-style 0x03 format
+    start = bytes([0x03, meta.compr_type, meta.crypt_type, meta.mac_type])
+    if meta.crypt_type == PlaintextKey.TYPE:
+        result = start + data
+    else:
+        assert len(meta.hmac) == 32
+        assert len(meta.stored_iv) == 8
+        result = start + meta.hmac + meta.stored_iv + data
+    return result
 
 def compressor_creator(args):
     # args == None is used by unit tests