|
@@ -1,3 +1,4 @@
|
|
|
+from datetime import datetime, timezone, timedelta
|
|
|
import shutil
|
|
|
from unittest.mock import patch
|
|
|
|
|
@@ -5,7 +6,7 @@ import pytest
|
|
|
|
|
|
from ...archive import ChunkBuffer
|
|
|
from ...constants import * # NOQA
|
|
|
-from ...helpers import bin_to_hex
|
|
|
+from ...helpers import bin_to_hex, msgpack
|
|
|
from ...manifest import Manifest
|
|
|
from ...repository import Repository
|
|
|
from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
|
|
@@ -205,6 +206,40 @@ def test_corrupted_manifest(archivers, request):
|
|
|
cmd(archiver, "check", exit_code=0)
|
|
|
|
|
|
|
|
|
+def test_spoofed_manifest(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ check_cmd_setup(archiver)
|
|
|
+ archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
+ with repository:
|
|
|
+ manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
|
|
|
+ cdata = manifest.repo_objs.format(
|
|
|
+ Manifest.MANIFEST_ID,
|
|
|
+ {},
|
|
|
+ msgpack.packb(
|
|
|
+ {
|
|
|
+ "version": 1,
|
|
|
+ "archives": {},
|
|
|
+ "config": {},
|
|
|
+ "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ # we assume that an attacker can put a file into backup src files that contains a fake manifest.
|
|
|
+ # but, the attacker can not influence the ro_type borg will use to store user file data:
|
|
|
+ ro_type=ROBJ_FILE_STREAM, # a real manifest is stored with ROBJ_MANIFEST
|
|
|
+ )
|
|
|
+ # maybe a repo-side attacker could manage to move the fake manifest file chunk over to the manifest ID.
|
|
|
+ # we simulate this here by directly writing the fake manifest data to the manifest ID.
|
|
|
+ repository.put(Manifest.MANIFEST_ID, cdata)
|
|
|
+ repository.commit(compact=False)
|
|
|
+ # borg should notice that the manifest has the wrong ro_type.
|
|
|
+ cmd(archiver, "check", exit_code=1)
|
|
|
+ # borg check --repair should remove the corrupted manifest and rebuild a new one.
|
|
|
+ output = cmd(archiver, "check", "-v", "--repair", exit_code=0)
|
|
|
+ assert "archive1" in output
|
|
|
+ assert "archive2" in output
|
|
|
+ cmd(archiver, "check", exit_code=0)
|
|
|
+
|
|
|
+
|
|
|
def test_manifest_rebuild_corrupted_chunk(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|