|
@@ -13,7 +13,7 @@ import pytest
|
|
from ... import platform
|
|
from ... import platform
|
|
from ...constants import * # NOQA
|
|
from ...constants import * # NOQA
|
|
from ...manifest import Manifest
|
|
from ...manifest import Manifest
|
|
-from ...platform import is_cygwin
|
|
|
|
|
|
+from ...platform import is_cygwin, is_win32
|
|
from ...repository import Repository
|
|
from ...repository import Repository
|
|
from .. import has_lchflags
|
|
from .. import has_lchflags
|
|
from .. import changedir
|
|
from .. import changedir
|
|
@@ -119,6 +119,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
# we have all fs items exactly once!
|
|
# we have all fs items exactly once!
|
|
assert sorted(paths) == ["input", "input/a", "input/a/hardlink", "input/b", "input/b/hardlink"]
|
|
assert sorted(paths) == ["input", "input/a", "input/a/hardlink", "input/b", "input/b/hardlink"]
|
|
|
|
|
|
|
|
+ @pytest.mark.skipif(is_win32, reason="unix sockets not available on windows")
|
|
def test_unix_socket(self):
|
|
def test_unix_socket(self):
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
try:
|
|
try:
|
|
@@ -204,14 +205,14 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
"exit 73;",
|
|
"exit 73;",
|
|
exit_code=2,
|
|
exit_code=2,
|
|
)
|
|
)
|
|
- assert output.endswith("Command 'sh' exited with status 73\n")
|
|
|
|
|
|
+ assert output.endswith("Command 'sh' exited with status 73" + os.linesep)
|
|
archive_list = json.loads(self.cmd(f"--repo={self.repository_location}", "rlist", "--json"))
|
|
archive_list = json.loads(self.cmd(f"--repo={self.repository_location}", "rlist", "--json"))
|
|
assert archive_list["archives"] == []
|
|
assert archive_list["archives"] == []
|
|
|
|
|
|
def test_create_content_from_command_missing_command(self):
|
|
def test_create_content_from_command_missing_command(self):
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
output = self.cmd(f"--repo={self.repository_location}", "create", "test", "--content-from-command", exit_code=2)
|
|
output = self.cmd(f"--repo={self.repository_location}", "create", "test", "--content-from-command", exit_code=2)
|
|
- assert output.endswith("No command given.\n")
|
|
|
|
|
|
+ assert output.endswith("No command given." + os.linesep)
|
|
|
|
|
|
def test_create_paths_from_stdin(self):
|
|
def test_create_paths_from_stdin(self):
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
@@ -242,9 +243,20 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
self.create_regular_file("file4", size=1024 * 80)
|
|
self.create_regular_file("file4", size=1024 * 80)
|
|
|
|
|
|
input_data = "input/file1\ninput/file2\ninput/file3"
|
|
input_data = "input/file1\ninput/file2\ninput/file3"
|
|
|
|
+ if is_win32:
|
|
|
|
+ with open("filenames.cmd", "w") as script:
|
|
|
|
+ for filename in input_data.splitlines():
|
|
|
|
+ script.write(f"@echo {filename}\n")
|
|
self.cmd(
|
|
self.cmd(
|
|
- f"--repo={self.repository_location}", "create", "--paths-from-command", "test", "--", "echo", input_data
|
|
|
|
|
|
+ f"--repo={self.repository_location}",
|
|
|
|
+ "create",
|
|
|
|
+ "--paths-from-command",
|
|
|
|
+ "test",
|
|
|
|
+ "--",
|
|
|
|
+ "filenames.cmd" if is_win32 else "echo",
|
|
|
|
+ input_data,
|
|
)
|
|
)
|
|
|
|
+
|
|
archive_list = self.cmd(f"--repo={self.repository_location}", "list", "test", "--json-lines")
|
|
archive_list = self.cmd(f"--repo={self.repository_location}", "list", "test", "--json-lines")
|
|
paths = [json.loads(line)["path"] for line in archive_list.split("\n") if line]
|
|
paths = [json.loads(line)["path"] for line in archive_list.split("\n") if line]
|
|
assert paths == ["input/file1", "input/file2", "input/file3"]
|
|
assert paths == ["input/file1", "input/file2", "input/file3"]
|
|
@@ -262,14 +274,14 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
"exit 73;",
|
|
"exit 73;",
|
|
exit_code=2,
|
|
exit_code=2,
|
|
)
|
|
)
|
|
- assert output.endswith("Command 'sh' exited with status 73\n")
|
|
|
|
|
|
+ assert output.endswith("Command 'sh' exited with status 73" + os.linesep)
|
|
archive_list = json.loads(self.cmd(f"--repo={self.repository_location}", "rlist", "--json"))
|
|
archive_list = json.loads(self.cmd(f"--repo={self.repository_location}", "rlist", "--json"))
|
|
assert archive_list["archives"] == []
|
|
assert archive_list["archives"] == []
|
|
|
|
|
|
def test_create_paths_from_command_missing_command(self):
|
|
def test_create_paths_from_command_missing_command(self):
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
output = self.cmd(f"--repo={self.repository_location}", "create", "test", "--paths-from-command", exit_code=2)
|
|
output = self.cmd(f"--repo={self.repository_location}", "create", "test", "--paths-from-command", exit_code=2)
|
|
- assert output.endswith("No command given.\n")
|
|
|
|
|
|
+ assert output.endswith("No command given." + os.linesep)
|
|
|
|
|
|
def test_create_without_root(self):
|
|
def test_create_without_root(self):
|
|
"""test create without a root"""
|
|
"""test create without a root"""
|
|
@@ -500,6 +512,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
self.cmd(f"--repo={self.repository_location}", "create", "test", "input", "input")
|
|
self.cmd(f"--repo={self.repository_location}", "create", "test", "input", "input")
|
|
|
|
|
|
@pytest.mark.skipif("BORG_TESTS_IGNORE_MODES" in os.environ, reason="modes unreliable")
|
|
@pytest.mark.skipif("BORG_TESTS_IGNORE_MODES" in os.environ, reason="modes unreliable")
|
|
|
|
+ @pytest.mark.skipif(is_win32, reason="modes unavailable on Windows")
|
|
def test_umask(self):
|
|
def test_umask(self):
|
|
self.create_regular_file("file1", size=1024 * 80)
|
|
self.create_regular_file("file1", size=1024 * 80)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
@@ -545,6 +558,9 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
# https://borgbackup.readthedocs.org/en/latest/faq.html#i-am-seeing-a-added-status-for-a-unchanged-file
|
|
# https://borgbackup.readthedocs.org/en/latest/faq.html#i-am-seeing-a-added-status-for-a-unchanged-file
|
|
self.assert_in("A input/file2", output)
|
|
self.assert_in("A input/file2", output)
|
|
|
|
|
|
|
|
+ @pytest.mark.skipif(
|
|
|
|
+ is_win32, reason="ctime attribute is file creation time on Windows"
|
|
|
|
+ ) # see https://docs.python.org/3/library/os.html#os.stat_result.st_ctime
|
|
def test_file_status_cs_cache_mode(self):
|
|
def test_file_status_cs_cache_mode(self):
|
|
"""test that a changed file with faked "previous" mtime still gets backed up in ctime,size cache_mode"""
|
|
"""test that a changed file with faked "previous" mtime still gets backed up in ctime,size cache_mode"""
|
|
self.create_regular_file("file1", contents=b"123")
|
|
self.create_regular_file("file1", contents=b"123")
|
|
@@ -740,6 +756,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
extracted_data = f.read()
|
|
extracted_data = f.read()
|
|
assert extracted_data == data
|
|
assert extracted_data == data
|
|
|
|
|
|
|
|
+ @pytest.mark.skipif(not are_symlinks_supported(), reason="symlinks not supported")
|
|
def test_create_read_special_broken_symlink(self):
|
|
def test_create_read_special_broken_symlink(self):
|
|
os.symlink("somewhere does not exist", os.path.join(self.input_path, "link"))
|
|
os.symlink("somewhere does not exist", os.path.join(self.input_path, "link"))
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
@@ -784,7 +801,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
|
|
|
# Test case set up: create a repository and a file
|
|
# Test case set up: create a repository and a file
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
|
|
- self.create_regular_file("testfile", contents=randbytes(15000000)) # more data might be needed for faster CPUs
|
|
|
|
|
|
+ self.create_regular_file("testfile", contents=randbytes(50000000))
|
|
# Archive
|
|
# Archive
|
|
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
|
|
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
|
|
hashing_time = extract_hashing_time(result)
|
|
hashing_time = extract_hashing_time(result)
|
|
@@ -802,7 +819,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
|
|
|
# Test case set up: create a repository and a file
|
|
# Test case set up: create a repository and a file
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
- self.create_regular_file("testfile", contents=randbytes(10000000))
|
|
|
|
|
|
+ self.create_regular_file("testfile", contents=randbytes(50000000))
|
|
# Archive
|
|
# Archive
|
|
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
|
|
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
|
|
chunking_time = extract_chunking_time(result)
|
|
chunking_time = extract_chunking_time(result)
|