key.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. from __future__ import with_statement
  2. from getpass import getpass
  3. import os
  4. import msgpack
  5. import shutil
  6. import tempfile
  7. import unittest
  8. import zlib
  9. from Crypto.Cipher import AES
  10. from Crypto.Hash import SHA256, HMAC
  11. from Crypto.Util import Counter
  12. from Crypto.Util.number import bytes_to_long, long_to_bytes
  13. from Crypto.Random import get_random_bytes
  14. from Crypto.Protocol.KDF import PBKDF2
  15. from .helpers import IntegrityError, get_keys_dir, Location
  16. PREFIX = '\0' * 8
  17. KEYFILE = '\0'
  18. PASSPHRASE = '\1'
  19. PLAINTEXT = '\2'
  20. def key_creator(store, args):
  21. if args.keyfile:
  22. return KeyfileKey.create(store, args)
  23. elif args.passphrase:
  24. return PassphraseKey.create(store, args)
  25. else:
  26. return PlaintextKey.create(store, args)
  27. def key_factory(store, manifest_data):
  28. if manifest_data[0] == KEYFILE:
  29. return KeyfileKey.detect(store, manifest_data)
  30. elif manifest_data[0] == PASSPHRASE:
  31. return PassphraseKey.detect(store, manifest_data)
  32. elif manifest_data[0] == PLAINTEXT:
  33. return PlaintextKey.detect(store, manifest_data)
  34. else:
  35. raise Exception('Unkown Key type %d' % ord(manifest_data[0]))
  36. def SHA256_PDF(p, s):
  37. return HMAC.new(p, s, SHA256).digest()
  38. class KeyBase(object):
  39. def id_hash(self, data):
  40. """Return HMAC hash using the "id" HMAC key
  41. """
  42. def encrypt(self, data):
  43. pass
  44. def decrypt(self, id, data):
  45. pass
  46. class PlaintextKey(KeyBase):
  47. TYPE = PLAINTEXT
  48. chunk_seed = 0
  49. @classmethod
  50. def create(cls, store, args):
  51. print 'Encryption NOT enabled.\nUse the --key-file or --passphrase options to enable encryption.'
  52. return cls()
  53. @classmethod
  54. def detect(cls, store, manifest_data):
  55. return cls()
  56. def id_hash(self, data):
  57. return SHA256.new(data).digest()
  58. def encrypt(self, data):
  59. return ''.join([self.TYPE, zlib.compress(data)])
  60. def decrypt(self, id, data):
  61. if data[0] != self.TYPE:
  62. raise IntegrityError('Invalid encryption envelope')
  63. data = zlib.decompress(data[1:])
  64. if id and SHA256.new(data).digest() != id:
  65. raise IntegrityError('Chunk id verification failed')
  66. return data
  67. class AESKeyBase(KeyBase):
  68. def id_hash(self, data):
  69. """Return HMAC hash using the "id" HMAC key
  70. """
  71. return HMAC.new(self.id_key, data, SHA256).digest()
  72. def encrypt(self, data):
  73. data = zlib.compress(data)
  74. nonce = long_to_bytes(self.counter.next_value(), 8)
  75. data = ''.join((nonce, AES.new(self.enc_key, AES.MODE_CTR, '',
  76. counter=self.counter).encrypt(data)))
  77. hash = HMAC.new(self.enc_hmac_key, data, SHA256).digest()
  78. return ''.join((self.TYPE, hash, data))
  79. def decrypt(self, id, data):
  80. if data[0] != self.TYPE:
  81. raise IntegrityError('Invalid encryption envelope')
  82. hash = data[1:33]
  83. if HMAC.new(self.enc_hmac_key, data[33:], SHA256).digest() != hash:
  84. raise IntegrityError('Encryption envelope checksum mismatch')
  85. nonce = bytes_to_long(data[33:41])
  86. counter = Counter.new(64, initial_value=nonce, prefix=PREFIX)
  87. data = zlib.decompress(AES.new(self.enc_key, AES.MODE_CTR, counter=counter).decrypt(data[41:]))
  88. if id and HMAC.new(self.id_key, data, SHA256).digest() != id:
  89. raise IntegrityError('Chunk id verification failed')
  90. return data
  91. def extract_iv(self, payload):
  92. if payload[0] != self.TYPE:
  93. raise IntegrityError('Invalid encryption envelope')
  94. nonce = bytes_to_long(payload[33:41])
  95. return nonce
  96. def init_from_random_data(self, data):
  97. self.enc_key = data[0:32]
  98. self.enc_hmac_key = data[32:64]
  99. self.id_key = data[64:96]
  100. self.chunk_seed = bytes_to_long(data[96:100])
  101. # Convert to signed int32
  102. if self.chunk_seed & 0x80000000:
  103. self.chunk_seed = self.chunk_seed - 0xffffffff - 1
  104. self.counter = Counter.new(64, initial_value=1, prefix=PREFIX)
  105. class PassphraseKey(AESKeyBase):
  106. TYPE = PASSPHRASE
  107. iterations = 10000
  108. @classmethod
  109. def create(cls, store, args):
  110. key = cls()
  111. passphrase = os.environ.get('DARC_PASSPHRASE')
  112. if passphrase is not None:
  113. passphrase2 = passphrase
  114. else:
  115. passphrase, passphrase2 = 1, 2
  116. while passphrase != passphrase2:
  117. passphrase = getpass('Enter passphrase: ')
  118. if not passphrase:
  119. print 'Passphrase must not be blank'
  120. continue
  121. passphrase2 = getpass('Enter same passphrase again: ')
  122. if passphrase != passphrase2:
  123. print 'Passphrases do not match'
  124. key.init(store, passphrase)
  125. if passphrase:
  126. print 'Remember your passphrase. Your data will be inaccessible without it.'
  127. return key
  128. @classmethod
  129. def detect(cls, store, manifest_data):
  130. prompt = 'Enter passphrase for %s: ' % store._location.orig
  131. key = cls()
  132. passphrase = os.environ.get('DARC_PASSPHRASE')
  133. if passphrase is None:
  134. passphrase = getpass(prompt)
  135. while True:
  136. key.init(store, passphrase)
  137. try:
  138. key.decrypt(None, manifest_data)
  139. iv = key.extract_iv(manifest_data)
  140. key.counter = Counter.new(64, initial_value=iv + 1000, prefix=PREFIX)
  141. return key
  142. except IntegrityError:
  143. passphrase = getpass(prompt)
  144. def init(self, store, passphrase):
  145. self.init_from_random_data(PBKDF2(passphrase, store.id, 100, self.iterations, SHA256_PDF))
  146. class KeyfileKey(AESKeyBase):
  147. FILE_ID = 'DARC KEY'
  148. TYPE = KEYFILE
  149. @classmethod
  150. def detect(cls, store, manifest_data):
  151. key = cls()
  152. path = cls.find_key_file(store)
  153. prompt = 'Enter passphrase for key file %s: ' % path
  154. passphrase = os.environ.get('DARC_PASSPHRASE', '')
  155. while not key.load(path, passphrase):
  156. passphrase = getpass(prompt)
  157. iv = key.extract_iv(manifest_data)
  158. key.counter = Counter.new(64, initial_value=iv + 1000, prefix=PREFIX)
  159. return key
  160. @classmethod
  161. def find_key_file(cls, store):
  162. id = store.id.encode('hex')
  163. keys_dir = get_keys_dir()
  164. for name in os.listdir(keys_dir):
  165. filename = os.path.join(keys_dir, name)
  166. with open(filename, 'rb') as fd:
  167. line = fd.readline().strip()
  168. if line and line.startswith(cls.FILE_ID) and line[9:] == id:
  169. return filename
  170. raise Exception('Key file for store with ID %s not found' % id)
  171. def load(self, filename, passphrase):
  172. with open(filename, 'rb') as fd:
  173. cdata = (''.join(fd.readlines()[1:])).decode('base64')
  174. data = self.decrypt_key_file(cdata, passphrase)
  175. if data:
  176. key = msgpack.unpackb(data)
  177. if key['version'] != 1:
  178. raise IntegrityError('Invalid key file header')
  179. self.store_id = key['store_id']
  180. self.enc_key = key['enc_key']
  181. self.enc_hmac_key = key['enc_hmac_key']
  182. self.id_key = key['id_key']
  183. self.chunk_seed = key['chunk_seed']
  184. self.counter = Counter.new(64, initial_value=1, prefix=PREFIX)
  185. self.path = filename
  186. return True
  187. def decrypt_key_file(self, data, passphrase):
  188. d = msgpack.unpackb(data)
  189. assert d['version'] == 1
  190. assert d['algorithm'] == 'SHA256'
  191. key = PBKDF2(passphrase, d['salt'], 32, d['iterations'], SHA256_PDF)
  192. data = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).decrypt(d['data'])
  193. if HMAC.new(key, data, SHA256).digest() != d['hash']:
  194. return None
  195. return data
  196. def encrypt_key_file(self, data, passphrase):
  197. salt = get_random_bytes(32)
  198. iterations = 10000
  199. key = PBKDF2(passphrase, salt, 32, iterations, SHA256_PDF)
  200. hash = HMAC.new(key, data, SHA256).digest()
  201. cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data)
  202. d = {
  203. 'version': 1,
  204. 'salt': salt,
  205. 'iterations': iterations,
  206. 'algorithm': 'SHA256',
  207. 'hash': hash,
  208. 'data': cdata,
  209. }
  210. return msgpack.packb(d)
  211. def save(self, path, passphrase):
  212. key = {
  213. 'version': 1,
  214. 'store_id': self.store_id,
  215. 'enc_key': self.enc_key,
  216. 'enc_hmac_key': self.enc_hmac_key,
  217. 'id_key': self.id_key,
  218. 'chunk_seed': self.chunk_seed,
  219. }
  220. data = self.encrypt_key_file(msgpack.packb(key), passphrase)
  221. with open(path, 'wb') as fd:
  222. fd.write('%s %s\n' % (self.FILE_ID, self.store_id.encode('hex')))
  223. fd.write(data.encode('base64'))
  224. self.path = path
  225. def change_passphrase(self):
  226. passphrase, passphrase2 = 1, 2
  227. while passphrase != passphrase2:
  228. passphrase = getpass('New passphrase: ')
  229. passphrase2 = getpass('Enter same passphrase again: ')
  230. if passphrase != passphrase2:
  231. print 'Passphrases do not match'
  232. self.save(self.path, passphrase)
  233. print 'Key file "%s" updated' % self.path
  234. @classmethod
  235. def create(cls, store, args):
  236. filename = args.store.to_key_filename()
  237. path = filename
  238. i = 1
  239. while os.path.exists(path):
  240. i += 1
  241. path = filename + '.%d' % i
  242. passphrase = os.environ.get('DARC_PASSPHRASE')
  243. if passphrase is not None:
  244. passphrase2 = passphrase
  245. else:
  246. passphrase, passphrase2 = 1, 2
  247. while passphrase != passphrase2:
  248. passphrase = getpass('Enter passphrase (empty for no passphrase):')
  249. passphrase2 = getpass('Enter same passphrase again: ')
  250. if passphrase != passphrase2:
  251. print 'Passphrases do not match'
  252. key = cls()
  253. key.store_id = store.id
  254. key.init_from_random_data(get_random_bytes(100))
  255. key.save(path, passphrase)
  256. print 'Key file "%s" created.' % key.path
  257. print 'Keep this file safe. Your data will be inaccessible without it.'
  258. return key
  259. class KeyTestCase(unittest.TestCase):
  260. def setUp(self):
  261. self.tmppath = tempfile.mkdtemp()
  262. os.environ['DARC_KEYS_DIR'] = self.tmppath
  263. def tearDown(self):
  264. shutil.rmtree(self.tmppath)
  265. class MockStore(object):
  266. class _Location(object):
  267. orig = '/some/place'
  268. _location = _Location()
  269. id = '\0' * 32
  270. def setUp(self):
  271. self.tmpdir = tempfile.mkdtemp()
  272. os.environ['DARC_KEYS_DIR'] = self.tmpdir
  273. def tearDown(self):
  274. shutil.rmtree(self.tmpdir)
  275. def test_plaintext(self):
  276. key = PlaintextKey.create(None, None)
  277. data = 'foo'
  278. self.assertEqual(key.id_hash(data).encode('hex'), '2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae')
  279. self.assertEqual(data, key.decrypt(key.id_hash(data), key.encrypt(data)))
  280. def test_keyfile(self):
  281. class MockArgs(object):
  282. store = Location(tempfile.mkstemp()[1])
  283. os.environ['DARC_PASSPHRASE'] = 'test'
  284. key = KeyfileKey.create(self.MockStore(), MockArgs())
  285. self.assertEqual(bytes_to_long(key.counter()), 1)
  286. manifest = key.encrypt('')
  287. iv = key.extract_iv(manifest)
  288. key2 = KeyfileKey.detect(self.MockStore(), manifest)
  289. self.assertEqual(bytes_to_long(key2.counter()), iv + 1000)
  290. # Key data sanity check
  291. self.assertEqual(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3)
  292. self.assertEqual(key2.chunk_seed == 0, False)
  293. data = 'foo'
  294. self.assertEqual(data, key2.decrypt(key.id_hash(data), key.encrypt(data)))
  295. def test_passphrase(self):
  296. os.environ['DARC_PASSPHRASE'] = 'test'
  297. key = PassphraseKey.create(self.MockStore(), None)
  298. self.assertEqual(bytes_to_long(key.counter()), 1)
  299. self.assertEqual(key.id_key.encode('hex'), 'f28e915da78a972786da47fee6c4bd2960a421b9bdbdb35a7942eb82552e9a72')
  300. self.assertEqual(key.enc_hmac_key.encode('hex'), '169c6082f209e524ea97e2c75318936f6e93c101b9345942a95491e9ae1738ca')
  301. self.assertEqual(key.enc_key.encode('hex'), 'c05dd423843d4dd32a52e4dc07bb11acabe215917fc5cf3a3df6c92b47af79ba')
  302. self.assertEqual(key.chunk_seed, -324662077)
  303. manifest = key.encrypt('')
  304. iv = key.extract_iv(manifest)
  305. key2 = PassphraseKey.detect(self.MockStore(), manifest)
  306. self.assertEqual(bytes_to_long(key2.counter()), iv + 1000)
  307. self.assertEqual(key.id_key, key2.id_key)
  308. self.assertEqual(key.enc_hmac_key, key2.enc_hmac_key)
  309. self.assertEqual(key.enc_key, key2.enc_key)
  310. self.assertEqual(key.chunk_seed, key2.chunk_seed)
  311. data = 'foo'
  312. self.assertEqual(key.id_hash(data).encode('hex'), '016c27cd40dc8e84f196f3b43a9424e8472897e09f6935d0d3a82fb41664bad7')
  313. self.assertEqual(data, key2.decrypt(key2.id_hash(data), key.encrypt(data)))
  314. def suite():
  315. return unittest.TestLoader().loadTestsFromTestCase(KeyTestCase)
  316. if __name__ == '__main__':
  317. unittest.main()