__init__.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from contextlib import contextmanager
  2. import filecmp
  3. import os
  4. import posix
  5. import stat
  6. import sys
  7. import sysconfig
  8. import time
  9. import unittest
  10. from ..xattr import get_all
  11. # Note: this is used by borg.selftest, do not use or import py.test functionality here.
  12. try:
  13. import llfuse
  14. # Does this version of llfuse support ns precision?
  15. have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  16. except ImportError:
  17. have_fuse_mtime_ns = False
  18. try:
  19. from pytest import raises
  20. except ImportError:
  21. raises = None
  22. has_lchflags = hasattr(os, 'lchflags')
  23. # The mtime get/set precision varies on different OS and Python versions
  24. if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
  25. st_mtime_ns_round = 0
  26. elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
  27. st_mtime_ns_round = -6
  28. else:
  29. st_mtime_ns_round = -9
  30. if sys.platform.startswith('netbsd'):
  31. st_mtime_ns_round = -4 # only >1 microsecond resolution here?
  32. class BaseTestCase(unittest.TestCase):
  33. """
  34. """
  35. assert_in = unittest.TestCase.assertIn
  36. assert_not_in = unittest.TestCase.assertNotIn
  37. assert_equal = unittest.TestCase.assertEqual
  38. assert_not_equal = unittest.TestCase.assertNotEqual
  39. assert_true = unittest.TestCase.assertTrue
  40. if raises:
  41. assert_raises = staticmethod(raises)
  42. else:
  43. assert_raises = unittest.TestCase.assertRaises
  44. @contextmanager
  45. def assert_creates_file(self, path):
  46. self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
  47. yield
  48. self.assert_true(os.path.exists(path), '{} should exist'.format(path))
  49. def assert_dirs_equal(self, dir1, dir2):
  50. diff = filecmp.dircmp(dir1, dir2)
  51. self._assert_dirs_equal_cmp(diff)
  52. def _assert_dirs_equal_cmp(self, diff):
  53. self.assert_equal(diff.left_only, [])
  54. self.assert_equal(diff.right_only, [])
  55. self.assert_equal(diff.diff_files, [])
  56. self.assert_equal(diff.funny_files, [])
  57. for filename in diff.common:
  58. path1 = os.path.join(diff.left, filename)
  59. path2 = os.path.join(diff.right, filename)
  60. s1 = os.lstat(path1)
  61. s2 = os.lstat(path2)
  62. # Assume path2 is on FUSE if st_dev is different
  63. fuse = s1.st_dev != s2.st_dev
  64. attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
  65. if has_lchflags:
  66. attrs.append('st_flags')
  67. if not fuse or not os.path.isdir(path1):
  68. # dir nlink is always 1 on our fuse filesystem
  69. attrs.append('st_nlink')
  70. d1 = [filename] + [getattr(s1, a) for a in attrs]
  71. d2 = [filename] + [getattr(s2, a) for a in attrs]
  72. # ignore st_rdev if file is not a block/char device, fixes #203
  73. if not stat.S_ISCHR(d1[1]) and not stat.S_ISBLK(d1[1]):
  74. d1[4] = None
  75. if not stat.S_ISCHR(d2[1]) and not stat.S_ISBLK(d2[1]):
  76. d2[4] = None
  77. # Older versions of llfuse do not support ns precision properly
  78. if fuse and not have_fuse_mtime_ns:
  79. d1.append(round(s1.st_mtime_ns, -4))
  80. d2.append(round(s2.st_mtime_ns, -4))
  81. else:
  82. d1.append(round(s1.st_mtime_ns, st_mtime_ns_round))
  83. d2.append(round(s2.st_mtime_ns, st_mtime_ns_round))
  84. d1.append(get_all(path1, follow_symlinks=False))
  85. d2.append(get_all(path2, follow_symlinks=False))
  86. self.assert_equal(d1, d2)
  87. for sub_diff in diff.subdirs.values():
  88. self._assert_dirs_equal_cmp(sub_diff)
  89. def wait_for_mount(self, path, timeout=5):
  90. """Wait until a filesystem is mounted on `path`
  91. """
  92. timeout += time.time()
  93. while timeout > time.time():
  94. if os.path.ismount(path):
  95. return
  96. time.sleep(.1)
  97. raise Exception('wait_for_mount(%s) timeout' % path)
  98. class changedir:
  99. def __init__(self, dir):
  100. self.dir = dir
  101. def __enter__(self):
  102. self.old = os.getcwd()
  103. os.chdir(self.dir)
  104. def __exit__(self, *args, **kw):
  105. os.chdir(self.old)
  106. class environment_variable:
  107. def __init__(self, **values):
  108. self.values = values
  109. self.old_values = {}
  110. def __enter__(self):
  111. for k, v in self.values.items():
  112. self.old_values[k] = os.environ.get(k)
  113. if v is None:
  114. os.environ.pop(k, None)
  115. else:
  116. os.environ[k] = v
  117. def __exit__(self, *args, **kw):
  118. for k, v in self.old_values.items():
  119. if v is None:
  120. os.environ.pop(k, None)
  121. else:
  122. os.environ[k] = v
  123. class FakeInputs:
  124. """Simulate multiple user inputs, can be used as input() replacement"""
  125. def __init__(self, inputs):
  126. self.inputs = inputs
  127. def __call__(self, prompt=None):
  128. if prompt is not None:
  129. print(prompt, end='')
  130. try:
  131. return self.inputs.pop(0)
  132. except IndexError:
  133. raise EOFError from None