فهرست منبع

Don't use os.listxattr even if available.

Jonas Borgström 12 سال پیش
والد
کامیت
8a29017910
2فایلهای تغییر یافته به همراه224 افزوده شده و 233 حذف شده
  1. 2 8
      attic/testsuite/__init__.py
  2. 222 225
      attic/xattr.py

+ 2 - 8
attic/testsuite/__init__.py

@@ -38,12 +38,6 @@ class AtticTestCase(unittest.TestCase):
     assert_not_equal = unittest.TestCase.assertNotEqual
     assert_raises = unittest.TestCase.assertRaises
 
-    def _get_xattrs(self, path):
-        try:
-            return get_all(path, follow_symlinks=False)
-        except EnvironmentError:
-            return {}
-
     def assert_dirs_equal(self, dir1, dir2):
         diff = filecmp.dircmp(dir1, dir2)
         self._assert_dirs_equal_cmp(diff)
@@ -73,8 +67,8 @@ class AtticTestCase(unittest.TestCase):
                     d2.append(round(st_mtime_ns(s2), -4))
                 d1.append(round(st_mtime_ns(s1), st_mtime_ns_round))
                 d2.append(round(st_mtime_ns(s2), st_mtime_ns_round))
-            d1.append(self._get_xattrs(path1))
-            d2.append(self._get_xattrs(path2))
+            d1.append(get_all(path1, follow_symlinks=False))
+            d2.append(get_all(path2, follow_symlinks=False))
             self.assert_equal(d1, d2)
         for sub_diff in diff.subdirs.values():
             self._assert_dirs_equal_cmp(sub_diff)

+ 222 - 225
attic/xattr.py

@@ -4,6 +4,8 @@ import errno
 import os
 import sys
 import tempfile
+from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
+from ctypes.util import find_library
 
 
 def is_enabled():
@@ -18,6 +20,8 @@ def is_enabled():
 
 
 def get_all(path, follow_symlinks=True):
+    return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
+                for name in listxattr(path, follow_symlinks=follow_symlinks))
     try:
         return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
                     for name in listxattr(path, follow_symlinks=follow_symlinks))
@@ -25,232 +29,225 @@ def get_all(path, follow_symlinks=True):
         if e.errno in (errno.ENOTSUP, errno.EPERM):
             return {}
 
-try:
-    # Currently only available on Python 3.3+ on Linux
-    from os import getxattr, setxattr, listxattr
 
-except ImportError:
-    from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
-    from ctypes.util import find_library
-    libc = CDLL(find_library('c'), use_errno=True)
-
-    def _check(rv, path=None):
-        if rv < 0:
-            raise OSError(get_errno(), path)
-        return rv
-
-    if sys.platform.startswith('linux'):
-        libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
-        libc.llistxattr.restype = c_ssize_t
-        libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
-        libc.flistxattr.restype = c_ssize_t
-        libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
-        libc.lsetxattr.restype = c_int
-        libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
-        libc.fsetxattr.restype = c_int
-        libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
-        libc.lgetxattr.restype = c_ssize_t
-        libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
-        libc.fgetxattr.restype = c_ssize_t
-
-        def listxattr(path, *, follow_symlinks=True):
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.flistxattr
-            elif follow_symlinks:
-                func = libc.listxattr
-            else:
-                func = libc.llistxattr
-            n = _check(func(path, None, 0), path)
-            if n == 0:
-                return []
-            namebuf = create_string_buffer(n)
-            n2 = _check(func(path, namebuf, n), path)
-            if n2 != n:
-                raise Exception('listxattr failed')
-            return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
-
-        def getxattr(path, name, *, follow_symlinks=True):
-            name = os.fsencode(name)
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.fgetxattr
-            elif follow_symlinks:
-                func = libc.getxattr
-            else:
-                func = libc.lgetxattr
-            n = _check(func(path, name, None, 0))
-            if n == 0:
-                return
-            valuebuf = create_string_buffer(n)
-            n2 = _check(func(path, name, valuebuf, n), path)
-            if n2 != n:
-                raise Exception('getxattr failed')
-            return valuebuf.raw
-
-        def setxattr(path, name, value, *, follow_symlinks=True):
-            name = os.fsencode(name)
-            value = os.fsencode(value)
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.fsetxattr
-            elif follow_symlinks:
-                func = libc.setxattr
-            else:
-                func = libc.lsetxattr
-            _check(func(path, name, value, len(value), 0), path)
-
-    elif sys.platform == 'darwin':
-        libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
-        libc.listxattr.restype = c_ssize_t
-        libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
-        libc.flistxattr.restype = c_ssize_t
-        libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
-        libc.setxattr.restype = c_int
-        libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
-        libc.fsetxattr.restype = c_int
-        libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
-        libc.getxattr.restype = c_ssize_t
-        libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
-        libc.fgetxattr.restype = c_ssize_t
-
-        XATTR_NOFOLLOW = 0x0001
-
-        def listxattr(path, *, follow_symlinks=True):
+libc = CDLL(find_library('c'), use_errno=True)
+
+
+def _check(rv, path=None):
+    if rv < 0:
+        raise OSError(get_errno(), path)
+    return rv
+
+if sys.platform.startswith('linux'):
+    libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
+    libc.llistxattr.restype = c_ssize_t
+    libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
+    libc.flistxattr.restype = c_ssize_t
+    libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
+    libc.lsetxattr.restype = c_int
+    libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
+    libc.fsetxattr.restype = c_int
+    libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
+    libc.lgetxattr.restype = c_ssize_t
+    libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
+    libc.fgetxattr.restype = c_ssize_t
+
+    def listxattr(path, *, follow_symlinks=True):
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.flistxattr
+        elif follow_symlinks:
             func = libc.listxattr
-            flags = 0
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.flistxattr
-            elif not follow_symlinks:
-                flags = XATTR_NOFOLLOW
-            n = _check(func(path, None, 0, flags), path)
-            if n == 0:
-                return []
-            namebuf = create_string_buffer(n)
-            n2 = _check(func(path, namebuf, n, flags), path)
-            if n2 != n:
-                raise Exception('listxattr failed')
-            return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
-
-        def getxattr(path, name, *, follow_symlinks=True):
-            name = os.fsencode(name)
+        else:
+            func = libc.llistxattr
+        n = _check(func(path, None, 0), path)
+        if n == 0:
+            return []
+        namebuf = create_string_buffer(n)
+        n2 = _check(func(path, namebuf, n), path)
+        if n2 != n:
+            raise Exception('listxattr failed')
+        return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
+
+    def getxattr(path, name, *, follow_symlinks=True):
+        name = os.fsencode(name)
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.fgetxattr
+        elif follow_symlinks:
             func = libc.getxattr
-            flags = 0
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.fgetxattr
-            elif not follow_symlinks:
-                flags = XATTR_NOFOLLOW
-            n = _check(func(path, name, None, 0, 0, flags))
-            if n == 0:
-                return
-            valuebuf = create_string_buffer(n)
-            n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
-            if n2 != n:
-                raise Exception('getxattr failed')
-            return valuebuf.raw
-
-        def setxattr(path, name, value, *, follow_symlinks=True):
-            name = os.fsencode(name)
-            value = os.fsencode(value)
+        else:
+            func = libc.lgetxattr
+        n = _check(func(path, name, None, 0))
+        if n == 0:
+            return
+        valuebuf = create_string_buffer(n)
+        n2 = _check(func(path, name, valuebuf, n), path)
+        if n2 != n:
+            raise Exception('getxattr failed')
+        return valuebuf.raw
+
+    def setxattr(path, name, value, *, follow_symlinks=True):
+        name = os.fsencode(name)
+        value = os.fsencode(value)
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.fsetxattr
+        elif follow_symlinks:
             func = libc.setxattr
-            flags = 0
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.fsetxattr
-            elif not follow_symlinks:
-                flags = XATTR_NOFOLLOW
-            _check(func(path, name, value, len(value), 0, flags), path)
-
-    elif sys.platform.startswith('freebsd'):
-        EXTATTR_NAMESPACE_USER = 0x0001
-        libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
-        libc.extattr_list_fd.restype = c_ssize_t
-        libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
-        libc.extattr_list_link.restype = c_ssize_t
-        libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
-        libc.extattr_list_file.restype = c_ssize_t
-        libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
-        libc.extattr_get_fd.restype = c_ssize_t
-        libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
-        libc.extattr_get_link.restype = c_ssize_t
-        libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
-        libc.extattr_get_file.restype = c_ssize_t
-        libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
-        libc.extattr_set_fd.restype = c_int
-        libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
-        libc.extattr_set_link.restype = c_int
-        libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
-        libc.extattr_set_file.restype = c_int
-
-        def listxattr(path, *, follow_symlinks=True):
-            ns = EXTATTR_NAMESPACE_USER
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.extattr_list_fd
-            elif follow_symlinks:
-                func = libc.extattr_list_file
-            else:
-                func = libc.extattr_list_link
-            n = _check(func(path, ns, None, 0), path)
-            if n == 0:
-                return []
-            namebuf = create_string_buffer(n)
-            n2 = _check(func(path, ns, namebuf, n), path)
-            if n2 != n:
-                raise Exception('listxattr failed')
-            names = []
-            mv = memoryview(namebuf.raw)
-            while mv:
-                length = mv[0]
-                # Python < 3.3 returns bytes instead of int
-                if isinstance(length, bytes):
-                    length = ord(length)
-                names.append(os.fsdecode(bytes(mv[1:1+length])))
-                mv = mv[1+length:]
-            return names
-
-        def getxattr(path, name, *, follow_symlinks=True):
-            name = os.fsencode(name)
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.extattr_get_fd
-            elif follow_symlinks:
-                func = libc.extattr_get_file
-            else:
-                func = libc.extattr_get_link
-            n = _check(func(path, EXTATTR_NAMESPACE_USER, name, None, 0))
-            if n == 0:
-                return
-            valuebuf = create_string_buffer(n)
-            n2 = _check(func(path, EXTATTR_NAMESPACE_USER, name, valuebuf, n), path)
-            if n2 != n:
-                raise Exception('getxattr failed')
-            return valuebuf.raw
-
-        def setxattr(path, name, value, *, follow_symlinks=True):
-            name = os.fsencode(name)
-            value = os.fsencode(value)
-            if isinstance(path, str):
-                path = os.fsencode(path)
-            if isinstance(path, int):
-                func = libc.extattr_set_fd
-            elif follow_symlinks:
-                func = libc.extattr_set_file
-            else:
-                func = libc.extattr_set_link
-            _check(func(path, EXTATTR_NAMESPACE_USER, name, value, len(value)), path)
-
-    else:
-        raise Exception('Unsupported platform: %s' % sys.platform)
-
-
+        else:
+            func = libc.lsetxattr
+        _check(func(path, name, value, len(value), 0), path)
+
+elif sys.platform == 'darwin':
+    libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
+    libc.listxattr.restype = c_ssize_t
+    libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
+    libc.flistxattr.restype = c_ssize_t
+    libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
+    libc.setxattr.restype = c_int
+    libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
+    libc.fsetxattr.restype = c_int
+    libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
+    libc.getxattr.restype = c_ssize_t
+    libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
+    libc.fgetxattr.restype = c_ssize_t
+
+    XATTR_NOFOLLOW = 0x0001
+
+    def listxattr(path, *, follow_symlinks=True):
+        func = libc.listxattr
+        flags = 0
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.flistxattr
+        elif not follow_symlinks:
+            flags = XATTR_NOFOLLOW
+        n = _check(func(path, None, 0, flags), path)
+        if n == 0:
+            return []
+        namebuf = create_string_buffer(n)
+        n2 = _check(func(path, namebuf, n, flags), path)
+        if n2 != n:
+            raise Exception('listxattr failed')
+        return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
+
+    def getxattr(path, name, *, follow_symlinks=True):
+        name = os.fsencode(name)
+        func = libc.getxattr
+        flags = 0
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.fgetxattr
+        elif not follow_symlinks:
+            flags = XATTR_NOFOLLOW
+        n = _check(func(path, name, None, 0, 0, flags))
+        if n == 0:
+            return
+        valuebuf = create_string_buffer(n)
+        n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
+        if n2 != n:
+            raise Exception('getxattr failed')
+        return valuebuf.raw
+
+    def setxattr(path, name, value, *, follow_symlinks=True):
+        name = os.fsencode(name)
+        value = os.fsencode(value)
+        func = libc.setxattr
+        flags = 0
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.fsetxattr
+        elif not follow_symlinks:
+            flags = XATTR_NOFOLLOW
+        _check(func(path, name, value, len(value), 0, flags), path)
+
+elif sys.platform.startswith('freebsd'):
+    EXTATTR_NAMESPACE_USER = 0x0001
+    libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
+    libc.extattr_list_fd.restype = c_ssize_t
+    libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
+    libc.extattr_list_link.restype = c_ssize_t
+    libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
+    libc.extattr_list_file.restype = c_ssize_t
+    libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
+    libc.extattr_get_fd.restype = c_ssize_t
+    libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
+    libc.extattr_get_link.restype = c_ssize_t
+    libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
+    libc.extattr_get_file.restype = c_ssize_t
+    libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
+    libc.extattr_set_fd.restype = c_int
+    libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
+    libc.extattr_set_link.restype = c_int
+    libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
+    libc.extattr_set_file.restype = c_int
+
+    def listxattr(path, *, follow_symlinks=True):
+        ns = EXTATTR_NAMESPACE_USER
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.extattr_list_fd
+        elif follow_symlinks:
+            func = libc.extattr_list_file
+        else:
+            func = libc.extattr_list_link
+        n = _check(func(path, ns, None, 0), path)
+        if n == 0:
+            return []
+        namebuf = create_string_buffer(n)
+        n2 = _check(func(path, ns, namebuf, n), path)
+        if n2 != n:
+            raise Exception('listxattr failed')
+        names = []
+        mv = memoryview(namebuf.raw)
+        while mv:
+            length = mv[0]
+            # Python < 3.3 returns bytes instead of int
+            if isinstance(length, bytes):
+                length = ord(length)
+            names.append(os.fsdecode(bytes(mv[1:1+length])))
+            mv = mv[1+length:]
+        return names
+
+    def getxattr(path, name, *, follow_symlinks=True):
+        name = os.fsencode(name)
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.extattr_get_fd
+        elif follow_symlinks:
+            func = libc.extattr_get_file
+        else:
+            func = libc.extattr_get_link
+        n = _check(func(path, EXTATTR_NAMESPACE_USER, name, None, 0))
+        if n == 0:
+            return
+        valuebuf = create_string_buffer(n)
+        n2 = _check(func(path, EXTATTR_NAMESPACE_USER, name, valuebuf, n), path)
+        if n2 != n:
+            raise Exception('getxattr failed')
+        return valuebuf.raw
+
+    def setxattr(path, name, value, *, follow_symlinks=True):
+        name = os.fsencode(name)
+        value = os.fsencode(value)
+        if isinstance(path, str):
+            path = os.fsencode(path)
+        if isinstance(path, int):
+            func = libc.extattr_set_fd
+        elif follow_symlinks:
+            func = libc.extattr_set_file
+        else:
+            func = libc.extattr_set_link
+        _check(func(path, EXTATTR_NAMESPACE_USER, name, value, len(value)), path)
+
+else:
+    raise Exception('Unsupported platform: %s' % sys.platform)