__init__.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import filecmp
  2. import os
  3. import posix
  4. import sys
  5. import sysconfig
  6. import time
  7. import unittest
  8. from attic.helpers import st_mtime_ns
  9. from attic.xattr import get_all
  10. try:
  11. import llfuse
  12. # Does this version of llfuse support ns precision?
  13. have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  14. except ImportError:
  15. have_fuse_mtime_ns = False
  16. # The mtime get/set precison varies on different OS and Python versions
  17. if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
  18. st_mtime_ns_round = 0
  19. elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
  20. st_mtime_ns_round = -6
  21. else:
  22. st_mtime_ns_round = -9
  23. has_mtime_ns = sys.version >= '3.3'
  24. utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
  25. class AtticTestCase(unittest.TestCase):
  26. """
  27. """
  28. assert_in = unittest.TestCase.assertIn
  29. assert_not_in = unittest.TestCase.assertNotIn
  30. assert_equal = unittest.TestCase.assertEqual
  31. assert_not_equal = unittest.TestCase.assertNotEqual
  32. assert_raises = unittest.TestCase.assertRaises
  33. def _get_xattrs(self, path):
  34. try:
  35. return get_all(path, follow_symlinks=False)
  36. except EnvironmentError:
  37. return {}
  38. def assert_dirs_equal(self, dir1, dir2):
  39. diff = filecmp.dircmp(dir1, dir2)
  40. self._assert_dirs_equal_cmp(diff)
  41. def _assert_dirs_equal_cmp(self, diff):
  42. self.assert_equal(diff.left_only, [])
  43. self.assert_equal(diff.right_only, [])
  44. self.assert_equal(diff.diff_files, [])
  45. self.assert_equal(diff.funny_files, [])
  46. for filename in diff.common:
  47. path1 = os.path.join(diff.left, filename)
  48. path2 = os.path.join(diff.right, filename)
  49. s1 = os.lstat(path1)
  50. s2 = os.lstat(path2)
  51. # Assume path2 is on FUSE if st_dev is different
  52. fuse = s1.st_dev != s2.st_dev
  53. attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
  54. if not fuse or not os.path.isdir(path1):
  55. # dir nlink is always 1 on our fuse fileystem
  56. attrs.append('st_nlink')
  57. d1 = [filename] + [getattr(s1, a) for a in attrs]
  58. d2 = [filename] + [getattr(s2, a) for a in attrs]
  59. if not os.path.islink(path1) or utime_supports_fd:
  60. # Older versions of llfuse does not support ns precision properly
  61. if fuse and not have_fuse_mtime_ns:
  62. d1.append(round(st_mtime_ns(s1), -4))
  63. d2.append(round(st_mtime_ns(s2), -4))
  64. d1.append(round(st_mtime_ns(s1), st_mtime_ns_round))
  65. d2.append(round(st_mtime_ns(s2), st_mtime_ns_round))
  66. d1.append(self._get_xattrs(path1))
  67. d2.append(self._get_xattrs(path2))
  68. self.assert_equal(d1, d2)
  69. for sub_diff in diff.subdirs.values():
  70. self._assert_dirs_equal_cmp(sub_diff)
  71. def wait_for_mount(self, path, timeout=5):
  72. """Wait until a filesystem is mounted on `path`
  73. """
  74. timeout += time.time()
  75. while timeout > time.time():
  76. if os.path.ismount(path):
  77. return
  78. time.sleep(.1)
  79. raise Exception('wait_for_mount(%s) timeout' % path)
  80. def get_tests(suite):
  81. """Generates a sequence of tests from a test suite
  82. """
  83. for item in suite:
  84. try:
  85. # TODO: This could be "yield from..." with Python 3.3+
  86. for i in get_tests(item):
  87. yield i
  88. except TypeError:
  89. yield item
  90. class TestLoader(unittest.TestLoader):
  91. """A customzied test loader that properly detects and filters our test cases
  92. """
  93. def loadTestsFromName(self, pattern, module=None):
  94. suite = self.discover('attic.testsuite', '*.py')
  95. tests = unittest.TestSuite()
  96. for test in get_tests(suite):
  97. if pattern.lower() in test.id().lower():
  98. tests.addTest(test)
  99. return tests