key.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. from binascii import hexlify, a2b_base64, b2a_base64
  2. from getpass import getpass
  3. import os
  4. import msgpack
  5. import shutil
  6. import tempfile
  7. import unittest
  8. import hmac
  9. from hashlib import sha256
  10. import zlib
  11. from .crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int
  12. from .helpers import IntegrityError, get_keys_dir, Location
  13. PREFIX = b'\0' * 8
  14. KEYFILE = b'\0'
  15. PASSPHRASE = b'\1'
  16. PLAINTEXT = b'\2'
  17. class HMAC(hmac.HMAC):
  18. def update(self, msg):
  19. self.inner.update(msg)
  20. def key_creator(store, args):
  21. if args.keyfile:
  22. return KeyfileKey.create(store, args)
  23. elif args.passphrase:
  24. return PassphraseKey.create(store, args)
  25. else:
  26. return PlaintextKey.create(store, args)
  27. def key_factory(store, manifest_data):
  28. if manifest_data[:1] == KEYFILE:
  29. return KeyfileKey.detect(store, manifest_data)
  30. elif manifest_data[:1] == PASSPHRASE:
  31. return PassphraseKey.detect(store, manifest_data)
  32. elif manifest_data[:1] == PLAINTEXT:
  33. return PlaintextKey.detect(store, manifest_data)
  34. else:
  35. raise Exception('Unkown Key type %d' % ord(manifest_data[0]))
  36. class KeyBase(object):
  37. def id_hash(self, data):
  38. """Return HMAC hash using the "id" HMAC key
  39. """
  40. def encrypt(self, data):
  41. pass
  42. def decrypt(self, id, data):
  43. pass
  44. class PlaintextKey(KeyBase):
  45. TYPE = PLAINTEXT
  46. chunk_seed = 0
  47. @classmethod
  48. def create(cls, store, args):
  49. print('Encryption NOT enabled.\nUse the --key-file or --passphrase options to enable encryption.')
  50. return cls()
  51. @classmethod
  52. def detect(cls, store, manifest_data):
  53. return cls()
  54. def id_hash(self, data):
  55. return sha256(data).digest()
  56. def encrypt(self, data):
  57. return b''.join([self.TYPE, zlib.compress(data)])
  58. def decrypt(self, id, data):
  59. if data[:1] != self.TYPE:
  60. raise IntegrityError('Invalid encryption envelope')
  61. data = zlib.decompress(memoryview(data)[1:])
  62. if id and sha256(data).digest() != id:
  63. raise IntegrityError('Chunk id verification failed')
  64. return data
  65. class AESKeyBase(KeyBase):
  66. def id_hash(self, data):
  67. """Return HMAC hash using the "id" HMAC key
  68. """
  69. return HMAC(self.id_key, data, sha256).digest()
  70. def encrypt(self, data):
  71. data = zlib.compress(data)
  72. self.enc_cipher.reset()
  73. data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
  74. hash = HMAC(self.enc_hmac_key, data, sha256).digest()
  75. return b''.join((self.TYPE, hash, data))
  76. def decrypt(self, id, data):
  77. if data[:1] != self.TYPE:
  78. raise IntegrityError('Invalid encryption envelope')
  79. hash = memoryview(data)[1:33]
  80. if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hash:
  81. raise IntegrityError('Encryption envelope checksum mismatch')
  82. self.dec_cipher.reset(iv=PREFIX + data[33:41])
  83. data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview
  84. if id and HMAC(self.id_key, data, sha256).digest() != id:
  85. raise IntegrityError('Chunk id verification failed')
  86. return data
  87. def extract_iv(self, payload):
  88. if payload[:1] != self.TYPE:
  89. raise IntegrityError('Invalid encryption envelope')
  90. nonce = bytes_to_long(payload[33:41])
  91. return nonce
  92. def init_from_random_data(self, data):
  93. self.enc_key = data[0:32]
  94. self.enc_hmac_key = data[32:64]
  95. self.id_key = data[64:96]
  96. self.chunk_seed = bytes_to_int(data[96:100])
  97. # Convert to signed int32
  98. if self.chunk_seed & 0x80000000:
  99. self.chunk_seed = self.chunk_seed - 0xffffffff - 1
  100. def init_ciphers(self, enc_iv=b''):
  101. self.enc_cipher = AES(self.enc_key, enc_iv)
  102. self.dec_cipher = AES(self.enc_key)
  103. class PassphraseKey(AESKeyBase):
  104. TYPE = PASSPHRASE
  105. iterations = 10000
  106. @classmethod
  107. def create(cls, store, args):
  108. key = cls()
  109. passphrase = os.environ.get('DARC_PASSPHRASE')
  110. if passphrase is not None:
  111. passphrase2 = passphrase
  112. else:
  113. passphrase, passphrase2 = 1, 2
  114. while passphrase != passphrase2:
  115. passphrase = getpass('Enter passphrase: ')
  116. if not passphrase:
  117. print('Passphrase must not be blank')
  118. continue
  119. passphrase2 = getpass('Enter same passphrase again: ')
  120. if passphrase != passphrase2:
  121. print('Passphrases do not match')
  122. key.init(store, passphrase)
  123. if passphrase:
  124. print('Remember your passphrase. Your data will be inaccessible without it.')
  125. return key
  126. @classmethod
  127. def detect(cls, store, manifest_data):
  128. prompt = 'Enter passphrase for %s: ' % store._location.orig
  129. key = cls()
  130. passphrase = os.environ.get('DARC_PASSPHRASE')
  131. if passphrase is None:
  132. passphrase = getpass(prompt)
  133. while True:
  134. key.init(store, passphrase)
  135. try:
  136. key.decrypt(None, manifest_data)
  137. key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000))
  138. return key
  139. except IntegrityError:
  140. passphrase = getpass(prompt)
  141. def init(self, store, passphrase):
  142. self.init_from_random_data(pbkdf2_sha256(passphrase.encode('utf-8'), store.id, self.iterations, 100))
  143. self.init_ciphers()
  144. class KeyfileKey(AESKeyBase):
  145. FILE_ID = 'DARC KEY'
  146. TYPE = KEYFILE
  147. IV = PREFIX + long_to_bytes(1)
  148. @classmethod
  149. def detect(cls, store, manifest_data):
  150. key = cls()
  151. path = cls.find_key_file(store)
  152. prompt = 'Enter passphrase for key file %s: ' % path
  153. passphrase = os.environ.get('DARC_PASSPHRASE', '')
  154. while not key.load(path, passphrase):
  155. passphrase = getpass(prompt)
  156. key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000))
  157. return key
  158. @classmethod
  159. def find_key_file(cls, store):
  160. id = hexlify(store.id).decode('ascii')
  161. keys_dir = get_keys_dir()
  162. for name in os.listdir(keys_dir):
  163. filename = os.path.join(keys_dir, name)
  164. with open(filename, 'r') as fd:
  165. line = fd.readline().strip()
  166. if line and line.startswith(cls.FILE_ID) and line[9:] == id:
  167. return filename
  168. raise Exception('Key file for store with ID %s not found' % id)
  169. def load(self, filename, passphrase):
  170. with open(filename, 'r') as fd:
  171. cdata = a2b_base64(''.join(fd.readlines()[1:]).encode('ascii')) # .encode needed for Python 3.[0-2]
  172. data = self.decrypt_key_file(cdata, passphrase)
  173. if data:
  174. key = msgpack.unpackb(data)
  175. if key[b'version'] != 1:
  176. raise IntegrityError('Invalid key file header')
  177. self.store_id = key[b'store_id']
  178. self.enc_key = key[b'enc_key']
  179. self.enc_hmac_key = key[b'enc_hmac_key']
  180. self.id_key = key[b'id_key']
  181. self.chunk_seed = key[b'chunk_seed']
  182. self.path = filename
  183. return True
  184. def decrypt_key_file(self, data, passphrase):
  185. d = msgpack.unpackb(data)
  186. assert d[b'version'] == 1
  187. assert d[b'algorithm'] == b'SHA256'
  188. key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
  189. data = AES(key, self.IV).decrypt(d[b'data'])
  190. if HMAC(key, data, sha256).digest() != d[b'hash']:
  191. return None
  192. return data
  193. def encrypt_key_file(self, data, passphrase):
  194. salt = get_random_bytes(32)
  195. iterations = 10000
  196. key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
  197. hash = HMAC(key, data, sha256).digest()
  198. cdata = AES(key, self.IV).encrypt(data)
  199. # cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data)
  200. d = {
  201. 'version': 1,
  202. 'salt': salt,
  203. 'iterations': iterations,
  204. 'algorithm': 'SHA256',
  205. 'hash': hash,
  206. 'data': cdata,
  207. }
  208. return msgpack.packb(d)
  209. def save(self, path, passphrase):
  210. key = {
  211. 'version': 1,
  212. 'store_id': self.store_id,
  213. 'enc_key': self.enc_key,
  214. 'enc_hmac_key': self.enc_hmac_key,
  215. 'id_key': self.id_key,
  216. 'chunk_seed': self.chunk_seed,
  217. }
  218. data = self.encrypt_key_file(msgpack.packb(key), passphrase)
  219. with open(path, 'w') as fd:
  220. fd.write('%s %s\n' % (self.FILE_ID, hexlify(self.store_id).decode('ascii')))
  221. fd.write(b2a_base64(data).decode('ascii'))
  222. self.path = path
  223. def change_passphrase(self):
  224. passphrase, passphrase2 = 1, 2
  225. while passphrase != passphrase2:
  226. passphrase = getpass('New passphrase: ')
  227. passphrase2 = getpass('Enter same passphrase again: ')
  228. if passphrase != passphrase2:
  229. print('Passphrases do not match')
  230. self.save(self.path, passphrase)
  231. print('Key file "%s" updated' % self.path)
  232. @classmethod
  233. def create(cls, store, args):
  234. filename = args.store.to_key_filename()
  235. path = filename
  236. i = 1
  237. while os.path.exists(path):
  238. i += 1
  239. path = filename + '.%d' % i
  240. passphrase = os.environ.get('DARC_PASSPHRASE')
  241. if passphrase is not None:
  242. passphrase2 = passphrase
  243. else:
  244. passphrase, passphrase2 = 1, 2
  245. while passphrase != passphrase2:
  246. passphrase = getpass('Enter passphrase (empty for no passphrase):')
  247. passphrase2 = getpass('Enter same passphrase again: ')
  248. if passphrase != passphrase2:
  249. print('Passphrases do not match')
  250. key = cls()
  251. key.store_id = store.id
  252. key.init_from_random_data(get_random_bytes(100))
  253. key.init_ciphers()
  254. key.save(path, passphrase)
  255. print('Key file "%s" created.' % key.path)
  256. print('Keep this file safe. Your data will be inaccessible without it.')
  257. return key
  258. class KeyTestCase(unittest.TestCase):
  259. def setUp(self):
  260. self.tmppath = tempfile.mkdtemp()
  261. os.environ['DARC_KEYS_DIR'] = self.tmppath
  262. def tearDown(self):
  263. shutil.rmtree(self.tmppath)
  264. class MockStore(object):
  265. class _Location(object):
  266. orig = '/some/place'
  267. _location = _Location()
  268. id = b'\0' * 32
  269. def setUp(self):
  270. self.tmpdir = tempfile.mkdtemp()
  271. os.environ['DARC_KEYS_DIR'] = self.tmpdir
  272. def tearDown(self):
  273. shutil.rmtree(self.tmpdir)
  274. def test_plaintext(self):
  275. key = PlaintextKey.create(None, None)
  276. data = b'foo'
  277. self.assertEqual(hexlify(key.id_hash(data)), b'2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae')
  278. self.assertEqual(data, key.decrypt(key.id_hash(data), key.encrypt(data)))
  279. def test_keyfile(self):
  280. class MockArgs(object):
  281. store = Location(tempfile.mkstemp()[1])
  282. os.environ['DARC_PASSPHRASE'] = 'test'
  283. key = KeyfileKey.create(self.MockStore(), MockArgs())
  284. self.assertEqual(bytes_to_long(key.enc_cipher.iv, 8), 0)
  285. manifest = key.encrypt(b'')
  286. iv = key.extract_iv(manifest)
  287. key2 = KeyfileKey.detect(self.MockStore(), manifest)
  288. self.assertEqual(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000)
  289. # Key data sanity check
  290. self.assertEqual(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3)
  291. self.assertEqual(key2.chunk_seed == 0, False)
  292. data = b'foo'
  293. self.assertEqual(data, key2.decrypt(key.id_hash(data), key.encrypt(data)))
  294. def test_passphrase(self):
  295. os.environ['DARC_PASSPHRASE'] = 'test'
  296. key = PassphraseKey.create(self.MockStore(), None)
  297. self.assertEqual(bytes_to_long(key.enc_cipher.iv, 8), 0)
  298. self.assertEqual(hexlify(key.id_key), b'f28e915da78a972786da47fee6c4bd2960a421b9bdbdb35a7942eb82552e9a72')
  299. self.assertEqual(hexlify(key.enc_hmac_key), b'169c6082f209e524ea97e2c75318936f6e93c101b9345942a95491e9ae1738ca')
  300. self.assertEqual(hexlify(key.enc_key), b'c05dd423843d4dd32a52e4dc07bb11acabe215917fc5cf3a3df6c92b47af79ba')
  301. self.assertEqual(key.chunk_seed, -324662077)
  302. manifest = key.encrypt(b'')
  303. iv = key.extract_iv(manifest)
  304. key2 = PassphraseKey.detect(self.MockStore(), manifest)
  305. self.assertEqual(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000)
  306. self.assertEqual(key.id_key, key2.id_key)
  307. self.assertEqual(key.enc_hmac_key, key2.enc_hmac_key)
  308. self.assertEqual(key.enc_key, key2.enc_key)
  309. self.assertEqual(key.chunk_seed, key2.chunk_seed)
  310. data = b'foo'
  311. self.assertEqual(hexlify(key.id_hash(data)), b'016c27cd40dc8e84f196f3b43a9424e8472897e09f6935d0d3a82fb41664bad7')
  312. self.assertEqual(data, key2.decrypt(key2.id_hash(data), key.encrypt(data)))
  313. def suite():
  314. return unittest.TestLoader().loadTestsFromTestCase(KeyTestCase)
  315. if __name__ == '__main__':
  316. unittest.main()