| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874 | from binascii import hexlify, a2b_base64, b2a_base64from getpass import getpassimport osimport msgpackimport textwrapfrom collections import namedtupleimport hmacfrom hashlib import sha1, sha256, sha512import zlibtry:    import lzma  # python >= 3.3except ImportError:    try:        from backports import lzma  # backports.lzma from pypi    except ImportError:        lzma = Nonetry:    import bloscexcept ImportError:    blosc = Nonefrom attic.crypto import pbkdf2_sha256, get_random_bytes, AES, AES_CTR_MODE, AES_GCM_MODE, \    bytes_to_int, increment_ivfrom attic.helpers import IntegrityError, get_keys_dir, ErrorMeta = namedtuple('Meta', 'compr_type, key_type, mac_type, cipher_type, iv, legacy')class UnsupportedPayloadError(Error):    """Unsupported payload type {}. A newer version is required to access this repository.    """class sha512_256(object):  # note: can't subclass sha512    """sha512, but digest truncated to 256bit - faster than sha256 on 64bit platforms"""    digestsize = digest_size = 32    block_size = 64    def __init__(self, data=None):        self.name = 'sha512-256'        self._h = sha512()        if data:            self.update(data)    def update(self, data):        self._h.update(data)    def digest(self):        return self._h.digest()[:self.digest_size]    def hexdigest(self):        return self._h.hexdigest()[:self.digest_size * 2]    def copy(self):        new = sha512_256.__new__(sha512_256)        new._h = self._h.copy()        return new# HASH / MAC stuff below all has a mac-like interface, so it can be used in the same way.# special case: hashes do not use keys (and thus, do not sign/authenticate)class HASH:  # note: we can't subclass sha1/sha256/sha512    TYPE = 0  # override in subclass    digest_size = 0  # override in subclass    hash_func = None  # override in subclass    def __init__(self, key, data=b''):        # signature is like for a MAC, we ignore the key as this is a simple hash        if key is not None:            raise Exception("use a HMAC if you have a key")        self.h = self.hash_func(data)    def update(self, data):        self.h.update(data)    def digest(self):        return self.h.digest()    def hexdigest(self):        return self.h.hexdigest()class SHA256(HASH):    TYPE = 0    digest_size = 32    hash_func = sha256class SHA512_256(HASH):    TYPE = 1    digest_size = 32    hash_func = sha512_256class GHASH:    TYPE = 2    digest_size = 16    def __init__(self, key, data=b''):        # signature is like for a MAC, we ignore the key as this is a simple hash        if key is not None:            raise Exception("use a MAC if you have a key")        self.mac_cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=b'\0' * 32, iv=b'\0' * 16)        if data:            self.update(data)    def update(self, data):        # GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data        self.mac_cipher.add(bytes(data))    def digest(self):        hash, _ = self.mac_cipher.compute_mac_and_encrypt(b'')        return hashclass SHA1(HASH):    TYPE = 3    digest_size = 20    hash_func = sha1class SHA512(HASH):    TYPE = 4    digest_size = 64    hash_func = sha512class HMAC(hmac.HMAC):    TYPE = 0  # override in subclass    digest_size = 0  # override in subclass    hash_func = None  # override in subclass    def __init__(self, key, data):        if key is None:            raise Exception("do not use HMAC if you don't have a key")        super().__init__(key, data, self.hash_func)    def update(self, msg):        # Workaround a bug in Python < 3.4 Where HMAC does not accept memoryviews        self.inner.update(msg)class HMAC_SHA256(HMAC):    TYPE = 10    digest_size = 32    hash_func = sha256class HMAC_SHA512_256(HMAC):    TYPE = 11    digest_size = 32    hash_func = sha512_256class HMAC_SHA1(HMAC):    TYPE = 13    digest_size = 20    hash_func = sha1class HMAC_SHA512(HMAC):    TYPE = 14    digest_size = 64    hash_func = sha512class GMAC(GHASH):    TYPE = 20    digest_size = 16    def __init__(self, key, data=b''):        if key is None:            raise Exception("do not use GMAC if you don't have a key")        self.mac_cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=key, iv=b'\0' * 16)        if data:            self.update(data)# defaults are optimized for speed on modern CPUs with AES hw supportHASH_DEFAULT = GHASH.TYPEMAC_DEFAULT = GMAC.TYPE# compressor classes, all same interfaceclass NullCompressor(object):  # uses 0 in the mapping    TYPE = 0    def compress(self, data):        return bytes(data)    def decompress(self, data):        return bytes(data)class ZlibCompressor(object):  # uses 1..9 in the mapping    TYPE = 0    LEVELS = range(10)    def compress(self, data):        level = self.TYPE - ZlibCompressor.TYPE        return zlib.compress(data, level)    def decompress(self, data):        return zlib.decompress(data)class LzmaCompressor(object):  # uses 10..19 in the mapping    TYPE = 10    PRESETS = range(10)    def __init__(self):        if lzma is None:            raise NotImplemented("lzma compression needs Python >= 3.3 or backports.lzma from PyPi")    def compress(self, data):        preset = self.TYPE - LzmaCompressor.TYPE        return lzma.compress(data, preset=preset)    def decompress(self, data):        return lzma.decompress(data)class BLOSCCompressor(object):    TYPE = 0  # override in subclass    LEVELS = range(10)    CNAME = ''  # override in subclass    def __init__(self):        if blosc is None:            raise NotImplemented("%s compression needs blosc from PyPi" % self.CNAME)        if self.CNAME not in blosc.compressor_list():            raise NotImplemented("%s compression is not supported by blosc" % self.CNAME)        blosc.set_blocksize(16384)  # 16kiB is the minimum, so 64kiB are enough for 4 threads    def _get_level(self):        raise NotImplemented    def compress(self, data):        return blosc.compress(bytes(data), 1, cname=self.CNAME, clevel=self._get_level())    def decompress(self, data):        return blosc.decompress(data)class LZ4Compressor(BLOSCCompressor):    TYPE = 20    CNAME = 'lz4'    def _get_level(self):        return self.TYPE - LZ4Compressor.TYPEclass LZ4HCCompressor(BLOSCCompressor):    TYPE = 30    CNAME = 'lz4hc'    def _get_level(self):        return self.TYPE - LZ4HCCompressor.TYPEclass BLOSCLZCompressor(BLOSCCompressor):    TYPE = 40    CNAME = 'blosclz'    def _get_level(self):        return self.TYPE - BLOSCLZCompressor.TYPEclass SnappyCompressor(BLOSCCompressor):    TYPE = 50    CNAME = 'snappy'    def _get_level(self):        return self.TYPE - SnappyCompressor.TYPEclass BLOSCZlibCompressor(BLOSCCompressor):    TYPE = 60    CNAME = 'zlib'    def _get_level(self):        return self.TYPE - BLOSCZlibCompressor.TYPE# default is optimized for speedCOMPR_DEFAULT = NullCompressor.TYPE # no compression# ciphers - AEAD (authenticated encryption with assoc. data) style interface# special case: PLAIN dummy does not encrypt / authenticateclass PLAIN:    TYPE = 0    enc_iv = None  # dummy    def __init__(self, **kw):        pass    def compute_mac_and_encrypt(self, meta, data):        return None, data    def check_mac_and_decrypt(self, mac, meta, data):        return datadef get_aad(meta):    """get additional authenticated data for AEAD ciphers"""    if meta.legacy:        # legacy format computed the mac over (iv_last8 +  data)        return meta.iv[8:]    else:        return msgpack.packb(meta)class AES_CTR_HMAC:    TYPE = 1    def __init__(self, enc_key=b'\0' * 32, enc_iv=b'\0' * 16, enc_hmac_key=b'\0' * 32, **kw):        self.hmac_key = enc_hmac_key        self.enc_iv = enc_iv        self.enc_cipher = AES(mode=AES_CTR_MODE, is_encrypt=True, key=enc_key, iv=enc_iv)        self.dec_cipher = AES(mode=AES_CTR_MODE, is_encrypt=False, key=enc_key)    def compute_mac_and_encrypt(self, meta, data):        self.enc_cipher.reset(iv=meta.iv)        _, data = self.enc_cipher.compute_mac_and_encrypt(data)        self.enc_iv = increment_iv(meta.iv, len(data))        aad = get_aad(meta)        mac = HMAC(self.hmac_key, aad + data, sha256).digest()  # XXX mac / hash flexibility        return mac, data    def check_mac_and_decrypt(self, mac, meta, data):        aad = get_aad(meta)        if HMAC(self.hmac_key, aad + data, sha256).digest() != mac:            raise IntegrityError('Encryption envelope checksum mismatch')        self.dec_cipher.reset(iv=meta.iv)        data = self.dec_cipher.check_mac_and_decrypt(None, data)        return dataclass AES_GCM:    TYPE = 2    def __init__(self, enc_key=b'\0' * 32, enc_iv=b'\0' * 16, **kw):        # note: hmac_key is not used for aes-gcm, it does aes+gmac in 1 pass        self.enc_iv = enc_iv        self.enc_cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=enc_key, iv=enc_iv)        self.dec_cipher = AES(mode=AES_GCM_MODE, is_encrypt=False, key=enc_key)    def compute_mac_and_encrypt(self, meta, data):        self.enc_cipher.reset(iv=meta.iv)        aad = get_aad(meta)        self.enc_cipher.add(aad)        mac, data = self.enc_cipher.compute_mac_and_encrypt(data)        self.enc_iv = increment_iv(meta.iv, len(data))        return mac, data    def check_mac_and_decrypt(self, mac, meta, data):        self.dec_cipher.reset(iv=meta.iv)        aad = get_aad(meta)        self.dec_cipher.add(aad)        try:            data = self.dec_cipher.check_mac_and_decrypt(mac, data)        except Exception:            raise IntegrityError('Encryption envelope checksum mismatch')        return data# cipher default is optimized for speed on modern CPUs with AES hw supportPLAIN_DEFAULT = PLAIN.TYPECIPHER_DEFAULT = AES_GCM.TYPE# misc. types of keys# special case: no keys (thus: no encryption, no signing/authentication)class KeyBase(object):    TYPE = 0x00  # override in derived classes    def __init__(self, compressor_cls, maccer_cls, cipher_cls):        self.compressor = compressor_cls()        self.maccer_cls = maccer_cls  # hasher/maccer used by id_hash        self.cipher_cls = cipher_cls  # plaintext dummy or AEAD cipher        self.cipher = cipher_cls()        self.id_key = None    def id_hash(self, data):        """Return a HASH (no id_key) or a MAC (using the "id_key" key)        XXX do we need a cryptographic hash function here or is a keyed hash        function like GMAC / GHASH good enough? See NIST SP 800-38D.        IMPORTANT: in 1 repo, there should be only 1 kind of id_hash, otherwise        data hashed/maced with one id_hash might result in same ID as already        exists in the repo for other data created with another id_hash method.        somehow unlikely considering 128 or 256bits, but still.        """        return self.maccer_cls(self.id_key, data).digest()    def encrypt(self, data):        data = self.compressor.compress(data)        meta = Meta(compr_type=self.compressor.TYPE, key_type=self.TYPE,                    mac_type=self.maccer_cls.TYPE, cipher_type=self.cipher.TYPE,                    iv=self.cipher.enc_iv, legacy=False)        mac, data = self.cipher.compute_mac_and_encrypt(meta, data)        return generate(mac, meta, data)    def decrypt(self, id, data):        mac, meta, data = parser(data)        compressor, keyer, maccer, cipher = get_implementations(meta)        assert isinstance(self, keyer)        assert self.maccer_cls is maccer        assert self.cipher_cls is cipher        data = self.cipher.check_mac_and_decrypt(mac, meta, data)        data = self.compressor.decompress(data)        if id and self.id_hash(data) != id:            raise IntegrityError('Chunk id verification failed')        return dataclass PlaintextKey(KeyBase):    TYPE = 0x02    chunk_seed = 0    @classmethod    def create(cls, repository, args):        print('Encryption NOT enabled.\nUse the "--encryption=passphrase|keyfile" to enable encryption.')        compressor = compressor_creator(args)        maccer = maccer_creator(args, cls)        cipher = cipher_creator(args, cls)        return cls(compressor, maccer, cipher)    @classmethod    def detect(cls, repository, manifest_data):        mac, meta, data = parser(manifest_data)        compressor, keyer, maccer, cipher = get_implementations(meta)        return cls(compressor, maccer, cipher)class AESKeyBase(KeyBase):    """Common base class shared by KeyfileKey and PassphraseKey    Chunks are encrypted using 256bit AES in CTR or GCM mode.    Chunks are authenticated by a GCM GMAC or a HMAC.    Payload layout: TYPE(1) + MAC(32) + NONCE(8) + CIPHERTEXT    To reduce payload size only 8 bytes of the 16 bytes nonce is saved    in the payload, the first 8 bytes are always zeros. This does not    affect security but limits the maximum repository capacity to    only 295 exabytes!    """    def extract_iv(self, payload):        _, meta, _ = parser(payload)        return meta.iv    def init_from_random_data(self, data):        self.enc_key = data[0:32]        self.enc_hmac_key = data[32:64]        self.id_key = data[64:96]        self.chunk_seed = bytes_to_int(data[96:100])        # Convert to signed int32        if self.chunk_seed & 0x80000000:            self.chunk_seed = self.chunk_seed - 0xffffffff - 1    def init_ciphers(self, enc_iv=b'\0' * 16):        self.cipher = self.cipher_cls(enc_key=self.enc_key, enc_iv=enc_iv,                                      enc_hmac_key=self.enc_hmac_key)    @property    def enc_iv(self):        return self.cipher.enc_ivclass PassphraseKey(AESKeyBase):    TYPE = 0x01    iterations = 100000    @classmethod    def create(cls, repository, args):        compressor = compressor_creator(args)        maccer = maccer_creator(args, cls)        cipher = cipher_creator(args, cls)        key = cls(compressor, maccer, cipher)        passphrase = os.environ.get('ATTIC_PASSPHRASE')        if passphrase is not None:            passphrase2 = passphrase        else:            passphrase, passphrase2 = 1, 2        while passphrase != passphrase2:            passphrase = getpass('Enter passphrase: ')            if not passphrase:                print('Passphrase must not be blank')                continue            passphrase2 = getpass('Enter same passphrase again: ')            if passphrase != passphrase2:                print('Passphrases do not match')        key.init(repository, passphrase)        if passphrase:            print('Remember your passphrase. Your data will be inaccessible without it.')        return key    @classmethod    def detect(cls, repository, manifest_data):        prompt = 'Enter passphrase for %s: ' % repository._location.orig        mac, meta, data = parser(manifest_data)        compressor, keyer, maccer, cipher = get_implementations(meta)        key = cls(compressor, maccer, cipher)        passphrase = os.environ.get('ATTIC_PASSPHRASE')        if passphrase is None:            passphrase = getpass(prompt)        while True:            key.init(repository, passphrase)            try:                key.decrypt(None, manifest_data)                key.init_ciphers(increment_iv(key.extract_iv(manifest_data), len(data)))                return key            except IntegrityError:                passphrase = getpass(prompt)    def change_passphrase(self):        class ImmutablePassphraseError(Error):            """The passphrase for this encryption key type can't be changed."""        raise ImmutablePassphraseError    def init(self, repository, passphrase):        self.init_from_random_data(pbkdf2_sha256(passphrase.encode('utf-8'), repository.id, self.iterations, 100))        self.init_ciphers()class KeyfileKey(AESKeyBase):    FILE_ID = 'ATTIC KEY'    TYPE = 0x00    @classmethod    def detect(cls, repository, manifest_data):        mac, meta, data = parser(manifest_data)        compressor, keyer, maccer, cipher = get_implementations(meta)        key = cls(compressor, maccer, cipher)        path = cls.find_key_file(repository)        prompt = 'Enter passphrase for key file %s: ' % path        passphrase = os.environ.get('ATTIC_PASSPHRASE', '')        while not key.load(path, passphrase):            passphrase = getpass(prompt)        key.init_ciphers(increment_iv(key.extract_iv(manifest_data), len(data)))        return key    @classmethod    def find_key_file(cls, repository):        id = hexlify(repository.id).decode('ascii')        keys_dir = get_keys_dir()        for name in os.listdir(keys_dir):            filename = os.path.join(keys_dir, name)            with open(filename, 'r') as fd:                line = fd.readline().strip()                if line and line.startswith(cls.FILE_ID) and line[10:] == id:                    return filename        raise Exception('Key file for repository with ID %s not found' % id)    def load(self, filename, passphrase):        with open(filename, 'r') as fd:            cdata = a2b_base64(''.join(fd.readlines()[1:]).encode('ascii'))  # .encode needed for Python 3.[0-2]        data = self.decrypt_key_file(cdata, passphrase)        if data:            key = msgpack.unpackb(data)            if key[b'version'] != 1:                raise IntegrityError('Invalid key file header')            self.repository_id = key[b'repository_id']            self.enc_key = key[b'enc_key']            self.enc_hmac_key = key[b'enc_hmac_key']            self.id_key = key[b'id_key']            self.chunk_seed = key[b'chunk_seed']            self.path = filename            return True    def decrypt_key_file(self, data, passphrase):        d = msgpack.unpackb(data)        assert d[b'version'] == 1        assert d[b'algorithm'] == b'gmac'        key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)        try:            cipher = AES(mode=AES_GCM_MODE, is_encrypt=False, key=key, iv=b'\0'*16)            data = cipher.check_mac_and_decrypt(d[b'hash'], d[b'data'])            return data        except Exception:            return None    def encrypt_key_file(self, data, passphrase):        salt = get_random_bytes(32)        iterations = 100000        key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)        cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=key, iv=b'\0'*16)        mac, cdata = cipher.compute_mac_and_encrypt(data)        d = {            'version': 1,            'salt': salt,            'iterations': iterations,            'algorithm': 'gmac',            'hash': mac,            'data': cdata,        }        return msgpack.packb(d)    def save(self, path, passphrase):        key = {            'version': 1,            'repository_id': self.repository_id,            'enc_key': self.enc_key,            'enc_hmac_key': self.enc_hmac_key,            'id_key': self.id_key,            'chunk_seed': self.chunk_seed,        }        data = self.encrypt_key_file(msgpack.packb(key), passphrase)        with open(path, 'w') as fd:            fd.write('%s %s\n' % (self.FILE_ID, hexlify(self.repository_id).decode('ascii')))            fd.write('\n'.join(textwrap.wrap(b2a_base64(data).decode('ascii'))))            fd.write('\n')        self.path = path    def change_passphrase(self):        passphrase, passphrase2 = 1, 2        while passphrase != passphrase2:            passphrase = getpass('New passphrase: ')            passphrase2 = getpass('Enter same passphrase again: ')            if passphrase != passphrase2:                print('Passphrases do not match')        self.save(self.path, passphrase)        print('Key file "%s" updated' % self.path)    @classmethod    def create(cls, repository, args):        filename = args.repository.to_key_filename()        path = filename        i = 1        while os.path.exists(path):            i += 1            path = filename + '.%d' % i        passphrase = os.environ.get('ATTIC_PASSPHRASE')        if passphrase is not None:            passphrase2 = passphrase        else:            passphrase, passphrase2 = 1, 2        while passphrase != passphrase2:            passphrase = getpass('Enter passphrase (empty for no passphrase):')            passphrase2 = getpass('Enter same passphrase again: ')            if passphrase != passphrase2:                print('Passphrases do not match')        compressor = compressor_creator(args)        maccer = maccer_creator(args, cls)        cipher = cipher_creator(args, cls)        key = cls(compressor, maccer, cipher)        key.repository_id = repository.id        key.init_from_random_data(get_random_bytes(100))        key.init_ciphers()        key.save(path, passphrase)        print('Key file "%s" created.' % key.path)        print('Keep this file safe. Your data will be inaccessible without it.')        return key# note: key 0 nicely maps to a zlib compressor with level 0 which means "no compression"compressor_mapping = {}for level in ZlibCompressor.LEVELS:    compressor_mapping[ZlibCompressor.TYPE + level] = \        type('ZlibCompressorLevel%d' % level, (ZlibCompressor, ), dict(TYPE=ZlibCompressor.TYPE + level))for preset in LzmaCompressor.PRESETS:    compressor_mapping[LzmaCompressor.TYPE + preset] = \        type('LzmaCompressorPreset%d' % preset, (LzmaCompressor, ), dict(TYPE=LzmaCompressor.TYPE + preset))for level in LZ4Compressor.LEVELS:    compressor_mapping[LZ4Compressor.TYPE + level] = \        type('LZ4CompressorLevel%d' % level, (LZ4Compressor, ), dict(TYPE=LZ4Compressor.TYPE + level))for level in LZ4HCCompressor.LEVELS:    compressor_mapping[LZ4HCCompressor.TYPE + level] = \        type('LZ4HCCompressorLevel%d' % level, (LZ4HCCompressor, ), dict(TYPE=LZ4HCCompressor.TYPE + level))for level in BLOSCLZCompressor.LEVELS:    compressor_mapping[BLOSCLZCompressor.TYPE + level] = \        type('BLOSCLZCompressorLevel%d' % level, (BLOSCLZCompressor, ), dict(TYPE=BLOSCLZCompressor.TYPE + level))for level in SnappyCompressor.LEVELS:    compressor_mapping[SnappyCompressor.TYPE + level] = \        type('SnappyCompressorLevel%d' % level, (SnappyCompressor, ), dict(TYPE=SnappyCompressor.TYPE + level))for level in BLOSCZlibCompressor.LEVELS:    compressor_mapping[BLOSCZlibCompressor.TYPE + level] = \        type('BLOSCZlibCompressorLevel%d' % level, (BLOSCZlibCompressor, ), dict(TYPE=BLOSCZlibCompressor.TYPE + level))# overwrite 0 with NullCompressorcompressor_mapping[NullCompressor.TYPE] = NullCompressorkeyer_mapping = {    KeyfileKey.TYPE: KeyfileKey,    PassphraseKey.TYPE: PassphraseKey,    PlaintextKey.TYPE: PlaintextKey,}maccer_mapping = {    # simple hashes, not MACs (but MAC-like class __init__ method signature):    SHA1.TYPE: SHA1,    SHA256.TYPE: SHA256,    SHA512_256.TYPE: SHA512_256,    SHA512.TYPE: SHA512,    GHASH.TYPE: GHASH,    # MACs:    HMAC_SHA1.TYPE: HMAC_SHA1,    HMAC_SHA256.TYPE: HMAC_SHA256,    HMAC_SHA512_256.TYPE: HMAC_SHA512_256,    HMAC_SHA512.TYPE: HMAC_SHA512,    GMAC.TYPE: GMAC,}cipher_mapping = {    # no cipher (but cipher-like class __init__ method signature):    PLAIN.TYPE: PLAIN,    # AEAD cipher implementations    AES_CTR_HMAC.TYPE: AES_CTR_HMAC,    AES_GCM.TYPE: AES_GCM,}def get_implementations(meta):    try:        compressor = compressor_mapping[meta.compr_type]        keyer = keyer_mapping[meta.key_type]        maccer = maccer_mapping[meta.mac_type]        cipher = cipher_mapping[meta.cipher_type]    except KeyError:        raise UnsupportedPayloadError("compr_type %x key_type %x mac_type %x cipher_type %x" % (            meta.compr_type, meta.key_type, meta.mac_type, meta.cipher_type))    return compressor, keyer, maccer, cipherdef legacy_parser(all_data, key_type):  # all rather hardcoded    """    Payload layout:    no encryption:   TYPE(1) + data    with encryption: TYPE(1) + HMAC(32) + NONCE(8) + data    data is compressed with zlib level 6 and (in the 2nd case) encrypted.    To reduce payload size only 8 bytes of the 16 bytes nonce is saved    in the payload, the first 8 bytes are always zeros. This does not    affect security but limits the maximum repository capacity to    only 295 exabytes!    """    offset = 1    if key_type == PlaintextKey.TYPE:        mac_type = SHA256.TYPE        mac = None        cipher_type = PLAIN.TYPE        iv = None        data = all_data[offset:]    else:        mac_type = HMAC_SHA256.TYPE        mac = all_data[offset:offset+32]        cipher_type = AES_CTR_HMAC.TYPE        # legacy attic did not store the full IV on disk, as the upper 8 bytes        # are expected to be zero anyway as the full IV is a 128bit counter.        iv = b'\0' * 8 + all_data[offset+32:offset+40]        data = all_data[offset+40:]    meta = Meta(compr_type=6, key_type=key_type, mac_type=mac_type,                cipher_type=cipher_type, iv=iv, legacy=True)    return mac, meta, datadef parser00(all_data):    return legacy_parser(all_data, KeyfileKey.TYPE)def parser01(all_data):    return legacy_parser(all_data, PassphraseKey.TYPE)def parser02(all_data):    return legacy_parser(all_data, PlaintextKey.TYPE)def parser03(all_data):  # new & flexible    """    Payload layout:    always: TYPE(1) + MSGPACK((mac, meta, data))    meta is a Meta namedtuple and contains all required information about data.    data is maybe compressed (see meta) and maybe encrypted (see meta).    """    max_len = 10000000  # XXX formula?    unpacker = msgpack.Unpacker(        use_list=False,        # avoid memory allocation issues causes by tampered input data.        max_buffer_size=max_len,  # does not work in 0.4.6 unpackb C implementation        max_array_len=10,  # meta_tuple        max_bin_len=max_len,  # data        max_str_len=0,  # not used yet        max_map_len=0,  # not used yet        max_ext_len=0,  # not used yet        )    unpacker.feed(all_data[1:])    mac, meta_tuple, data = unpacker.unpack()    meta = Meta(*meta_tuple)    return mac, meta, datadef parser(data):    parser_mapping = {        0x00: parser00,        0x01: parser01,        0x02: parser02,        0x03: parser03,    }    header_type = data[0]    parser_func = parser_mapping[header_type]    return parser_func(data)def key_factory(repository, manifest_data):    mac, meta, data = parser(manifest_data)    compressor, keyer, maccer, cipher = get_implementations(meta)    return keyer.detect(repository, manifest_data)def generate(mac, meta, data):    # always create new-style 0x03 format    return b'\x03' + msgpack.packb((mac, meta, data), use_bin_type=True)def compressor_creator(args):    # args == None is used by unit tests    compression = COMPR_DEFAULT if args is None else args.compression    compressor = compressor_mapping.get(compression)    if compressor is None:        raise NotImplementedError("no compression %d" % args.compression)    return compressordef key_creator(args):    if args.encryption == 'keyfile':        return KeyfileKey    if args.encryption == 'passphrase':        return PassphraseKey    if args.encryption == 'none':        return PlaintextKey    raise NotImplemented("no encryption %s" % args.encryption)def maccer_creator(args, key_cls):    # args == None is used by unit tests    mac = None if args is None else args.mac    if mac is None:        if key_cls is PlaintextKey:            mac = HASH_DEFAULT        elif key_cls in (KeyfileKey, PassphraseKey):            mac = MAC_DEFAULT        else:            raise NotImplementedError("unknown key class")    maccer = maccer_mapping.get(mac)    if maccer is None:        raise NotImplementedError("no mac %d" % args.mac)    return maccerdef cipher_creator(args, key_cls):    # args == None is used by unit tests    cipher = None if args is None else args.cipher    if cipher is None:        if key_cls is PlaintextKey:            cipher = PLAIN_DEFAULT        elif key_cls in (KeyfileKey, PassphraseKey):            cipher = CIPHER_DEFAULT        else:            raise NotImplementedError("unknown key class")    cipher = cipher_mapping.get(cipher)    if cipher is None:        raise NotImplementedError("no cipher %d" % args.cipher)    return cipher
 |