from contextlib import contextmanager
import filecmp
import os
import posix
import sys
import sysconfig
import time
import unittest

from attic.helpers import st_mtime_ns
from attic.xattr import get_all

try:
    import llfuse
    # Does this version of llfuse support ns precision?
    have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
except ImportError:
    have_fuse_mtime_ns = False

has_lchflags = hasattr(os, 'lchflags')

# The mtime get/set precision varies on different OS and Python versions.
# The value is used as the ``ndigits`` argument of round() on nanosecond
# timestamps: 0 keeps every digit, -6 and -9 drop the last 6 / 9 digits.
if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
    st_mtime_ns_round = 0
elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
    st_mtime_ns_round = -6
else:
    st_mtime_ns_round = -9

# Compare the version tuple, not the version string: as strings,
# '3.10...' sorts before '3.3' and the check would be False on 3.10+.
has_mtime_ns = sys.version_info >= (3, 3)
utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})


class AtticTestCase(unittest.TestCase):
    """Base test case with snake_case assertion aliases and helpers for
    comparing directory trees and waiting for mounts.
    """
    assert_in = unittest.TestCase.assertIn
    assert_not_in = unittest.TestCase.assertNotIn
    assert_equal = unittest.TestCase.assertEqual
    assert_not_equal = unittest.TestCase.assertNotEqual
    assert_raises = unittest.TestCase.assertRaises
    assert_true = unittest.TestCase.assertTrue

    @contextmanager
    def assert_creates_file(self, path):
        """Context manager asserting that `path` does not exist on entry
        and does exist once the ``with`` body has finished.
        """
        self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
        yield
        self.assert_true(os.path.exists(path), '{} should exist'.format(path))

    def assert_dirs_equal(self, dir1, dir2):
        """Assert that the directory trees `dir1` and `dir2` are equal

        Builds a ``filecmp.dircmp`` for the two directories and checks it
        (and all of its sub-directories) with `_assert_dirs_equal_cmp`.
        """
        diff = filecmp.dircmp(dir1, dir2)
        self._assert_dirs_equal_cmp(diff)

    def _assert_dirs_equal_cmp(self, diff):
        """Recursively assert that a ``filecmp.dircmp`` result shows no difference

        Besides the name/content checks provided by dircmp, every common
        entry is lstat()ed on both sides and selected stat fields, the
        (rounded) mtime and the result of ``get_all`` are compared.
        """
        self.assert_equal(diff.left_only, [])
        self.assert_equal(diff.right_only, [])
        self.assert_equal(diff.diff_files, [])
        self.assert_equal(diff.funny_files, [])
        for filename in diff.common:
            path1 = os.path.join(diff.left, filename)
            path2 = os.path.join(diff.right, filename)
            s1 = os.lstat(path1)
            s2 = os.lstat(path2)
            # Assume path2 is on FUSE if st_dev is different
            fuse = s1.st_dev != s2.st_dev
            attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
            if has_lchflags:
                attrs.append('st_flags')
            if not fuse or not os.path.isdir(path1):
                # dir nlink is always 1 on our fuse filesystem
                attrs.append('st_nlink')
            # Lead with the filename so a failing comparison names the entry
            d1 = [filename] + [getattr(s1, a) for a in attrs]
            d2 = [filename] + [getattr(s2, a) for a in attrs]
            if not os.path.islink(path1) or utime_supports_fd:
                if fuse and not have_fuse_mtime_ns:
                    # Older versions of llfuse do not support ns precision
                    # properly, so only a coarse comparison is meaningful
                    d1.append(round(st_mtime_ns(s1), -4))
                    d2.append(round(st_mtime_ns(s2), -4))
                else:
                    # Compare at the precision this platform can get/set
                    d1.append(round(st_mtime_ns(s1), st_mtime_ns_round))
                    d2.append(round(st_mtime_ns(s2), st_mtime_ns_round))
            # Query the entries themselves, not the targets of symlinks
            d1.append(get_all(path1, follow_symlinks=False))
            d2.append(get_all(path2, follow_symlinks=False))
            self.assert_equal(d1, d2)
        for sub_diff in diff.subdirs.values():
            self._assert_dirs_equal_cmp(sub_diff)

    def wait_for_mount(self, path, timeout=5):
        """Wait until a filesystem is mounted on `path`

        Polls ``os.path.ismount(path)`` every 0.1 seconds and returns as
        soon as it is true. Raises ``Exception`` if nothing is mounted
        within `timeout` seconds.
        """
        # Turn the relative timeout into an absolute deadline
        timeout += time.time()
        while timeout > time.time():
            if os.path.ismount(path):
                return
            time.sleep(.1)
        raise Exception('wait_for_mount(%s) timeout' % path)


def get_tests(suite):
    """Generates a sequence of tests from a test suite

    Nested suites are flattened recursively; anything that cannot be
    iterated (iteration raises TypeError) is yielded as a test itself.
    """
    for item in suite:
        try:
            # TODO: This could be "yield from..." with Python 3.3+
            for i in get_tests(item):
                yield i
        except TypeError:
            yield item


class TestLoader(unittest.TestLoader):
    """A customized test loader that properly detects and filters our test cases
    """

    def loadTestsFromName(self, pattern, module=None):
        """Return a suite of all discovered tests whose id contains `pattern`

        Tests are always discovered from 'attic.testsuite' ('*.py');
        `module` is accepted for interface compatibility but not used.
        The match is a case-insensitive substring test on ``test.id()``.
        """
        suite = self.discover('attic.testsuite', '*.py')
        tests = unittest.TestSuite()
        needle = pattern.lower()
        for test in get_tests(suite):
            if needle in test.id().lower():
                tests.addTest(test)
        return tests