瀏覽代碼

Merge pull request #6880 from ThomasWaldmann/new-crypto-no-assert-id-borg2

new crypto does not need to call ._assert_id()
TW 3 年之前
父節點
當前提交
320190dacf
共有 4 個文件被更改,包括 34 次插入6 次删除
  1. 0 2
      docs/internals/security.rst
  2. 9 3
      src/borg/archive.py
  3. 4 1
      src/borg/crypto/key.py
  4. 21 0
      src/borg/testsuite/key.py

+ 0 - 2
docs/internals/security.rst

@@ -180,8 +180,6 @@ Decryption::
 
     decompressed = decompress(decrypted)
 
-    ASSERT( CONSTANT-TIME-COMPARISON( chunk-id, MAC(id_key, decompressed) ) )
-
 Notable:
 
 - More modern and often faster AEAD ciphers instead of self-assembled stuff.

+ 9 - 3
src/borg/archive.py

@@ -21,7 +21,7 @@ logger = create_logger()
 from . import xattr
 from .chunker import get_chunker, Chunk
 from .cache import ChunkListEntry
-from .crypto.key import key_factory
+from .crypto.key import key_factory, AEADKeyBase
 from .compress import Compressor, CompressionSpec
 from .constants import *  # NOQA
 from .crypto.low_level import IntegrityError as IntegrityErrorBase
@@ -1684,6 +1684,12 @@ class ArchiveChecker:
         chunks_count_index = len(self.chunks)
         chunks_count_segments = 0
         errors = 0
+        # for the new crypto, derived from AEADKeyBase, we know that it checks authenticity on
+        # the crypto.low_level level - invalid chunks will fail to AEAD authenticate.
+        # for these key types, we know that there is no need to decompress the data afterwards.
+        # for all other modes, we assume that we must decompress, so we can verify authenticity
+        # based on the plaintext MAC (via calling ._assert_id(id, plaintext)).
+        decompress = not isinstance(self.key, AEADKeyBase)
         defect_chunks = []
         pi = ProgressIndicatorPercent(
             total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01, msgid="check.verify_data"
@@ -1714,7 +1720,7 @@ class ArchiveChecker:
                         chunk_data_iter = self.repository.get_many(chunk_ids)
                 else:
                     try:
-                        self.key.decrypt(chunk_id, encrypted_data)
+                        self.key.decrypt(chunk_id, encrypted_data, decompress=decompress)
                     except IntegrityErrorBase as integrity_error:
                         self.error_found = True
                         errors += 1
@@ -1745,7 +1751,7 @@ class ArchiveChecker:
                     # from the underlying media.
                     try:
                         encrypted_data = self.repository.get(defect_chunk)
-                        self.key.decrypt(defect_chunk, encrypted_data)
+                        self.key.decrypt(defect_chunk, encrypted_data, decompress=decompress)
                     except IntegrityErrorBase:
                         # failed twice -> get rid of this chunk
                         del self.chunks[defect_chunk]

+ 4 - 1
src/borg/crypto/key.py

@@ -871,7 +871,10 @@ class AEADKeyBase(KeyBase):
         if not decompress:
             return payload
         data = self.decompress(memoryview(payload))
-        self.assert_id(id, data)
+        # note: calling self.assert_id(id, data) is not needed any more for the new AEAD crypto.
+        # we put the id into AAD when storing the chunk, so it gets into the authentication tag computation.
+        # when decrypting, we provide the id we **want** as AAD for the auth tag verification, so
+        # decrypting only succeeds if we got the ciphertext we wrote **for that chunk id**.
         return data
 
     def init_from_given_data(self, *, enc_key, enc_hmac_key, id_key, chunk_seed):

+ 21 - 0
src/borg/testsuite/key.py

@@ -256,6 +256,27 @@ class TestKey:
         with pytest.raises(IntegrityError):
             key.assert_id(id, plaintext_changed)
 
+    def test_getting_wrong_chunk_fails(self, key):
+        # for the new AEAD crypto, we provide the chunk id as AAD when encrypting/authenticating,
+        # we provide the id **we want** as AAD when authenticating/decrypting the data we got from the repo.
+        # only if the id used for encrypting matches the id we want, the AEAD crypto authentication will succeed.
+        # thus, there is no need any more for calling self._assert_id() for the new crypto.
+        # the old crypto as well as plaintext and authenticated modes still need to call self._assert_id().
+        plaintext_wanted = b"123456789"
+        id_wanted = key.id_hash(plaintext_wanted)
+        ciphertext_wanted = key.encrypt(id_wanted, plaintext_wanted)
+        plaintext_other = b"xxxxxxxxx"
+        id_other = key.id_hash(plaintext_other)
+        ciphertext_other = key.encrypt(id_other, plaintext_other)
+        # both ciphertexts are authentic and decrypting them should succeed:
+        key.decrypt(id_wanted, ciphertext_wanted)
+        key.decrypt(id_other, ciphertext_other)
+        # but if we wanted the one and got the other, it must fail.
+        # the new crypto will fail due to AEAD auth failure,
+        # the old crypto and plaintext, authenticated modes will fail due to ._assert_id() check failing:
+        with pytest.raises(IntegrityErrorBase):
+            key.decrypt(id_wanted, ciphertext_other)
+
     def test_authenticated_encrypt(self, monkeypatch):
         monkeypatch.setenv("BORG_PASSPHRASE", "test")
         key = AuthenticatedKey.create(self.MockRepository(), self.MockArgs())