123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- """A basic extended attributes (xattr) implementation for Linux and MacOS X
- On Linux only the "user." namespace is accessed
- """
- import os
- import sys
- from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
- from ctypes.util import find_library
- libc = CDLL(find_library('c'), use_errno=True)
- def set(path_or_fd, name, value):
- if isinstance(path_or_fd, int):
- fsetxattr(path_or_fd, name, value)
- else:
- lsetxattr(path_or_fd, name, value)
- def get_all(path_or_fd):
- """Return a dictionary with all (user) xattrs for "path_or_fd"
- Symbolic links are not followed
- """
- if isinstance(path_or_fd, int):
- return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd))
- else:
- return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd))
- def _check(rv, path=None):
- if rv < 0:
- raise OSError(get_errno(), path)
- return rv
- if sys.platform.startswith('linux'):
- libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
- libc.llistxattr.restype = c_ssize_t
- libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
- libc.flistxattr.restype = c_ssize_t
- libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
- libc.lsetxattr.restype = c_int
- libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
- libc.fsetxattr.restype = c_int
- libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
- libc.lgetxattr.restype = c_ssize_t
- libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
- libc.fgetxattr.restype = c_ssize_t
- def llistxattr(path):
- path = os.fsencode(path)
- n = _check(libc.llistxattr(path, None, 0), path)
- if n == 0:
- []
- namebuf = create_string_buffer(n)
- n2 = _check(libc.llistxattr(path, namebuf, n))
- if n2 != n:
- raise Exception('llistxattr failed')
- return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
- def flistxattr(fd):
- n = _check(libc.flistxattr(fd, None, 0))
- if n == 0:
- []
- namebuf = create_string_buffer(n)
- n2 = _check(libc.flistxattr(fd, namebuf, n))
- if n2 != n:
- raise Exception('flistxattr failed')
- return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
- def lsetxattr(path, name, value, flags=0):
- _check(libc.lsetxattr(os.fsencode(path), b'user.' + name, value, len(value), flags), path)
- def fsetxattr(fd, name, value, flags=0):
- _check(libc.fsetxattr(fd, b'user.' + name, value, len(value), flags))
- def lgetxattr(path, name):
- path = os.fsencode(path)
- name = b'user.' + name
- n = _check(libc.lgetxattr(path, name, None, 0))
- if n == 0:
- return None
- valuebuf = create_string_buffer(n)
- n2 = _check(libc.lgetxattr(path, name, valuebuf, n), path)
- if n2 != n:
- raise Exception('lgetxattr failed')
- return valuebuf.raw
- def fgetxattr(fd, name):
- name = b'user.' + name
- n = _check(libc.fgetxattr(fd, name, None, 0))
- if n == 0:
- return None
- valuebuf = create_string_buffer(n)
- n2 = _check(libc.fgetxattr(fd, name, valuebuf, n))
- if n2 != n:
- raise Exception('fgetxattr failed')
- return valuebuf.raw
- elif sys.platform == 'darwin':
- libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
- libc.listxattr.restype = c_ssize_t
- libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
- libc.flistxattr.restype = c_ssize_t
- libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
- libc.setxattr.restype = c_int
- libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
- libc.fsetxattr.restype = c_int
- libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
- libc.getxattr.restype = c_ssize_t
- libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
- libc.fgetxattr.restype = c_ssize_t
- XATTR_NOFOLLOW = 0x0001
- def llistxattr(path):
- path = os.fsencode(path)
- n = _check(libc.listxattr(path, None, 0, XATTR_NOFOLLOW), path)
- if n == 0:
- []
- namebuf = create_string_buffer(n)
- n2 = _check(libc.listxattr(path, namebuf, n, XATTR_NOFOLLOW))
- if n2 != n:
- raise Exception('llistxattr failed')
- return namebuf.raw.split(b'\0')[:-1]
- def flistxattr(fd):
- n = _check(libc.flistxattr(fd, None, 0, 0))
- if n == 0:
- []
- namebuf = create_string_buffer(n)
- n2 = _check(libc.flistxattr(fd, namebuf, n, 0))
- if n2 != n:
- raise Exception('flistxattr failed')
- return namebuf.raw.split(b'\0')[:-1]
- def lsetxattr(path, name, value, flags=XATTR_NOFOLLOW):
- _check(libc.setxattr(os.fsencode(path), name, value, len(value), 0, flags), path)
- def fsetxattr(fd, name, value, flags=0):
- _check(libc.fsetxattr(fd, name, value, len(value), 0, flags))
- def lgetxattr(path, name):
- path = os.fsencode(path)
- n = _check(libc.getxattr(path, name, None, 0, 0, XATTR_NOFOLLOW), path)
- if n == 0:
- return None
- valuebuf = create_string_buffer(n)
- n2 = _check(libc.getxattr(path, name, valuebuf, n, 0, XATTR_NOFOLLOW))
- if n2 != n:
- raise Exception('getxattr failed')
- return valuebuf.raw
- def fgetxattr(fd, name):
- n = _check(libc.fgetxattr(fd, name, None, 0, 0, 0))
- if n == 0:
- return None
- valuebuf = create_string_buffer(n)
- n2 = _check(libc.fgetxattr(fd, name, valuebuf, n, 0, 0))
- if n2 != n:
- Exception('fgetxattr failed')
- return valuebuf.raw
- else:
- raise Exception('Unsupported platform: %s' % sys.platform)
|