__init__.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import filecmp
  2. import os
  3. import posix
  4. import sys
  5. import sysconfig
  6. import time
  7. import unittest
  8. from attic.helpers import st_mtime_ns
  9. from attic.xattr import get_all
  10. try:
  11. import llfuse
  12. # Does this version of llfuse support ns precision?
  13. have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  14. except ImportError:
  15. have_fuse_mtime_ns = False
  16. # The mtime get/set precison varies on different OS and Python versions
  17. if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
  18. st_mtime_ns_round = 0
  19. elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
  20. st_mtime_ns_round = -6
  21. else:
  22. st_mtime_ns_round = -9
  23. has_mtime_ns = sys.version >= '3.3'
  24. utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
  25. class AtticTestCase(unittest.TestCase):
  26. """
  27. """
  28. assert_equal = unittest.TestCase.assertEqual
  29. assert_not_equal = unittest.TestCase.assertNotEqual
  30. assert_raises = unittest.TestCase.assertRaises
  31. def _get_xattrs(self, path):
  32. try:
  33. return get_all(path, follow_symlinks=False)
  34. except EnvironmentError:
  35. return {}
  36. def assert_dirs_equal(self, dir1, dir2):
  37. diff = filecmp.dircmp(dir1, dir2)
  38. self._assert_dirs_equal_cmp(diff)
  39. def _assert_dirs_equal_cmp(self, diff):
  40. self.assert_equal(diff.left_only, [])
  41. self.assert_equal(diff.right_only, [])
  42. self.assert_equal(diff.diff_files, [])
  43. self.assert_equal(diff.funny_files, [])
  44. for filename in diff.common:
  45. path1 = os.path.join(diff.left, filename)
  46. path2 = os.path.join(diff.right, filename)
  47. s1 = os.lstat(path1)
  48. s2 = os.lstat(path2)
  49. # Assume path2 is on FUSE if st_dev is different
  50. fuse = s1.st_dev != s2.st_dev
  51. attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
  52. if not fuse or not os.path.isdir(path1):
  53. # dir nlink is always 1 on our fuse fileystem
  54. attrs.append('st_nlink')
  55. d1 = [filename] + [getattr(s1, a) for a in attrs]
  56. d2 = [filename] + [getattr(s2, a) for a in attrs]
  57. if not os.path.islink(path1) or utime_supports_fd:
  58. # Older versions of llfuse does not support ns precision properly
  59. if fuse and not have_fuse_mtime_ns:
  60. d1.append(round(st_mtime_ns(s1), -4))
  61. d2.append(round(st_mtime_ns(s2), -4))
  62. d1.append(round(st_mtime_ns(s1), st_mtime_ns_round))
  63. d2.append(round(st_mtime_ns(s2), st_mtime_ns_round))
  64. d1.append(self._get_xattrs(path1))
  65. d2.append(self._get_xattrs(path2))
  66. self.assert_equal(d1, d2)
  67. for sub_diff in diff.subdirs.values():
  68. self._assert_dirs_equal_cmp(sub_diff)
  69. def wait_for_mount(self, path, timeout=5):
  70. """Wait until a filesystem is mounted on `path`
  71. """
  72. timeout += time.time()
  73. while timeout > time.time():
  74. if os.path.ismount(path):
  75. return
  76. time.sleep(.1)
  77. raise Exception('wait_for_mount(%s) timeout' % path)
  78. def get_tests(suite):
  79. """Generates a sequence of tests from a test suite
  80. """
  81. for item in suite:
  82. try:
  83. # TODO: This could be "yield from..." with Python 3.3+
  84. for i in get_tests(item):
  85. yield i
  86. except TypeError:
  87. yield item
  88. class TestLoader(unittest.TestLoader):
  89. """A customzied test loader that properly detects and filters our test cases
  90. """
  91. def loadTestsFromName(self, pattern, module=None):
  92. suite = self.discover('attic.testsuite', '*.py')
  93. tests = unittest.TestSuite()
  94. for test in get_tests(suite):
  95. if pattern.lower() in test.id().lower():
  96. tests.addTest(test)
  97. return tests