__init__.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import filecmp
  2. import os
  3. import posix
  4. import sys
  5. import sysconfig
  6. import time
  7. import unittest
  8. from attic.helpers import st_mtime_ns
  9. from attic.xattr import get_all
  10. # The mtime get/set precison varies on different OS and Python versions
  11. if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
  12. st_mtime_ns_round = 0
  13. elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
  14. st_mtime_ns_round = -6
  15. else:
  16. st_mtime_ns_round = -9
  17. has_mtime_ns = sys.version >= '3.3'
  18. utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
  19. class AtticTestCase(unittest.TestCase):
  20. """
  21. """
  22. assert_equal = unittest.TestCase.assertEqual
  23. assert_not_equal = unittest.TestCase.assertNotEqual
  24. assert_raises = unittest.TestCase.assertRaises
  25. def _get_xattrs(self, path):
  26. try:
  27. return get_all(path, follow_symlinks=False)
  28. except EnvironmentError:
  29. return {}
  30. def assert_dirs_equal(self, dir1, dir2, fuse=False):
  31. diff = filecmp.dircmp(dir1, dir2)
  32. self._assert_dirs_equal_cmp(diff, fuse)
  33. def _assert_dirs_equal_cmp(self, diff, fuse=False):
  34. self.assert_equal(diff.left_only, [])
  35. self.assert_equal(diff.right_only, [])
  36. self.assert_equal(diff.diff_files, [])
  37. self.assert_equal(diff.funny_files, [])
  38. for filename in diff.common:
  39. path1 = os.path.join(diff.left, filename)
  40. path2 = os.path.join(diff.right, filename)
  41. s1 = os.lstat(path1)
  42. s2 = os.lstat(path2)
  43. attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
  44. if not fuse or not os.path.isdir(path1):
  45. # dir nlink is always 1 on our fuse fileystem
  46. attrs.append('st_nlink')
  47. d1 = [filename] + [getattr(s1, a) for a in attrs]
  48. d2 = [filename] + [getattr(s2, a) for a in attrs]
  49. if not os.path.islink(path1) or utime_supports_fd:
  50. # llfuse does not provide ns precision for now
  51. if fuse:
  52. d1.append(round(st_mtime_ns(s1), -4))
  53. d2.append(round(st_mtime_ns(s2), -4))
  54. else:
  55. d1.append(round(st_mtime_ns(s1), st_mtime_ns_round))
  56. d2.append(round(st_mtime_ns(s2), st_mtime_ns_round))
  57. d1.append(self._get_xattrs(path1))
  58. d2.append(self._get_xattrs(path2))
  59. self.assert_equal(d1, d2)
  60. for sub_diff in diff.subdirs.values():
  61. self._assert_dirs_equal_cmp(sub_diff, fuse)
  62. def wait_for_mount(self, path, timeout=5):
  63. """Wait until a filesystem is mounted on `path`
  64. """
  65. timeout += time.time()
  66. while timeout > time.time():
  67. if os.path.ismount(path):
  68. return
  69. time.sleep(.1)
  70. raise Exception('wait_for_mount(%s) timeout' % path)
  71. def get_tests(suite):
  72. """Generates a sequence of tests from a test suite
  73. """
  74. for item in suite:
  75. try:
  76. # TODO: This could be "yield from..." with Python 3.3+
  77. for i in get_tests(item):
  78. yield i
  79. except TypeError:
  80. yield item
  81. class TestLoader(unittest.TestLoader):
  82. """A customzied test loader that properly detects and filters our test cases
  83. """
  84. def loadTestsFromName(self, pattern, module=None):
  85. suite = self.discover('attic.testsuite', '*.py')
  86. tests = unittest.TestSuite()
  87. for test in get_tests(suite):
  88. if pattern.lower() in test.id().lower():
  89. tests.addTest(test)
  90. return tests