소스 검색

archiver: add test for paperkey import, fix bug

Marian Beermann 8 년 전
부모
커밋
78f0e5d473
2개의 변경된 파일49개의 추가작업 그리고 3개의 파일을 삭제
  1. 1 0
      src/borg/crypto/keymanager.py
  2. 48 3
      src/borg/testsuite/archiver.py

+ 1 - 0
src/borg/crypto/keymanager.py

@@ -164,6 +164,7 @@ class KeyManager:
                         (id_lines, id_repoid, id_complete_checksum) = data.split('/')
                         (id_lines, id_repoid, id_complete_checksum) = data.split('/')
                     except ValueError:
                     except ValueError:
                         print("the id line must contain exactly three '/', try again")
                         print("the id line must contain exactly three '/', try again")
+                        continue
                     if sha256_truncated(data.lower().encode('ascii'), 2) != checksum:
                     if sha256_truncated(data.lower().encode('ascii'), 2) != checksum:
                         print('line checksum did not match, try same line again')
                         print('line checksum did not match, try same line again')
                         continue
                         continue

+ 48 - 3
src/borg/testsuite/archiver.py

@@ -61,7 +61,7 @@ from . import key
 src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
 src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
 
 
 
 
-def exec_cmd(*args, archiver=None, fork=False, exe=None, **kw):
+def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b'', **kw):
     if fork:
     if fork:
         try:
         try:
             if exe is None:
             if exe is None:
@@ -70,7 +70,7 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, **kw):
                 borg = (exe, )
                 borg = (exe, )
             elif not isinstance(exe, tuple):
             elif not isinstance(exe, tuple):
                 raise ValueError('exe must be None, a tuple or a str')
                 raise ValueError('exe must be None, a tuple or a str')
-            output = subprocess.check_output(borg + args, stderr=subprocess.STDOUT)
+            output = subprocess.check_output(borg + args, stderr=subprocess.STDOUT, input=input)
             ret = 0
             ret = 0
         except subprocess.CalledProcessError as e:
         except subprocess.CalledProcessError as e:
             output = e.output
             output = e.output
@@ -82,7 +82,7 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, **kw):
     else:
     else:
         stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
         stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
         try:
         try:
-            sys.stdin = StringIO()
+            sys.stdin = StringIO(input.decode())
             sys.stdout = sys.stderr = output = StringIO()
             sys.stdout = sys.stderr = output = StringIO()
             if archiver is None:
             if archiver is None:
                 archiver = Archiver()
                 archiver = Archiver()
@@ -2533,6 +2533,51 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
  2: 737475 - 88
  2: 737475 - 88
 """
 """
 
 
+    def test_key_import_paperkey(self):
+        repo_id = 'e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239'
+        self.cmd('init', self.repository_location, '--encryption', 'keyfile')
+        self._set_repository_id(self.repository_path, unhexlify(repo_id))
+
+        key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0]
+        with open(key_file, 'w') as fd:
+            fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n')
+            fd.write(b2a_base64(b'abcdefghijklmnopqrstu').decode())
+
+        typed_input = (
+            b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41  02\n'   # Forgot to type "-"
+            b'2 / e29442 3506da 4e1ea7  25f62a 5a3d41 - 02\n'   # Forgot to type second "/"
+            b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d42 - 02\n'  # Typo (..42 not ..41)
+            b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n'  # Correct! Congratulations
+            b'616263 646566 676869 6a6b6c 6d6e6f 707172 - 6d\n'
+            b'\n\n'  # Abort [yN] => N
+            b'737475 88\n'  # missing "-"
+            b'73747i - 88\n'  # typo
+            b'73747 - 88\n'  # missing nibble
+            b'73 74 75  -  89\n'  # line checksum mismatch
+            b'00a1 - 88\n'  # line hash collision - overall hash mismatch, have to start over
+
+            b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n'
+            b'616263 646566 676869 6a6b6c 6d6e6f 707172 - 6d\n'
+            b'73 74 75  -  88\n'
+        )
+
+        # In case that this has to change, here is a quick way to find a colliding line hash:
+        #
+        # from hashlib import sha256
+        # hash_fn = lambda x: sha256(b'\x00\x02' + x).hexdigest()[:2]
+        # for i in range(1000):
+        #     if hash_fn(i.to_bytes(2, byteorder='big')) == '88':  # 88 = line hash
+        #         print(i.to_bytes(2, 'big'))
+        #         break
+
+        self.cmd('key', 'import', '--paper', self.repository_location, input=typed_input)
+
+        # Test abort paths
+        typed_input = b'\ny\n'
+        self.cmd('key', 'import', '--paper', self.repository_location, input=typed_input)
+        typed_input = b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n\ny\n'
+        self.cmd('key', 'import', '--paper', self.repository_location, input=typed_input)
+
     def test_debug_dump_manifest(self):
     def test_debug_dump_manifest(self):
         self.create_regular_file('file1', size=1024 * 80)
         self.create_regular_file('file1', size=1024 * 80)
         self.cmd('init', '--encryption=repokey', self.repository_location)
         self.cmd('init', '--encryption=repokey', self.repository_location)