|
@@ -67,6 +67,8 @@ KF_ENCRYPTION = "--encryption=keyfile-chacha20-poly1305"
|
|
|
|
|
|
src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
|
|
+requires_hardlinks = pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
|
|
|
+
|
|
|
|
|
|
def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b"", binary_output=False, **kw):
|
|
|
if fork:
|
|
@@ -117,14 +119,6 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b"", binary_outpu
|
|
|
sys.stdin, sys.stdout, sys.stderr = stdin, stdout, stderr
|
|
|
|
|
|
|
|
|
-def have_gnutar():
|
|
|
- if not shutil.which("tar"):
|
|
|
- return False
|
|
|
- popen = subprocess.Popen(["tar", "--version"], stdout=subprocess.PIPE)
|
|
|
- stdout, stderr = popen.communicate()
|
|
|
- return b"GNU tar" in stdout
|
|
|
-
|
|
|
-
|
|
|
# check if the binary "borg.exe" is available (for local testing a symlink to virtualenv/bin/borg should do)
|
|
|
try:
|
|
|
exec_cmd("help", exe="borg.exe", fork=True)
|
|
@@ -390,8 +384,6 @@ class ArchiverTestCaseBase(BaseTestCase):
|
|
|
|
|
|
|
|
|
class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
- requires_hardlinks = pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
|
|
|
-
|
|
|
def get_security_dir(self):
|
|
|
repository_id = bin_to_hex(self._extract_repository_id(self.repository_path))
|
|
|
return get_security_dir(repository_id)
|
|
@@ -1830,7 +1822,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.cmd(f"--repo={self.repository_location}", "extract", "test", "--dry-run")
|
|
|
output = self.cmd(f"--repo={self.repository_location}", "check", "--show-version")
|
|
|
self.assert_in("borgbackup version", output) # implied output even without --info given
|
|
|
- #self.assert_not_in("Starting repository check", output) # --info not given for root logger
|
|
|
+ self.assert_not_in("Starting repository check", output) # --info not given for root logger
|
|
|
|
|
|
name = sorted(os.listdir(os.path.join(self.tmpdir, "repository", "data", "0")), reverse=True)[1]
|
|
|
with open(os.path.join(self.tmpdir, "repository", "data", "0", name), "r+b") as fd:
|
|
@@ -1839,117 +1831,6 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
output = self.cmd(f"--repo={self.repository_location}", "check", "--info", exit_code=1)
|
|
|
self.assert_in("Starting repository check", output) # --info given for root logger
|
|
|
|
|
|
- def test_readonly_check(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("test")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "check", "--verify-data", exit_code=EXIT_ERROR)
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "check", "--verify-data")
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- self.cmd(f"--repo={self.repository_location}", "check", "--verify-data", "--bypass-lock")
|
|
|
-
|
|
|
- def test_readonly_diff(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("a")
|
|
|
- self.create_src_archive("b")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "diff", "a", "b", exit_code=EXIT_ERROR)
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "diff", "a", "b")
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- self.cmd(f"--repo={self.repository_location}", "diff", "a", "b", "--bypass-lock")
|
|
|
-
|
|
|
- def test_readonly_export_tar(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("test")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "export-tar", "test", "test.tar", exit_code=EXIT_ERROR)
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "export-tar", "test", "test.tar")
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- self.cmd(f"--repo={self.repository_location}", "export-tar", "test", "test.tar", "--bypass-lock")
|
|
|
-
|
|
|
- def test_readonly_extract(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("test")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "extract", "test", exit_code=EXIT_ERROR)
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "extract", "test")
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- self.cmd(f"--repo={self.repository_location}", "extract", "test", "--bypass-lock")
|
|
|
-
|
|
|
- def test_readonly_info(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("test")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rinfo", exit_code=EXIT_ERROR)
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rinfo")
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rinfo", "--bypass-lock")
|
|
|
-
|
|
|
- def test_readonly_list(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("test")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rlist", exit_code=EXIT_ERROR)
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rlist")
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rlist", "--bypass-lock")
|
|
|
-
|
|
|
- @unittest.skipUnless(llfuse, "llfuse not installed")
|
|
|
- def test_readonly_mount(self):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.create_src_archive("test")
|
|
|
- with self.read_only(self.repository_path):
|
|
|
- # verify that command normally doesn't work with read-only repo
|
|
|
- if self.FORK_DEFAULT:
|
|
|
- with self.fuse_mount(self.repository_location, exit_code=EXIT_ERROR):
|
|
|
- pass
|
|
|
- else:
|
|
|
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
|
|
|
- # self.fuse_mount always assumes fork=True, so for this test we have to manually set fork=False
|
|
|
- with self.fuse_mount(self.repository_location, fork=False):
|
|
|
- pass
|
|
|
- if isinstance(excinfo.value, RemoteRepository.RPCError):
|
|
|
- assert excinfo.value.exception_class == "LockFailed"
|
|
|
- # verify that command works with read-only repo when using --bypass-lock
|
|
|
- with self.fuse_mount(self.repository_location, None, "--bypass-lock"):
|
|
|
- pass
|
|
|
-
|
|
|
@pytest.mark.skipif("BORG_TESTS_IGNORE_MODES" in os.environ, reason="modes unreliable")
|
|
|
def test_umask(self):
|
|
|
self.create_regular_file("file1", size=1024 * 80)
|
|
@@ -3574,133 +3455,6 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
|
|
|
self.cmd(f"--repo={self.repository_location}", "config", exit_code=2)
|
|
|
self.cmd(f"--repo={self.repository_location}", "config", "invalid-option", exit_code=1)
|
|
|
|
|
|
- requires_gnutar = pytest.mark.skipif(not have_gnutar(), reason="GNU tar must be installed for this test.")
|
|
|
- requires_gzip = pytest.mark.skipif(not shutil.which("gzip"), reason="gzip must be installed for this test.")
|
|
|
-
|
|
|
- @requires_gnutar
|
|
|
- def test_export_tar(self):
|
|
|
- self.create_test_files()
|
|
|
- os.unlink("input/flagfile")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test", "input")
|
|
|
- self.cmd(
|
|
|
- f"--repo={self.repository_location}", "export-tar", "test", "simple.tar", "--progress", "--tar-format=GNU"
|
|
|
- )
|
|
|
- with changedir("output"):
|
|
|
- # This probably assumes GNU tar. Note -p switch to extract permissions regardless of umask.
|
|
|
- subprocess.check_call(["tar", "xpf", "../simple.tar", "--warning=no-timestamp"])
|
|
|
- self.assert_dirs_equal("input", "output/input", ignore_flags=True, ignore_xattrs=True, ignore_ns=True)
|
|
|
-
|
|
|
- @requires_gnutar
|
|
|
- @requires_gzip
|
|
|
- def test_export_tar_gz(self):
|
|
|
- if not shutil.which("gzip"):
|
|
|
- pytest.skip("gzip is not installed")
|
|
|
- self.create_test_files()
|
|
|
- os.unlink("input/flagfile")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test", "input")
|
|
|
- list = self.cmd(
|
|
|
- f"--repo={self.repository_location}", "export-tar", "test", "simple.tar.gz", "--list", "--tar-format=GNU"
|
|
|
- )
|
|
|
- assert "input/file1\n" in list
|
|
|
- assert "input/dir2\n" in list
|
|
|
- with changedir("output"):
|
|
|
- subprocess.check_call(["tar", "xpf", "../simple.tar.gz", "--warning=no-timestamp"])
|
|
|
- self.assert_dirs_equal("input", "output/input", ignore_flags=True, ignore_xattrs=True, ignore_ns=True)
|
|
|
-
|
|
|
- @requires_gnutar
|
|
|
- def test_export_tar_strip_components(self):
|
|
|
- if not shutil.which("gzip"):
|
|
|
- pytest.skip("gzip is not installed")
|
|
|
- self.create_test_files()
|
|
|
- os.unlink("input/flagfile")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "test", "input")
|
|
|
- list = self.cmd(
|
|
|
- f"--repo={self.repository_location}",
|
|
|
- "export-tar",
|
|
|
- "test",
|
|
|
- "simple.tar",
|
|
|
- "--strip-components=1",
|
|
|
- "--list",
|
|
|
- "--tar-format=GNU",
|
|
|
- )
|
|
|
- # --list's path are those before processing with --strip-components
|
|
|
- assert "input/file1\n" in list
|
|
|
- assert "input/dir2\n" in list
|
|
|
- with changedir("output"):
|
|
|
- subprocess.check_call(["tar", "xpf", "../simple.tar", "--warning=no-timestamp"])
|
|
|
- self.assert_dirs_equal("input", "output/", ignore_flags=True, ignore_xattrs=True, ignore_ns=True)
|
|
|
-
|
|
|
- @requires_hardlinks
|
|
|
- @requires_gnutar
|
|
|
- def test_export_tar_strip_components_links(self):
|
|
|
- self._extract_hardlinks_setup()
|
|
|
- self.cmd(
|
|
|
- f"--repo={self.repository_location}",
|
|
|
- "export-tar",
|
|
|
- "test",
|
|
|
- "output.tar",
|
|
|
- "--strip-components=2",
|
|
|
- "--tar-format=GNU",
|
|
|
- )
|
|
|
- with changedir("output"):
|
|
|
- subprocess.check_call(["tar", "xpf", "../output.tar", "--warning=no-timestamp"])
|
|
|
- assert os.stat("hardlink").st_nlink == 2
|
|
|
- assert os.stat("subdir/hardlink").st_nlink == 2
|
|
|
- assert os.stat("aaaa").st_nlink == 2
|
|
|
- assert os.stat("source2").st_nlink == 2
|
|
|
-
|
|
|
- @requires_hardlinks
|
|
|
- @requires_gnutar
|
|
|
- def test_extract_hardlinks_tar(self):
|
|
|
- self._extract_hardlinks_setup()
|
|
|
- self.cmd(
|
|
|
- f"--repo={self.repository_location}", "export-tar", "test", "output.tar", "input/dir1", "--tar-format=GNU"
|
|
|
- )
|
|
|
- with changedir("output"):
|
|
|
- subprocess.check_call(["tar", "xpf", "../output.tar", "--warning=no-timestamp"])
|
|
|
- assert os.stat("input/dir1/hardlink").st_nlink == 2
|
|
|
- assert os.stat("input/dir1/subdir/hardlink").st_nlink == 2
|
|
|
- assert os.stat("input/dir1/aaaa").st_nlink == 2
|
|
|
- assert os.stat("input/dir1/source2").st_nlink == 2
|
|
|
-
|
|
|
- def test_import_tar(self, tar_format="PAX"):
|
|
|
- self.create_test_files(create_hardlinks=False) # hardlinks become separate files
|
|
|
- os.unlink("input/flagfile")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "src", "input")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "export-tar", "src", "simple.tar", f"--tar-format={tar_format}")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "import-tar", "dst", "simple.tar")
|
|
|
- with changedir(self.output_path):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "extract", "dst")
|
|
|
- self.assert_dirs_equal("input", "output/input", ignore_ns=True, ignore_xattrs=True)
|
|
|
-
|
|
|
- @requires_gzip
|
|
|
- def test_import_tar_gz(self, tar_format="GNU"):
|
|
|
- if not shutil.which("gzip"):
|
|
|
- pytest.skip("gzip is not installed")
|
|
|
- self.create_test_files(create_hardlinks=False) # hardlinks become separate files
|
|
|
- os.unlink("input/flagfile")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "src", "input")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "export-tar", "src", "simple.tgz", f"--tar-format={tar_format}")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "import-tar", "dst", "simple.tgz")
|
|
|
- with changedir(self.output_path):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "extract", "dst")
|
|
|
- self.assert_dirs_equal("input", "output/input", ignore_ns=True, ignore_xattrs=True)
|
|
|
-
|
|
|
- def test_roundtrip_pax_borg(self):
|
|
|
- self.create_test_files()
|
|
|
- self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "create", "src", "input")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "export-tar", "src", "simple.tar", "--tar-format=BORG")
|
|
|
- self.cmd(f"--repo={self.repository_location}", "import-tar", "dst", "simple.tar")
|
|
|
- with changedir(self.output_path):
|
|
|
- self.cmd(f"--repo={self.repository_location}", "extract", "dst")
|
|
|
- self.assert_dirs_equal("input", "output/input")
|
|
|
-
|
|
|
# derived from test_extract_xattrs_errors()
|
|
|
@pytest.mark.skipif(
|
|
|
not xattr.XATTR_FAKEROOT, reason="xattr not supported on this system or on this version of fakeroot"
|