|
@@ -1,118 +1,130 @@
|
|
|
import json
|
|
|
import os
|
|
|
-import unittest
|
|
|
|
|
|
from ...constants import * # NOQA
|
|
|
-from . import (
|
|
|
- ArchiverTestCaseBase,
|
|
|
- RemoteArchiverTestCaseBase,
|
|
|
- ArchiverTestCaseBinaryBase,
|
|
|
- src_dir,
|
|
|
- RK_ENCRYPTION,
|
|
|
- checkts,
|
|
|
- BORG_EXES,
|
|
|
-)
|
|
|
-
|
|
|
-
|
|
|
-class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
- def test_rlist_glob(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test-1", src_dir)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "something-else-than-test-1", src_dir)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test-2", src_dir)
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "--match-archives=sh:test-*")
|
|
|
- self.assert_in("test-1", output)
|
|
|
- self.assert_in("test-2", output)
|
|
|
- self.assert_not_in("something-else", output)
|
|
|
-
|
|
|
- def test_archives_format(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "--comment", "comment 1", "test-1", src_dir)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "--comment", "comment 2", "test-2", src_dir)
|
|
|
- output_1 = self.cmd(f"--repo={self.repository_location}", "rlist")
|
|
|
- output_2 = self.cmd(
|
|
|
- f"--repo={self.repository_location}", "rlist", "--format", "{archive:<36} {time} [{id}]{NL}"
|
|
|
- )
|
|
|
- self.assertEqual(output_1, output_2)
|
|
|
- output_1 = self.cmd(f"--repo={self.repository_location}", "rlist", "--short")
|
|
|
- self.assertEqual(output_1, "test-1" + os.linesep + "test-2" + os.linesep)
|
|
|
- output_3 = self.cmd(f"--repo={self.repository_location}", "rlist", "--format", "{name} {comment}{NL}")
|
|
|
- self.assert_in("test-1 comment 1" + os.linesep, output_3)
|
|
|
- self.assert_in("test-2 comment 2" + os.linesep, output_3)
|
|
|
-
|
|
|
- def test_size_nfiles(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_regular_file("file1", size=123000)
|
|
|
- self.create_regular_file("file2", size=456)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test", "input/file1", "input/file2")
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "list", "test")
|
|
|
- print(output)
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "--format", "{name} {nfiles} {size}")
|
|
|
- o_t = output.split()
|
|
|
- assert o_t[0] == "test"
|
|
|
- assert int(o_t[1]) == 2
|
|
|
- assert 123456 <= int(o_t[2]) < 123999 # there is some metadata overhead
|
|
|
-
|
|
|
- def test_date_matching(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- earliest_ts = "2022-11-20T23:59:59"
|
|
|
- ts_in_between = "2022-12-18T23:59:59"
|
|
|
- self.create_src_archive("archive1", ts=earliest_ts)
|
|
|
- self.create_src_archive("archive2", ts=ts_in_between)
|
|
|
- self.create_src_archive("archive3")
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "-v", "--oldest=23e", exit_code=2)
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "-v", "--oldest=1m", exit_code=0)
|
|
|
- self.assert_in("archive1", output)
|
|
|
- self.assert_in("archive2", output)
|
|
|
- self.assert_not_in("archive3", output)
|
|
|
-
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "-v", "--newest=1m", exit_code=0)
|
|
|
- self.assert_in("archive3", output)
|
|
|
- self.assert_not_in("archive2", output)
|
|
|
- self.assert_not_in("archive1", output)
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "-v", "--newer=1d", exit_code=0)
|
|
|
- self.assert_in("archive3", output)
|
|
|
- self.assert_not_in("archive1", output)
|
|
|
- self.assert_not_in("archive2", output)
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "-v", "--older=1d", exit_code=0)
|
|
|
- self.assert_in("archive1", output)
|
|
|
- self.assert_in("archive2", output)
|
|
|
- self.assert_not_in("archive3", output)
|
|
|
-
|
|
|
- def test_rlist_consider_checkpoints(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test1", src_dir)
|
|
|
- # these are not really a checkpoints, but they look like some:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test2.checkpoint", src_dir)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test3.checkpoint.1", src_dir)
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist")
|
|
|
- assert "test1" in output
|
|
|
- assert "test2.checkpoint" not in output
|
|
|
- assert "test3.checkpoint.1" not in output
|
|
|
- output = self.cmd(f"--repo={self.repository_location}", "rlist", "--consider-checkpoints")
|
|
|
- assert "test1" in output
|
|
|
- assert "test2.checkpoint" in output
|
|
|
- assert "test3.checkpoint.1" in output
|
|
|
-
|
|
|
- def test_rlist_json(self):
|
|
|
- self.create_regular_file("file1", size=1024 * 80)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test", "input")
|
|
|
-
|
|
|
- list_repo = json.loads(self.cmd(f"--repo={self.repository_location}", "rlist", "--json"))
|
|
|
- repository = list_repo["repository"]
|
|
|
- assert len(repository["id"]) == 64
|
|
|
- checkts(repository["last_modified"])
|
|
|
- assert list_repo["encryption"]["mode"] == RK_ENCRYPTION[13:]
|
|
|
- assert "keyfile" not in list_repo["encryption"]
|
|
|
- archive0 = list_repo["archives"][0]
|
|
|
- checkts(archive0["time"])
|
|
|
-
|
|
|
-
|
|
|
-class RemoteArchiverTestCase(RemoteArchiverTestCaseBase, ArchiverTestCase):
|
|
|
- """run the same tests, but with a remote repository"""
|
|
|
-
|
|
|
-
|
|
|
-@unittest.skipUnless("binary" in BORG_EXES, "no borg.exe available")
|
|
|
-class ArchiverTestCaseBinary(ArchiverTestCaseBinaryBase, ArchiverTestCase):
|
|
|
- """runs the same tests, but via the borg binary"""
|
|
|
+from . import cmd, checkts, create_src_archive, create_regular_file, src_dir, RK_ENCRYPTION
|
|
|
+
|
|
|
+
|
|
|
+def pytest_generate_tests(metafunc):
|
|
|
+ # Generate tests for different scenarios: local repository, remote repository, and using the borg binary.
|
|
|
+ if "archivers" in metafunc.fixturenames:
|
|
|
+ metafunc.parametrize("archivers", ["archiver", "remote_archiver", "binary_archiver"])
|
|
|
+
|
|
|
+
|
|
|
+def test_rlist_glob(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ repo_location = archiver.repository_location
|
|
|
+
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test-1", src_dir)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "something-else-than-test-1", src_dir)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test-2", src_dir)
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "--match-archives=sh:test-*")
|
|
|
+ assert "test-1" in output
|
|
|
+ assert "test-2" in output
|
|
|
+ assert "something-else" not in output
|
|
|
+
|
|
|
+
|
|
|
+def test_archives_format(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ repo_location = archiver.repository_location
|
|
|
+
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "--comment", "comment 1", "test-1", src_dir)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "--comment", "comment 2", "test-2", src_dir)
|
|
|
+ output_1 = cmd(archiver, f"--repo={repo_location}", "rlist")
|
|
|
+ output_2 = cmd(archiver, f"--repo={repo_location}", "rlist", "--format", "{archive:<36} {time} [{id}]{NL}")
|
|
|
+ assert output_1 == output_2
|
|
|
+ output_1 = cmd(archiver, f"--repo={repo_location}", "rlist", "--short")
|
|
|
+ assert output_1 == "test-1" + os.linesep + "test-2" + os.linesep
|
|
|
+ output_3 = cmd(archiver, f"--repo={repo_location}", "rlist", "--format", "{name} {comment}{NL}")
|
|
|
+ assert "test-1 comment 1" + os.linesep in output_3
|
|
|
+ assert "test-2 comment 2" + os.linesep in output_3
|
|
|
+
|
|
|
+
|
|
|
+def test_size_nfiles(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ repo_location, input_path = archiver.repository_location, archiver.input_path
|
|
|
+
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
|
|
+ create_regular_file(input_path, "file1", size=123000)
|
|
|
+ create_regular_file(input_path, "file2", size=456)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test", "input/file1", "input/file2")
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "list", "test")
|
|
|
+ print(output)
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "--format", "{name} {nfiles} {size}")
|
|
|
+ o_t = output.split()
|
|
|
+ assert o_t[0] == "test"
|
|
|
+ assert int(o_t[1]) == 2
|
|
|
+ assert 123456 <= int(o_t[2]) < 123999 # there is some metadata overhead
|
|
|
+
|
|
|
+
|
|
|
+def test_date_matching(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ repo_location = archiver.repository_location
|
|
|
+
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
|
|
+ earliest_ts = "2022-11-20T23:59:59"
|
|
|
+ ts_in_between = "2022-12-18T23:59:59"
|
|
|
+ create_src_archive(archiver, "archive1", ts=earliest_ts)
|
|
|
+ create_src_archive(archiver, "archive2", ts=ts_in_between)
|
|
|
+ create_src_archive(archiver, "archive3")
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rlist", "-v", "--oldest=23e", exit_code=2)
|
|
|
+
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "-v", "--oldest=1m", exit_code=0)
|
|
|
+ assert "archive1" in output
|
|
|
+ assert "archive2" in output
|
|
|
+ assert "archive3" not in output
|
|
|
+
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "-v", "--newest=1m", exit_code=0)
|
|
|
+ assert "archive3" in output
|
|
|
+ assert "archive2" not in output
|
|
|
+ assert "archive1" not in output
|
|
|
+
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "-v", "--newer=1d", exit_code=0)
|
|
|
+ assert "archive3" in output
|
|
|
+ assert "archive1" not in output
|
|
|
+ assert "archive2" not in output
|
|
|
+
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "-v", "--older=1d", exit_code=0)
|
|
|
+ assert "archive1" in output
|
|
|
+ assert "archive2" in output
|
|
|
+ assert "archive3" not in output
|
|
|
+
|
|
|
+
|
|
|
+def test_rlist_consider_checkpoints(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ repo_location = archiver.repository_location
|
|
|
+
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test1", src_dir)
|
|
|
+ # these are not really a checkpoints, but they look like some:
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test2.checkpoint", src_dir)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test3.checkpoint.1", src_dir)
|
|
|
+
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist")
|
|
|
+ assert "test1" in output
|
|
|
+ assert "test2.checkpoint" not in output
|
|
|
+ assert "test3.checkpoint.1" not in output
|
|
|
+
|
|
|
+ output = cmd(archiver, f"--repo={repo_location}", "rlist", "--consider-checkpoints")
|
|
|
+ assert "test1" in output
|
|
|
+ assert "test2.checkpoint" in output
|
|
|
+ assert "test3.checkpoint.1" in output
|
|
|
+
|
|
|
+
|
|
|
+def test_rlist_json(archivers, request):
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
+ repo_location, input_path = archiver.repository_location, archiver.input_path
|
|
|
+ create_regular_file(input_path, "file1", size=1024 * 80)
|
|
|
+
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
|
|
+ cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
|
|
|
+ list_repo = json.loads(cmd(archiver, f"--repo={repo_location}", "rlist", "--json"))
|
|
|
+ repository = list_repo["repository"]
|
|
|
+ assert len(repository["id"]) == 64
|
|
|
+ checkts(repository["last_modified"])
|
|
|
+ assert list_repo["encryption"]["mode"] == RK_ENCRYPTION[13:]
|
|
|
+ assert "keyfile" not in list_repo["encryption"]
|
|
|
+ archive0 = list_repo["archives"][0]
|
|
|
+ checkts(archive0["time"])
|