|
@@ -1,5 +1,4 @@
|
|
|
from contextlib import contextmanager
|
|
|
-import filecmp
|
|
|
import functools
|
|
|
import os
|
|
|
|
|
@@ -8,34 +7,25 @@ try:
|
|
|
except ImportError:
|
|
|
posix = None
|
|
|
|
|
|
-import re
|
|
|
import stat
|
|
|
import sys
|
|
|
import sysconfig
|
|
|
import tempfile
|
|
|
-import time
|
|
|
import unittest
|
|
|
|
|
|
-from ..xattr import get_all
|
|
|
-from ..platform import get_flags
|
|
|
-from ..platformflags import is_win32
|
|
|
-from ..helpers import umount
|
|
|
-from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
|
|
|
-from .. import platform
|
|
|
-
|
|
|
-
|
|
|
-# Note: this is used by borg.selftest, do not use or import py.test functionality here.
|
|
|
-
|
|
|
-from ..fuse_impl import llfuse, has_pyfuse3, has_llfuse
|
|
|
-
|
|
|
-# Does this version of llfuse support ns precision?
|
|
|
-have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, "st_mtime_ns") if llfuse else False
|
|
|
-
|
|
|
+# Note: this is used by borg.selftest, do not *require* pytest functionality here.
|
|
|
try:
|
|
|
from pytest import raises
|
|
|
except: # noqa
|
|
|
raises = None
|
|
|
|
|
|
+from ..fuse_impl import llfuse, has_llfuse, has_pyfuse3
|
|
|
+from .. import platform
|
|
|
+from ..platformflags import is_win32
|
|
|
+
|
|
|
+# Does this version of llfuse support ns precision?
|
|
|
+have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, "st_mtime_ns") if llfuse else False
|
|
|
+
|
|
|
has_lchflags = hasattr(os, "lchflags") or sys.platform.startswith("linux")
|
|
|
try:
|
|
|
with tempfile.NamedTemporaryFile() as file:
|
|
@@ -86,6 +76,14 @@ def unopened_tempfile():
|
|
|
yield os.path.join(tempdir, "file")
|
|
|
|
|
|
|
|
|
+@contextmanager
|
|
|
+def changedir(dir):
|
|
|
+ cwd = os.getcwd()
|
|
|
+ os.chdir(dir)
|
|
|
+ yield
|
|
|
+ os.chdir(cwd)
|
|
|
+
|
|
|
+
|
|
|
def is_root():
|
|
|
"""return True if running with high privileges, like as root"""
|
|
|
if is_win32:
|
|
@@ -194,193 +192,11 @@ def no_selinux(x):
|
|
|
|
|
|
|
|
|
class BaseTestCase(unittest.TestCase):
|
|
|
- """ """
|
|
|
-
|
|
|
assert_in = unittest.TestCase.assertIn
|
|
|
assert_not_in = unittest.TestCase.assertNotIn
|
|
|
assert_equal = unittest.TestCase.assertEqual
|
|
|
assert_not_equal = unittest.TestCase.assertNotEqual
|
|
|
-
|
|
|
- if raises:
|
|
|
- assert_raises = staticmethod(raises)
|
|
|
- else:
|
|
|
- assert_raises = unittest.TestCase.assertRaises # type: ignore
|
|
|
-
|
|
|
- @contextmanager
|
|
|
- def assert_creates_file(self, path):
|
|
|
- assert not os.path.exists(path), f"{path} should not exist"
|
|
|
- yield
|
|
|
- assert os.path.exists(path), f"{path} should exist"
|
|
|
-
|
|
|
- def assert_dirs_equal(self, dir1, dir2, **kwargs):
|
|
|
- diff = filecmp.dircmp(dir1, dir2)
|
|
|
- self._assert_dirs_equal_cmp(diff, **kwargs)
|
|
|
-
|
|
|
- def assert_line_exists(self, lines, expected_regexpr):
|
|
|
- assert any(re.search(expected_regexpr, line) for line in lines), f"no match for {expected_regexpr} in {lines}"
|
|
|
-
|
|
|
- def _assert_dirs_equal_cmp(self, diff, ignore_flags=False, ignore_xattrs=False, ignore_ns=False):
|
|
|
- self.assert_equal(diff.left_only, [])
|
|
|
- self.assert_equal(diff.right_only, [])
|
|
|
- self.assert_equal(diff.diff_files, [])
|
|
|
- self.assert_equal(diff.funny_files, [])
|
|
|
- for filename in diff.common:
|
|
|
- path1 = os.path.join(diff.left, filename)
|
|
|
- path2 = os.path.join(diff.right, filename)
|
|
|
- s1 = os.stat(path1, follow_symlinks=False)
|
|
|
- s2 = os.stat(path2, follow_symlinks=False)
|
|
|
- # Assume path2 is on FUSE if st_dev is different
|
|
|
- fuse = s1.st_dev != s2.st_dev
|
|
|
- attrs = ["st_uid", "st_gid", "st_rdev"]
|
|
|
- if not fuse or not os.path.isdir(path1):
|
|
|
- # dir nlink is always 1 on our FUSE filesystem
|
|
|
- attrs.append("st_nlink")
|
|
|
- d1 = [filename] + [getattr(s1, a) for a in attrs]
|
|
|
- d2 = [filename] + [getattr(s2, a) for a in attrs]
|
|
|
- d1.insert(1, oct(s1.st_mode))
|
|
|
- d2.insert(1, oct(s2.st_mode))
|
|
|
- if not ignore_flags:
|
|
|
- d1.append(get_flags(path1, s1))
|
|
|
- d2.append(get_flags(path2, s2))
|
|
|
- # ignore st_rdev if file is not a block/char device, fixes #203
|
|
|
- if not stat.S_ISCHR(s1.st_mode) and not stat.S_ISBLK(s1.st_mode):
|
|
|
- d1[4] = None
|
|
|
- if not stat.S_ISCHR(s2.st_mode) and not stat.S_ISBLK(s2.st_mode):
|
|
|
- d2[4] = None
|
|
|
- # If utime isn't fully supported, borg can't set mtime.
|
|
|
- # Therefore, we shouldn't test it in that case.
|
|
|
- if is_utime_fully_supported():
|
|
|
- # Older versions of llfuse do not support ns precision properly
|
|
|
- if ignore_ns:
|
|
|
- d1.append(int(s1.st_mtime_ns / 1e9))
|
|
|
- d2.append(int(s2.st_mtime_ns / 1e9))
|
|
|
- elif fuse and not have_fuse_mtime_ns:
|
|
|
- d1.append(round(s1.st_mtime_ns, -4))
|
|
|
- d2.append(round(s2.st_mtime_ns, -4))
|
|
|
- else:
|
|
|
- d1.append(round(s1.st_mtime_ns, st_mtime_ns_round))
|
|
|
- d2.append(round(s2.st_mtime_ns, st_mtime_ns_round))
|
|
|
- if not ignore_xattrs:
|
|
|
- d1.append(no_selinux(get_all(path1, follow_symlinks=False)))
|
|
|
- d2.append(no_selinux(get_all(path2, follow_symlinks=False)))
|
|
|
- self.assert_equal(d1, d2)
|
|
|
- for sub_diff in diff.subdirs.values():
|
|
|
- self._assert_dirs_equal_cmp(
|
|
|
- sub_diff, ignore_flags=ignore_flags, ignore_xattrs=ignore_xattrs, ignore_ns=ignore_ns
|
|
|
- )
|
|
|
-
|
|
|
- @contextmanager
|
|
|
- def fuse_mount(self, location, mountpoint=None, *options, fork=True, os_fork=False, **kwargs):
|
|
|
- # For a successful mount, `fork = True` is required for
|
|
|
- # the borg mount daemon to work properly or the tests
|
|
|
- # will just freeze. Therefore, if argument `fork` is not
|
|
|
- # specified, the default value is `True`, regardless of
|
|
|
- # `FORK_DEFAULT`. However, leaving the possibility to run
|
|
|
- # the command with `fork = False` is still necessary for
|
|
|
- # testing for mount failures, for example attempting to
|
|
|
- # mount a read-only repo.
|
|
|
- # `os_fork = True` is needed for testing (the absence of)
|
|
|
- # a race condition of the Lock during lock migration when
|
|
|
- # borg mount (local repo) is daemonizing (#4953). This is another
|
|
|
- # example where we need `fork = False`, because the test case
|
|
|
- # needs an OS fork, not a spawning of the fuse mount.
|
|
|
- # `fork = False` is implied if `os_fork = True`.
|
|
|
- if mountpoint is None:
|
|
|
- mountpoint = tempfile.mkdtemp()
|
|
|
- else:
|
|
|
- os.mkdir(mountpoint)
|
|
|
- args = [f"--repo={location}", "mount", mountpoint] + list(options)
|
|
|
- if os_fork:
|
|
|
- # Do not spawn, but actually (OS) fork.
|
|
|
- if os.fork() == 0:
|
|
|
- # The child process.
|
|
|
- # Decouple from parent and fork again.
|
|
|
- # Otherwise, it becomes a zombie and pretends to be alive.
|
|
|
- os.setsid()
|
|
|
- if os.fork() > 0:
|
|
|
- os._exit(0)
|
|
|
- # The grandchild process.
|
|
|
- try:
|
|
|
- self.cmd(*args, fork=False, **kwargs) # borg mount not spawning.
|
|
|
- finally:
|
|
|
- # This should never be reached, since it daemonizes,
|
|
|
- # and the grandchild process exits before cmd() returns.
|
|
|
- # However, just in case...
|
|
|
- print("Fatal: borg mount did not daemonize properly. Force exiting.", file=sys.stderr, flush=True)
|
|
|
- os._exit(0)
|
|
|
- else:
|
|
|
- self.cmd(*args, fork=fork, **kwargs)
|
|
|
- if kwargs.get("exit_code", EXIT_SUCCESS) == EXIT_ERROR:
|
|
|
- # If argument `exit_code = EXIT_ERROR`, then this call
|
|
|
- # is testing the behavior of an unsuccessful mount and
|
|
|
- # we must not continue, as there is no mount to work
|
|
|
- # with. The test itself has already failed or succeeded
|
|
|
- # with the call to `self.cmd`, above.
|
|
|
- yield
|
|
|
- return
|
|
|
- self.wait_for_mountstate(mountpoint, mounted=True)
|
|
|
- yield
|
|
|
- umount(mountpoint)
|
|
|
- self.wait_for_mountstate(mountpoint, mounted=False)
|
|
|
- os.rmdir(mountpoint)
|
|
|
- # Give the daemon some time to exit
|
|
|
- time.sleep(0.2)
|
|
|
-
|
|
|
- def wait_for_mountstate(self, mountpoint, *, mounted, timeout=5):
|
|
|
- """Wait until a path meets specified mount point status"""
|
|
|
- timeout += time.time()
|
|
|
- while timeout > time.time():
|
|
|
- if os.path.ismount(mountpoint) == mounted:
|
|
|
- return
|
|
|
- time.sleep(0.1)
|
|
|
- message = "Waiting for {} of {}".format("mount" if mounted else "umount", mountpoint)
|
|
|
- raise TimeoutError(message)
|
|
|
-
|
|
|
- @contextmanager
|
|
|
- def read_only(self, path):
|
|
|
- """Some paths need to be made read-only for testing
|
|
|
-
|
|
|
- If the tests are executed inside a fakeroot environment, the
|
|
|
- changes from chmod won't affect the real permissions of that
|
|
|
- folder. This issue is circumvented by temporarily disabling
|
|
|
- fakeroot with `LD_PRELOAD=`.
|
|
|
-
|
|
|
- Using chmod to remove write permissions is not enough if the
|
|
|
- tests are running with root privileges. Instead, the folder is
|
|
|
- rendered immutable with chattr or chflags, respectively.
|
|
|
- """
|
|
|
- if sys.platform.startswith("linux"):
|
|
|
- cmd_immutable = 'chattr +i "%s"' % path
|
|
|
- cmd_mutable = 'chattr -i "%s"' % path
|
|
|
- elif sys.platform.startswith(("darwin", "freebsd", "netbsd", "openbsd")):
|
|
|
- cmd_immutable = 'chflags uchg "%s"' % path
|
|
|
- cmd_mutable = 'chflags nouchg "%s"' % path
|
|
|
- elif sys.platform.startswith("sunos"): # openindiana
|
|
|
- cmd_immutable = 'chmod S+vimmutable "%s"' % path
|
|
|
- cmd_mutable = 'chmod S-vimmutable "%s"' % path
|
|
|
- else:
|
|
|
- message = "Testing read-only repos is not supported on platform %s" % sys.platform
|
|
|
- self.skipTest(message)
|
|
|
- try:
|
|
|
- os.system('LD_PRELOAD= chmod -R ugo-w "%s"' % path)
|
|
|
- os.system(cmd_immutable)
|
|
|
- yield
|
|
|
- finally:
|
|
|
- # Restore permissions to ensure clean-up doesn't fail
|
|
|
- os.system(cmd_mutable)
|
|
|
- os.system('LD_PRELOAD= chmod -R ugo+w "%s"' % path)
|
|
|
-
|
|
|
-
|
|
|
-class changedir:
|
|
|
- def __init__(self, dir):
|
|
|
- self.dir = dir
|
|
|
-
|
|
|
- def __enter__(self):
|
|
|
- self.old = os.getcwd()
|
|
|
- os.chdir(self.dir)
|
|
|
-
|
|
|
- def __exit__(self, *args, **kw):
|
|
|
- os.chdir(self.old)
|
|
|
+ assert_raises = staticmethod(raises) if raises else unittest.TestCase.assertRaises # type: ignore
|
|
|
|
|
|
|
|
|
class FakeInputs:
|