| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 | 
							- """A basic extended attributes (xattr) implementation for Linux and MacOS X
 
- """
 
- import errno
 
- import os
 
- import sys
 
- import tempfile
 
- def is_enabled():
 
-     """Determine if xattr is enabled on the filesystem
 
-     """
 
-     with tempfile.NamedTemporaryFile() as fd:
 
-         try:
 
-             setxattr(fd.fileno(), 'user.name', b'value')
 
-         except OSError:
 
-             return False
 
-         return getxattr(fd.fileno(), 'user.name') == b'value'
 
- def get_all(path, follow_symlinks=True):
 
-     return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
 
-                 for name in listxattr(path, follow_symlinks=follow_symlinks))
 
- try:
 
-     # Currently only available on Python 3.3+ on Linux
 
-     from os import getxattr, setxattr, listxattr2
 
- except ImportError:
 
-     from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
 
-     from ctypes.util import find_library
 
-     libc = CDLL(find_library('c'), use_errno=True)
 
-     def _check(rv, path=None):
 
-         if rv < 0:
 
-             raise OSError(get_errno(), path)
 
-         return rv
 
-     if sys.platform.startswith('linux'):
 
-         libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
 
-         libc.llistxattr.restype = c_ssize_t
 
-         libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
 
-         libc.flistxattr.restype = c_ssize_t
 
-         libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
 
-         libc.lsetxattr.restype = c_int
 
-         libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
 
-         libc.fsetxattr.restype = c_int
 
-         libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
 
-         libc.lgetxattr.restype = c_ssize_t
 
-         libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.fgetxattr.restype = c_ssize_t
 
-         def listxattr(path, *, follow_symlinks=True):
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.flistxattr
 
-             elif follow_symlinks:
 
-                 func = libc.listxattr
 
-             else:
 
-                 func = libc.llistxattr
 
-             n = _check(func(path, None, 0), path)
 
-             if n == 0:
 
-                 return []
 
-             namebuf = create_string_buffer(n)
 
-             n2 = _check(func(path, namebuf, n), path)
 
-             if n2 != n:
 
-                 raise Exception('listxattr failed')
 
-             return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
 
-         def getxattr(path, name, *, follow_symlinks=True):
 
-             name = os.fsencode(name)
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.fgetxattr
 
-             elif follow_symlinks:
 
-                 func = libc.getxattr
 
-             else:
 
-                 func = libc.lgetxattr
 
-             n = _check(func(path, name, None, 0))
 
-             if n == 0:
 
-                 return
 
-             valuebuf = create_string_buffer(n)
 
-             n2 = _check(func(path, name, valuebuf, n), path)
 
-             if n2 != n:
 
-                 raise Exception('getxattr failed')
 
-             return valuebuf.raw
 
-         def setxattr(path, name, value, *, follow_symlinks=True):
 
-             name = os.fsencode(name)
 
-             value = os.fsencode(value)
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.fsetxattr
 
-             elif follow_symlinks:
 
-                 func = libc.setxattr
 
-             else:
 
-                 func = libc.lsetxattr
 
-             _check(func(path, name, value, len(value), 0), path)
 
-     elif sys.platform == 'darwin':
 
-         libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
 
-         libc.listxattr.restype = c_ssize_t
 
-         libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
 
-         libc.flistxattr.restype = c_ssize_t
 
-         libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
 
-         libc.setxattr.restype = c_int
 
-         libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
 
-         libc.fsetxattr.restype = c_int
 
-         libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
 
-         libc.getxattr.restype = c_ssize_t
 
-         libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
 
-         libc.fgetxattr.restype = c_ssize_t
 
-         XATTR_NOFOLLOW = 0x0001
 
-         def listxattr(path, *, follow_symlinks=True):
 
-             func = libc.listxattr
 
-             flags = 0
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.flistxattr
 
-             elif not follow_symlinks:
 
-                 flags = XATTR_NOFOLLOW
 
-             try:
 
-                 n = _check(func(path, None, 0, flags), path)
 
-             except OSError as e:
 
-                 if e.errno == errno.EPERM:
 
-                     return []
 
-                 raise
 
-             if n == 0:
 
-                 return []
 
-             namebuf = create_string_buffer(n)
 
-             n2 = _check(func(path, namebuf, n, flags), path)
 
-             if n2 != n:
 
-                 raise Exception('listxattr failed')
 
-             return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
 
-         def getxattr(path, name, *, follow_symlinks=True):
 
-             name = os.fsencode(name)
 
-             func = libc.getxattr
 
-             flags = 0
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.fgetxattr
 
-             elif not follow_symlinks:
 
-                 flags = XATTR_NOFOLLOW
 
-             n = _check(func(path, name, None, 0, 0, flags))
 
-             if n == 0:
 
-                 return
 
-             valuebuf = create_string_buffer(n)
 
-             n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
 
-             if n2 != n:
 
-                 raise Exception('getxattr failed')
 
-             return valuebuf.raw
 
-         def setxattr(path, name, value, *, follow_symlinks=True):
 
-             name = os.fsencode(name)
 
-             value = os.fsencode(value)
 
-             func = libc.setxattr
 
-             flags = 0
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.fsetxattr
 
-             elif not follow_symlinks:
 
-                 flags = XATTR_NOFOLLOW
 
-             _check(func(path, name, value, len(value), 0, flags), path)
 
-     elif sys.platform.startswith('freebsd'):
 
-         EXTATTR_NAMESPACE_USER = 0x0001
 
-         libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
 
-         libc.extattr_list_fd.restype = c_ssize_t
 
-         libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
 
-         libc.extattr_list_link.restype = c_ssize_t
 
-         libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
 
-         libc.extattr_list_file.restype = c_ssize_t
 
-         libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.extattr_get_fd.restype = c_ssize_t
 
-         libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.extattr_get_link.restype = c_ssize_t
 
-         libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.extattr_get_file.restype = c_ssize_t
 
-         libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.extattr_set_fd.restype = c_int
 
-         libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.extattr_set_link.restype = c_int
 
-         libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
 
-         libc.extattr_set_file.restype = c_int
 
-         def listxattr(path, *, follow_symlinks=True):
 
-             ns = EXTATTR_NAMESPACE_USER
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.extattr_list_fd
 
-             elif follow_symlinks:
 
-                 func = libc.extattr_list_file
 
-             else:
 
-                 func = libc.extattr_list_link
 
-             try:
 
-                 n = _check(func(path, ns, None, 0), path)
 
-             except OSError as e:
 
-                 if e.errno == errno.ENOTSUP:
 
-                     return []
 
-                 raise
 
-             if n == 0:
 
-                 return []
 
-             namebuf = create_string_buffer(n)
 
-             n2 = _check(func(path, ns, namebuf, n), path)
 
-             if n2 != n:
 
-                 raise Exception('listxattr failed')
 
-             names = []
 
-             mv = memoryview(namebuf.raw)
 
-             while mv:
 
-                 length = mv[0]
 
-                 # Python < 3.3 returns bytes instead of int
 
-                 if isinstance(length, bytes):
 
-                     length = ord(length)
 
-                 names.append(os.fsdecode(bytes(mv[1:1+length])))
 
-                 mv = mv[1+length:]
 
-             return names
 
-         def getxattr(path, name, *, follow_symlinks=True):
 
-             name = os.fsencode(name)
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.extattr_get_fd
 
-             elif follow_symlinks:
 
-                 func = libc.extattr_get_file
 
-             else:
 
-                 func = libc.extattr_get_link
 
-             n = _check(func(path, EXTATTR_NAMESPACE_USER, name, None, 0))
 
-             if n == 0:
 
-                 return
 
-             valuebuf = create_string_buffer(n)
 
-             n2 = _check(func(path, EXTATTR_NAMESPACE_USER, name, valuebuf, n), path)
 
-             if n2 != n:
 
-                 raise Exception('getxattr failed')
 
-             return valuebuf.raw
 
-         def setxattr(path, name, value, *, follow_symlinks=True):
 
-             name = os.fsencode(name)
 
-             value = os.fsencode(value)
 
-             if isinstance(path, str):
 
-                 path = os.fsencode(path)
 
-             if isinstance(path, int):
 
-                 func = libc.extattr_set_fd
 
-             elif follow_symlinks:
 
-                 func = libc.extattr_set_file
 
-             else:
 
-                 func = libc.extattr_set_link
 
-             _check(func(path, EXTATTR_NAMESPACE_USER, name, value, len(value)), path)
 
-     else:
 
-         raise Exception('Unsupported platform: %s' % sys.platform)
 
 
  |