|
@@ -8,6 +8,7 @@ from ...archive import ChunkBuffer
|
|
|
from ...constants import * # NOQA
|
|
|
from ...helpers import bin_to_hex, msgpack
|
|
|
from ...manifest import Manifest
|
|
|
+from ...remote3 import RemoteRepository3
|
|
|
from ...repository3 import Repository3
|
|
|
from ..repository3 import fchunk
|
|
|
from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
|
|
@@ -192,11 +193,12 @@ def test_missing_manifest(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|
|
|
archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
- if isinstance(repository, Repository3):
|
|
|
- pytest.skip("Test not adapted to Repository3")
|
|
|
with repository:
|
|
|
- repository.delete(Manifest.MANIFEST_ID)
|
|
|
- repository.commit(compact=False)
|
|
|
+ if isinstance(repository, (Repository3, RemoteRepository3)):
|
|
|
+ repository.store_delete("config/manifest")
|
|
|
+ else:
|
|
|
+ repository.delete(Manifest.MANIFEST_ID)
|
|
|
+ repository.commit(compact=False)
|
|
|
cmd(archiver, "check", exit_code=1)
|
|
|
output = cmd(archiver, "check", "-v", "--repair", exit_code=0)
|
|
|
assert "archive1" in output
|
|
@@ -208,12 +210,10 @@ def test_corrupted_manifest(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|
|
|
archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
- if isinstance(repository, Repository3):
|
|
|
- pytest.skip("Test not adapted to Repository3")
|
|
|
with repository:
|
|
|
- manifest = repository.get(Manifest.MANIFEST_ID)
|
|
|
+ manifest = repository.get_manifest()
|
|
|
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
|
|
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
|
|
|
+ repository.put_manifest(corrupted_manifest)
|
|
|
repository.commit(compact=False)
|
|
|
cmd(archiver, "check", exit_code=1)
|
|
|
output = cmd(archiver, "check", "-v", "--repair", exit_code=0)
|
|
@@ -226,8 +226,6 @@ def test_spoofed_manifest(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|
|
|
archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
- if isinstance(repository, Repository3):
|
|
|
- pytest.skip("Test not adapted to Repository3")
|
|
|
with repository:
|
|
|
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
|
|
|
cdata = manifest.repo_objs.format(
|
|
@@ -247,7 +245,7 @@ def test_spoofed_manifest(archivers, request):
|
|
|
)
|
|
|
# maybe a repo-side attacker could manage to move the fake manifest file chunk over to the manifest ID.
|
|
|
# we simulate this here by directly writing the fake manifest data to the manifest ID.
|
|
|
- repository.put(Manifest.MANIFEST_ID, cdata)
|
|
|
+ repository.put_manifest(cdata)
|
|
|
repository.commit(compact=False)
|
|
|
# borg should notice that the manifest has the wrong ro_type.
|
|
|
cmd(archiver, "check", exit_code=1)
|
|
@@ -262,12 +260,10 @@ def test_manifest_rebuild_corrupted_chunk(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|
|
|
archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
- if isinstance(repository, Repository3):
|
|
|
- pytest.skip("Test not adapted to Repository3")
|
|
|
with repository:
|
|
|
- manifest = repository.get(Manifest.MANIFEST_ID)
|
|
|
+ manifest = repository.get_manifest()
|
|
|
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
|
|
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
|
|
|
+ repository.put_manifest(corrupted_manifest)
|
|
|
chunk = repository.get(archive.id)
|
|
|
corrupted_chunk = chunk + b"corrupted!"
|
|
|
repository.put(archive.id, corrupted_chunk)
|
|
@@ -282,13 +278,11 @@ def test_manifest_rebuild_duplicate_archive(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|
|
|
archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
- if isinstance(repository, Repository3):
|
|
|
- pytest.skip("Test not adapted to Repository3")
|
|
|
repo_objs = archive.repo_objs
|
|
|
with repository:
|
|
|
- manifest = repository.get(Manifest.MANIFEST_ID)
|
|
|
+ manifest = repository.get_manifest()
|
|
|
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
|
|
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
|
|
|
+ repository.put_manifest(corrupted_manifest)
|
|
|
archive_dict = {
|
|
|
"command_line": "",
|
|
|
"item_ptrs": [],
|
|
@@ -314,14 +308,12 @@ def test_spoofed_archive(archivers, request):
|
|
|
archiver = request.getfixturevalue(archivers)
|
|
|
check_cmd_setup(archiver)
|
|
|
archive, repository = open_archive(archiver.repository_path, "archive1")
|
|
|
- if isinstance(repository, Repository3):
|
|
|
- pytest.skip("Test not adapted to Repository3")
|
|
|
repo_objs = archive.repo_objs
|
|
|
with repository:
|
|
|
# attacker would corrupt or delete the manifest to trigger a rebuild of it:
|
|
|
- manifest = repository.get(Manifest.MANIFEST_ID)
|
|
|
+ manifest = repository.get_manifest()
|
|
|
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
|
|
|
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
|
|
|
+ repository.put_manifest(corrupted_manifest)
|
|
|
archive_dict = {
|
|
|
"command_line": "",
|
|
|
"item_ptrs": [],
|