__init__.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from contextlib import contextmanager
  2. import filecmp
  3. import os
  4. import posix
  5. import sys
  6. import sysconfig
  7. import time
  8. import unittest
  9. from attic.helpers import st_mtime_ns
  10. from attic.xattr import get_all
  11. try:
  12. import llfuse
  13. # Does this version of llfuse support ns precision?
  14. have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  15. except ImportError:
  16. have_fuse_mtime_ns = False
  17. has_lchflags = hasattr(os, 'lchflags')
  18. # The mtime get/set precision varies on different OS and Python versions
  19. if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
  20. st_mtime_ns_round = 0
  21. elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
  22. st_mtime_ns_round = -6
  23. else:
  24. st_mtime_ns_round = -9
  25. has_mtime_ns = sys.version >= '3.3'
  26. utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
  27. class AtticTestCase(unittest.TestCase):
  28. """
  29. """
  30. assert_in = unittest.TestCase.assertIn
  31. assert_not_in = unittest.TestCase.assertNotIn
  32. assert_equal = unittest.TestCase.assertEqual
  33. assert_not_equal = unittest.TestCase.assertNotEqual
  34. assert_raises = unittest.TestCase.assertRaises
  35. assert_true = unittest.TestCase.assertTrue
  36. @contextmanager
  37. def assert_creates_file(self, path):
  38. self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
  39. yield
  40. self.assert_true(os.path.exists(path), '{} should exist'.format(path))
  41. def assert_dirs_equal(self, dir1, dir2):
  42. diff = filecmp.dircmp(dir1, dir2)
  43. self._assert_dirs_equal_cmp(diff)
  44. def _assert_dirs_equal_cmp(self, diff):
  45. self.assert_equal(diff.left_only, [])
  46. self.assert_equal(diff.right_only, [])
  47. self.assert_equal(diff.diff_files, [])
  48. self.assert_equal(diff.funny_files, [])
  49. for filename in diff.common:
  50. path1 = os.path.join(diff.left, filename)
  51. path2 = os.path.join(diff.right, filename)
  52. s1 = os.lstat(path1)
  53. s2 = os.lstat(path2)
  54. # Assume path2 is on FUSE if st_dev is different
  55. fuse = s1.st_dev != s2.st_dev
  56. attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
  57. if has_lchflags:
  58. attrs.append('st_flags')
  59. if not fuse or not os.path.isdir(path1):
  60. # dir nlink is always 1 on our fuse filesystem
  61. attrs.append('st_nlink')
  62. d1 = [filename] + [getattr(s1, a) for a in attrs]
  63. d2 = [filename] + [getattr(s2, a) for a in attrs]
  64. if not os.path.islink(path1) or utime_supports_fd:
  65. # Older versions of llfuse does not support ns precision properly
  66. if fuse and not have_fuse_mtime_ns:
  67. d1.append(round(st_mtime_ns(s1), -4))
  68. d2.append(round(st_mtime_ns(s2), -4))
  69. d1.append(round(st_mtime_ns(s1), st_mtime_ns_round))
  70. d2.append(round(st_mtime_ns(s2), st_mtime_ns_round))
  71. d1.append(get_all(path1, follow_symlinks=False))
  72. d2.append(get_all(path2, follow_symlinks=False))
  73. self.assert_equal(d1, d2)
  74. for sub_diff in diff.subdirs.values():
  75. self._assert_dirs_equal_cmp(sub_diff)
  76. def wait_for_mount(self, path, timeout=5):
  77. """Wait until a filesystem is mounted on `path`
  78. """
  79. timeout += time.time()
  80. while timeout > time.time():
  81. if os.path.ismount(path):
  82. return
  83. time.sleep(.1)
  84. raise Exception('wait_for_mount(%s) timeout' % path)
  85. def get_tests(suite):
  86. """Generates a sequence of tests from a test suite
  87. """
  88. for item in suite:
  89. try:
  90. # TODO: This could be "yield from..." with Python 3.3+
  91. for i in get_tests(item):
  92. yield i
  93. except TypeError:
  94. yield item
  95. class TestLoader(unittest.TestLoader):
  96. """A customized test loader that properly detects and filters our test cases
  97. """
  98. def loadTestsFromName(self, pattern, module=None):
  99. suite = self.discover('attic.testsuite', '*.py')
  100. tests = unittest.TestSuite()
  101. for test in get_tests(suite):
  102. if pattern.lower() in test.id().lower():
  103. tests.addTest(test)
  104. return tests