|
@@ -3,20 +3,59 @@ from getpass import getpass
|
|
|
import os
|
|
|
import msgpack
|
|
|
import textwrap
|
|
|
+from collections import namedtuple
|
|
|
import hmac
|
|
|
-from hashlib import sha256
|
|
|
+from hashlib import sha256, sha512
|
|
|
import zlib
|
|
|
|
|
|
+try:
|
|
|
+ import lzma # python >= 3.3
|
|
|
+except ImportError:
|
|
|
+ try:
|
|
|
+ from backports import lzma # backports.lzma from pypi
|
|
|
+ except ImportError:
|
|
|
+ lzma = None
|
|
|
+
|
|
|
from attic.crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks
|
|
|
from attic.helpers import IntegrityError, get_keys_dir, Error
|
|
|
|
|
|
+# we do not store the full IV on disk, as the upper 8 bytes are expected to be
|
|
|
+# zero anyway as the full IV is a 128bit counter. PREFIX are the upper 8 bytes,
|
|
|
+# stored_iv are the lower 8 Bytes.
|
|
|
PREFIX = b'\0' * 8
|
|
|
+Meta = namedtuple('Meta', 'compr_type, crypt_type, mac_type, hmac, stored_iv')
|
|
|
|
|
|
|
|
|
class UnsupportedPayloadError(Error):
|
|
|
"""Unsupported payload type {}. A newer version is required to access this repository.
|
|
|
"""
|
|
|
|
|
|
+class sha512_256(object): # note: can't subclass sha512
|
|
|
+ """sha512, but digest truncated to 256bit - faster than sha256 on 64bit platforms"""
|
|
|
+ digestsize = digest_size = 32
|
|
|
+ block_size = 64
|
|
|
+
|
|
|
+ def __init__(self, data=None):
|
|
|
+ self.name = 'sha512-256'
|
|
|
+ self._h = sha512()
|
|
|
+ if data:
|
|
|
+ self.update(data)
|
|
|
+
|
|
|
+ def update(self, data):
|
|
|
+ self._h.update(data)
|
|
|
+
|
|
|
+ def digest(self):
|
|
|
+ return self._h.digest()[:self.digest_size]
|
|
|
+
|
|
|
+ def hexdigest(self):
|
|
|
+ return self._h.hexdigest()[:self.digest_size * 2]
|
|
|
+
|
|
|
+ def copy(self):
|
|
|
+ new = sha512_256.__new__(sha512_256)
|
|
|
+ new._h = self._h.copy()
|
|
|
+ return new
|
|
|
+
|
|
|
+
|
|
|
class HMAC(hmac.HMAC):
|
|
|
"""Workaround a bug in Python < 3.4 Where HMAC does not accept memoryviews
|
|
|
"""
|
|
@@ -24,30 +63,97 @@ class HMAC(hmac.HMAC):
|
|
|
self.inner.update(msg)
|
|
|
|
|
|
|
|
|
-def key_creator(repository, args):
|
|
|
- if args.encryption == 'keyfile':
|
|
|
- return KeyfileKey.create(repository, args)
|
|
|
- elif args.encryption == 'passphrase':
|
|
|
- return PassphraseKey.create(repository, args)
|
|
|
- else:
|
|
|
- return PlaintextKey.create(repository, args)
|
|
|
+class SHA256(object): # note: can't subclass sha256
|
|
|
+ TYPE = 0
|
|
|
|
|
|
+ def __init__(self, key, data=b''):
|
|
|
+ # signature is like for a MAC, we ignore the key as this is a simple hash
|
|
|
+ if key is not None:
|
|
|
+ raise Exception("use a HMAC if you have a key")
|
|
|
+ self.h = sha256(data)
|
|
|
|
|
|
-def key_factory(repository, manifest_data):
|
|
|
- if manifest_data[0] == KeyfileKey.TYPE:
|
|
|
- return KeyfileKey.detect(repository, manifest_data)
|
|
|
- elif manifest_data[0] == PassphraseKey.TYPE:
|
|
|
- return PassphraseKey.detect(repository, manifest_data)
|
|
|
- elif manifest_data[0] == PlaintextKey.TYPE:
|
|
|
- return PlaintextKey.detect(repository, manifest_data)
|
|
|
- else:
|
|
|
- raise UnsupportedPayloadError(manifest_data[0])
|
|
|
+ def update(self, data):
|
|
|
+ self.h.update(data)
|
|
|
|
|
|
+ def digest(self):
|
|
|
+ return self.h.digest()
|
|
|
+
|
|
|
+ def hexdigest(self):
|
|
|
+ return self.h.hexdigest()
|
|
|
+
|
|
|
+
|
|
|
+class SHA512_256(sha512_256):
|
|
|
+ """sha512, but digest truncated to 256bit - faster than sha256 on 64bit platforms"""
|
|
|
+ TYPE = 1
|
|
|
+
|
|
|
+ def __init__(self, key, data):
|
|
|
+ # signature is like for a MAC, we ignore the key as this is a simple hash
|
|
|
+ if key is not None:
|
|
|
+ raise Exception("use a HMAC if you have a key")
|
|
|
+ super().__init__(data)
|
|
|
+
|
|
|
+
|
|
|
+HASH_DEFAULT = SHA256.TYPE
|
|
|
|
|
|
-class KeyBase(object):
|
|
|
+
|
|
|
+class HMAC_SHA256(HMAC):
|
|
|
+ TYPE = 10
|
|
|
+
|
|
|
+ def __init__(self, key, data):
|
|
|
+ if key is None:
|
|
|
+ raise Exception("do not use HMAC if you don't have a key")
|
|
|
+ super().__init__(key, data, sha256)
|
|
|
+
|
|
|
+
|
|
|
+class HMAC_SHA512_256(HMAC):
|
|
|
+ TYPE = 11
|
|
|
+
|
|
|
+ def __init__(self, key, data):
|
|
|
+ if key is None:
|
|
|
+ raise Exception("do not use HMAC if you don't have a key")
|
|
|
+ super().__init__(key, data, sha512_256)
|
|
|
+
|
|
|
+
|
|
|
+MAC_DEFAULT = HMAC_SHA256.TYPE
|
|
|
+
|
|
|
+
|
|
|
+class ZlibCompressor(object): # uses 0..9 in the mapping
|
|
|
+ TYPE = 0
|
|
|
+ LEVELS = range(10)
|
|
|
+
|
|
|
+ def compress(self, data):
|
|
|
+ level = self.TYPE - ZlibCompressor.TYPE
|
|
|
+ return zlib.compress(data, level)
|
|
|
+
|
|
|
+ def decompress(self, data):
|
|
|
+ return zlib.decompress(data)
|
|
|
+
|
|
|
+
|
|
|
+class LzmaCompressor(object): # uses 10..19 in the mapping
|
|
|
+ TYPE = 10
|
|
|
+ PRESETS = range(10)
|
|
|
|
|
|
def __init__(self):
|
|
|
- self.TYPE_STR = bytes([self.TYPE])
|
|
|
+ if lzma is None:
|
|
|
+ raise NotImplemented("lzma compression needs Python >= 3.3 or backports.lzma from PyPi")
|
|
|
+
|
|
|
+ def compress(self, data):
|
|
|
+ preset = self.TYPE - LzmaCompressor.TYPE
|
|
|
+ return lzma.compress(data, preset=preset)
|
|
|
+
|
|
|
+ def decompress(self, data):
|
|
|
+ return lzma.decompress(data)
|
|
|
+
|
|
|
+
|
|
|
+COMPR_DEFAULT = ZlibCompressor.TYPE + 6 # zlib level 6
|
|
|
+
|
|
|
+
|
|
|
+class KeyBase(object):
|
|
|
+ TYPE = 0x00 # override in derived classes
|
|
|
+
|
|
|
+ def __init__(self, compressor, maccer):
|
|
|
+ self.compressor = compressor()
|
|
|
+ self.maccer = maccer
|
|
|
|
|
|
def id_hash(self, data):
|
|
|
"""Return HMAC hash using the "id" HMAC key
|
|
@@ -68,23 +174,30 @@ class PlaintextKey(KeyBase):
|
|
|
@classmethod
|
|
|
def create(cls, repository, args):
|
|
|
print('Encryption NOT enabled.\nUse the "--encryption=passphrase|keyfile" to enable encryption.')
|
|
|
- return cls()
|
|
|
+ compressor = compressor_creator(args)
|
|
|
+ maccer = maccer_creator(args, cls)
|
|
|
+ return cls(compressor, maccer)
|
|
|
|
|
|
@classmethod
|
|
|
def detect(cls, repository, manifest_data):
|
|
|
- return cls()
|
|
|
+ meta, data, compressor, crypter, maccer = parser(manifest_data)
|
|
|
+ return cls(compressor, maccer)
|
|
|
|
|
|
def id_hash(self, data):
|
|
|
- return sha256(data).digest()
|
|
|
+ return self.maccer(None, data).digest()
|
|
|
|
|
|
def encrypt(self, data):
|
|
|
- return b''.join([self.TYPE_STR, zlib.compress(data)])
|
|
|
+ meta = Meta(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE,
|
|
|
+ hmac=None, stored_iv=None)
|
|
|
+ data = self.compressor.compress(data)
|
|
|
+ return generate(meta, data)
|
|
|
|
|
|
def decrypt(self, id, data):
|
|
|
- if data[0] != self.TYPE:
|
|
|
- raise IntegrityError('Invalid encryption envelope')
|
|
|
- data = zlib.decompress(memoryview(data)[1:])
|
|
|
- if id and sha256(data).digest() != id:
|
|
|
+ meta, data, compressor, crypter, maccer = parser(data)
|
|
|
+ assert isinstance(self, crypter)
|
|
|
+ assert self.maccer is maccer
|
|
|
+ data = self.compressor.decompress(data)
|
|
|
+ if id and self.id_hash(data) != id:
|
|
|
raise IntegrityError('Chunk id verification failed')
|
|
|
return data
|
|
|
|
|
@@ -93,45 +206,39 @@ class AESKeyBase(KeyBase):
|
|
|
"""Common base class shared by KeyfileKey and PassphraseKey
|
|
|
|
|
|
Chunks are encrypted using 256bit AES in Counter Mode (CTR)
|
|
|
-
|
|
|
- Payload layout: TYPE(1) + HMAC(32) + NONCE(8) + CIPHERTEXT
|
|
|
-
|
|
|
- To reduce payload size only 8 bytes of the 16 bytes nonce is saved
|
|
|
- in the payload, the first 8 bytes are always zeros. This does not
|
|
|
- affect security but limits the maximum repository capacity to
|
|
|
- only 295 exabytes!
|
|
|
"""
|
|
|
-
|
|
|
- PAYLOAD_OVERHEAD = 1 + 32 + 8 # TYPE + HMAC + NONCE
|
|
|
-
|
|
|
def id_hash(self, data):
|
|
|
"""Return HMAC hash using the "id" HMAC key
|
|
|
"""
|
|
|
- return HMAC(self.id_key, data, sha256).digest()
|
|
|
+ return self.maccer(self.id_key, data).digest()
|
|
|
|
|
|
def encrypt(self, data):
|
|
|
- data = zlib.compress(data)
|
|
|
+ data = self.compressor.compress(data)
|
|
|
self.enc_cipher.reset()
|
|
|
- data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
|
|
|
- hmac = HMAC(self.enc_hmac_key, data, sha256).digest()
|
|
|
- return b''.join((self.TYPE_STR, hmac, data))
|
|
|
+ stored_iv = self.enc_cipher.iv[8:]
|
|
|
+ data = self.enc_cipher.encrypt(data)
|
|
|
+ hmac = self.maccer(self.enc_hmac_key, stored_iv + data).digest()
|
|
|
+ meta = Meta(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE,
|
|
|
+ hmac=hmac, stored_iv=stored_iv)
|
|
|
+ return generate(meta, data)
|
|
|
|
|
|
def decrypt(self, id, data):
|
|
|
- if data[0] != self.TYPE:
|
|
|
- raise IntegrityError('Invalid encryption envelope')
|
|
|
- hmac = memoryview(data)[1:33]
|
|
|
- if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hmac:
|
|
|
+ meta, data, compressor, crypter, maccer = parser(data)
|
|
|
+ assert isinstance(self, crypter)
|
|
|
+ assert self.maccer is maccer
|
|
|
+ computed_hmac = self.maccer(self.enc_hmac_key, meta.stored_iv + data).digest()
|
|
|
+ if computed_hmac != meta.hmac:
|
|
|
raise IntegrityError('Encryption envelope checksum mismatch')
|
|
|
- self.dec_cipher.reset(iv=PREFIX + data[33:41])
|
|
|
- data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview
|
|
|
- if id and HMAC(self.id_key, data, sha256).digest() != id:
|
|
|
+ self.dec_cipher.reset(iv=PREFIX + meta.stored_iv)
|
|
|
+ data = self.compressor.decompress(self.dec_cipher.decrypt(data))
|
|
|
+ if id and self.id_hash(data) != id:
|
|
|
raise IntegrityError('Chunk id verification failed')
|
|
|
return data
|
|
|
|
|
|
def extract_nonce(self, payload):
|
|
|
- if payload[0] != self.TYPE:
|
|
|
- raise IntegrityError('Invalid encryption envelope')
|
|
|
- nonce = bytes_to_long(payload[33:41])
|
|
|
+ meta, data, compressor, crypter, maccer = parser(payload)
|
|
|
+ assert isinstance(self, crypter)
|
|
|
+ nonce = bytes_to_long(meta.stored_iv)
|
|
|
return nonce
|
|
|
|
|
|
def init_from_random_data(self, data):
|
|
@@ -154,7 +261,9 @@ class PassphraseKey(AESKeyBase):
|
|
|
|
|
|
@classmethod
|
|
|
def create(cls, repository, args):
|
|
|
- key = cls()
|
|
|
+ compressor = compressor_creator(args)
|
|
|
+ maccer = maccer_creator(args, cls)
|
|
|
+ key = cls(compressor, maccer)
|
|
|
passphrase = os.environ.get('ATTIC_PASSPHRASE')
|
|
|
if passphrase is not None:
|
|
|
passphrase2 = passphrase
|
|
@@ -176,7 +285,8 @@ class PassphraseKey(AESKeyBase):
|
|
|
@classmethod
|
|
|
def detect(cls, repository, manifest_data):
|
|
|
prompt = 'Enter passphrase for %s: ' % repository._location.orig
|
|
|
- key = cls()
|
|
|
+ meta, data, compressor, crypter, maccer = parser(manifest_data)
|
|
|
+ key = cls(compressor, maccer)
|
|
|
passphrase = os.environ.get('ATTIC_PASSPHRASE')
|
|
|
if passphrase is None:
|
|
|
passphrase = getpass(prompt)
|
|
@@ -184,7 +294,7 @@ class PassphraseKey(AESKeyBase):
|
|
|
key.init(repository, passphrase)
|
|
|
try:
|
|
|
key.decrypt(None, manifest_data)
|
|
|
- num_blocks = num_aes_blocks(len(manifest_data) - 41)
|
|
|
+ num_blocks = num_aes_blocks(len(data))
|
|
|
key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks))
|
|
|
return key
|
|
|
except IntegrityError:
|
|
@@ -207,13 +317,14 @@ class KeyfileKey(AESKeyBase):
|
|
|
|
|
|
@classmethod
|
|
|
def detect(cls, repository, manifest_data):
|
|
|
- key = cls()
|
|
|
+ meta, data, compressor, crypter, maccer = parser(manifest_data)
|
|
|
+ key = cls(compressor, maccer)
|
|
|
path = cls.find_key_file(repository)
|
|
|
prompt = 'Enter passphrase for key file %s: ' % path
|
|
|
passphrase = os.environ.get('ATTIC_PASSPHRASE', '')
|
|
|
while not key.load(path, passphrase):
|
|
|
passphrase = getpass(prompt)
|
|
|
- num_blocks = num_aes_blocks(len(manifest_data) - 41)
|
|
|
+ num_blocks = num_aes_blocks(len(data))
|
|
|
key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks))
|
|
|
return key
|
|
|
|
|
@@ -315,7 +426,9 @@ class KeyfileKey(AESKeyBase):
|
|
|
passphrase2 = getpass('Enter same passphrase again: ')
|
|
|
if passphrase != passphrase2:
|
|
|
print('Passphrases do not match')
|
|
|
- key = cls()
|
|
|
+ compressor = compressor_creator(args)
|
|
|
+ maccer = maccer_creator(args, cls)
|
|
|
+ key = cls(compressor, maccer)
|
|
|
key.repository_id = repository.id
|
|
|
key.init_from_random_data(get_random_bytes(100))
|
|
|
key.init_ciphers()
|
|
@@ -323,3 +436,153 @@ class KeyfileKey(AESKeyBase):
|
|
|
print('Key file "%s" created.' % key.path)
|
|
|
print('Keep this file safe. Your data will be inaccessible without it.')
|
|
|
return key
|
|
|
+
|
|
|
+
|
|
|
+# note: key 0 nicely maps to a zlib compressor with level 0 which means "no compression"
|
|
|
+compressor_mapping = {}
|
|
|
+for level in ZlibCompressor.LEVELS:
|
|
|
+ compressor_mapping[ZlibCompressor.TYPE + level] = \
|
|
|
+ type('ZlibCompressorLevel%d' % level, (ZlibCompressor, ), dict(TYPE=ZlibCompressor.TYPE + level))
|
|
|
+for preset in LzmaCompressor.PRESETS:
|
|
|
+ compressor_mapping[LzmaCompressor.TYPE + preset] = \
|
|
|
+ type('LzmaCompressorPreset%d' % preset, (LzmaCompressor, ), dict(TYPE=LzmaCompressor.TYPE + preset))
|
|
|
+
|
|
|
+
|
|
|
+crypter_mapping = {
|
|
|
+ KeyfileKey.TYPE: KeyfileKey,
|
|
|
+ PassphraseKey.TYPE: PassphraseKey,
|
|
|
+ PlaintextKey.TYPE: PlaintextKey,
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+maccer_mapping = {
|
|
|
+ # simple hashes, not MACs (but MAC-like class __init__ method signature):
|
|
|
+ SHA256.TYPE: SHA256,
|
|
|
+ SHA512_256.TYPE: SHA512_256,
|
|
|
+ # MACs:
|
|
|
+ HMAC_SHA256.TYPE: HMAC_SHA256,
|
|
|
+ HMAC_SHA512_256.TYPE: HMAC_SHA512_256,
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+def get_implementations(meta):
|
|
|
+ try:
|
|
|
+ compressor = compressor_mapping[meta.compr_type]
|
|
|
+ crypter = crypter_mapping[meta.crypt_type]
|
|
|
+ maccer = maccer_mapping[meta.mac_type]
|
|
|
+ except KeyError:
|
|
|
+ raise UnsupportedPayloadError("compr_type %x crypt_type %x mac_type %x" % (
|
|
|
+ meta.compr_type, meta.crypt_type, meta.mac_type))
|
|
|
+ return compressor, crypter, maccer
|
|
|
+
|
|
|
+
|
|
|
+def legacy_parser(all_data, crypt_type): # all rather hardcoded
|
|
|
+ """
|
|
|
+ Payload layout:
|
|
|
+ no encryption: TYPE(1) + data
|
|
|
+ with encryption: TYPE(1) + HMAC(32) + NONCE(8) + data
|
|
|
+ data is compressed with zlib level 6 and (in the 2nd case) encrypted.
|
|
|
+
|
|
|
+ To reduce payload size only 8 bytes of the 16 bytes nonce is saved
|
|
|
+ in the payload, the first 8 bytes are always zeros. This does not
|
|
|
+ affect security but limits the maximum repository capacity to
|
|
|
+ only 295 exabytes!
|
|
|
+ """
|
|
|
+ offset = 1
|
|
|
+ if crypt_type == PlaintextKey.TYPE:
|
|
|
+ hmac = None
|
|
|
+ iv = stored_iv = None
|
|
|
+ data = all_data[offset:]
|
|
|
+ else:
|
|
|
+ hmac = all_data[offset:offset+32]
|
|
|
+ stored_iv = all_data[offset+32:offset+40]
|
|
|
+ data = all_data[offset+40:]
|
|
|
+ meta = Meta(compr_type=6, crypt_type=crypt_type, mac_type=HMAC_SHA256.TYPE,
|
|
|
+ hmac=hmac, stored_iv=stored_iv)
|
|
|
+ compressor, crypter, maccer = get_implementations(meta)
|
|
|
+ return meta, data, compressor, crypter, maccer
|
|
|
+
|
|
|
+def parser00(all_data):
|
|
|
+ return legacy_parser(all_data, KeyfileKey.TYPE)
|
|
|
+
|
|
|
+def parser01(all_data):
|
|
|
+ return legacy_parser(all_data, PassphraseKey.TYPE)
|
|
|
+
|
|
|
+def parser02(all_data):
|
|
|
+ return legacy_parser(all_data, PlaintextKey.TYPE)
|
|
|
+
|
|
|
+
|
|
|
+def parser03(all_data): # new & flexible
|
|
|
+ """
|
|
|
+ Payload layout:
|
|
|
+ always: TYPE(1) + MSGPACK((meta, data))
|
|
|
+
|
|
|
+ meta is a Meta namedtuple and contains all required information about data.
|
|
|
+ data is maybe compressed (see meta) and maybe encrypted (see meta).
|
|
|
+ """
|
|
|
+ # TODO use Unpacker(..., max_*_len=NOTMORETHANNEEDED) to avoid any memory
|
|
|
+ # allocation issues on untrusted and potentially tampered input data.
|
|
|
+ # Problem: we currently must use older msgpack because pure python impl.
|
|
|
+ # is broken in 0.4.2 < version <= 0.4.5, but this api is only offered by
|
|
|
+ # more recent ones, not by 0.4.2. So, fix here when 0.4.6 is out. :-(
|
|
|
+ meta_tuple, data = msgpack.unpackb(all_data[1:])
|
|
|
+ meta = Meta(*meta_tuple)
|
|
|
+ compressor, crypter, maccer = get_implementations(meta)
|
|
|
+ return meta, data, compressor, crypter, maccer
|
|
|
+
|
|
|
+
|
|
|
+def parser(data):
|
|
|
+ parser_mapping = {
|
|
|
+ 0x00: parser00,
|
|
|
+ 0x01: parser01,
|
|
|
+ 0x02: parser02,
|
|
|
+ 0x03: parser03,
|
|
|
+ }
|
|
|
+ header_type = data[0]
|
|
|
+ parser_func = parser_mapping[header_type]
|
|
|
+ return parser_func(data)
|
|
|
+
|
|
|
+
|
|
|
+def key_factory(repository, manifest_data):
|
|
|
+ meta, data, compressor, crypter, maccer = parser(manifest_data)
|
|
|
+ return crypter.detect(repository, manifest_data)
|
|
|
+
|
|
|
+
|
|
|
+def generate(meta, data):
|
|
|
+ # always create new-style 0x03 format
|
|
|
+ return b'\x03' + msgpack.packb((meta, data))
|
|
|
+
|
|
|
+
|
|
|
+def compressor_creator(args):
|
|
|
+ # args == None is used by unit tests
|
|
|
+ compression = COMPR_DEFAULT if args is None else args.compression
|
|
|
+ compressor = compressor_mapping.get(compression)
|
|
|
+ if compressor is None:
|
|
|
+ raise NotImplementedError("no compression %d" % args.compression)
|
|
|
+ return compressor
|
|
|
+
|
|
|
+
|
|
|
+def key_creator(repository, args):
|
|
|
+ if args.encryption == 'keyfile':
|
|
|
+ return KeyfileKey.create(repository, args)
|
|
|
+ if args.encryption == 'passphrase':
|
|
|
+ return PassphraseKey.create(repository, args)
|
|
|
+ if args.encryption == 'none':
|
|
|
+ return PlaintextKey.create(repository, args)
|
|
|
+ raise NotImplemented("no encryption %s" % args.encryption)
|
|
|
+
|
|
|
+
|
|
|
+def maccer_creator(args, key_cls):
|
|
|
+ # args == None is used by unit tests
|
|
|
+ mac = None if args is None else args.mac
|
|
|
+ if mac is None:
|
|
|
+ if key_cls is PlaintextKey:
|
|
|
+ mac = HASH_DEFAULT
|
|
|
+ elif key_cls in (KeyfileKey, PassphraseKey):
|
|
|
+ mac = MAC_DEFAULT
|
|
|
+ else:
|
|
|
+ raise NotImplementedError("unknown key class")
|
|
|
+ maccer = maccer_mapping.get(mac)
|
|
|
+ if maccer is None:
|
|
|
+ raise NotImplementedError("no mac %d" % args.mac)
|
|
|
+ return maccer
|