xattr.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """A basic extended attributes (xattr) implementation for Linux and MacOS X
  2. On Linux only the "user." namespace is accessed
  3. """
  4. import os
  5. import sys
  6. import tempfile
  7. from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
  8. from ctypes.util import find_library
  9. libc = CDLL(find_library('c'), use_errno=True)
  10. def is_enabled():
  11. """Determine if xattr is enabled on the filesystem
  12. """
  13. with tempfile.TemporaryFile() as fd:
  14. try:
  15. set(fd.fileno(), b'name', b'value')
  16. except OSError:
  17. return False
  18. return get_all(fd.fileno()) == {b'name': b'value'}
  19. def set(path_or_fd, name, value):
  20. if isinstance(path_or_fd, int):
  21. fsetxattr(path_or_fd, name, value)
  22. else:
  23. lsetxattr(path_or_fd, name, value)
  24. def get_all(path_or_fd):
  25. """Return a dictionary with all (user) xattrs for "path_or_fd"
  26. Symbolic links are not followed
  27. """
  28. if isinstance(path_or_fd, int):
  29. return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd))
  30. else:
  31. return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd))
  32. def _check(rv, path=None):
  33. if rv < 0:
  34. raise OSError(get_errno(), path)
  35. return rv
  36. if sys.platform.startswith('linux'):
  37. libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
  38. libc.llistxattr.restype = c_ssize_t
  39. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  40. libc.flistxattr.restype = c_ssize_t
  41. libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
  42. libc.lsetxattr.restype = c_int
  43. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
  44. libc.fsetxattr.restype = c_int
  45. libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
  46. libc.lgetxattr.restype = c_ssize_t
  47. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
  48. libc.fgetxattr.restype = c_ssize_t
  49. def llistxattr(path):
  50. path = os.fsencode(path)
  51. n = _check(libc.llistxattr(path, None, 0), path)
  52. if n == 0:
  53. []
  54. namebuf = create_string_buffer(n)
  55. n2 = _check(libc.llistxattr(path, namebuf, n))
  56. if n2 != n:
  57. raise Exception('llistxattr failed')
  58. return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
  59. def flistxattr(fd):
  60. n = _check(libc.flistxattr(fd, None, 0))
  61. if n == 0:
  62. []
  63. namebuf = create_string_buffer(n)
  64. n2 = _check(libc.flistxattr(fd, namebuf, n))
  65. if n2 != n:
  66. raise Exception('flistxattr failed')
  67. return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
  68. def lsetxattr(path, name, value, flags=0):
  69. _check(libc.lsetxattr(os.fsencode(path), b'user.' + name, value, len(value), flags), path)
  70. def fsetxattr(fd, name, value, flags=0):
  71. _check(libc.fsetxattr(fd, b'user.' + name, value, len(value), flags))
  72. def lgetxattr(path, name):
  73. path = os.fsencode(path)
  74. name = b'user.' + name
  75. n = _check(libc.lgetxattr(path, name, None, 0))
  76. if n == 0:
  77. return None
  78. valuebuf = create_string_buffer(n)
  79. n2 = _check(libc.lgetxattr(path, name, valuebuf, n), path)
  80. if n2 != n:
  81. raise Exception('lgetxattr failed')
  82. return valuebuf.raw
  83. def fgetxattr(fd, name):
  84. name = b'user.' + name
  85. n = _check(libc.fgetxattr(fd, name, None, 0))
  86. if n == 0:
  87. return None
  88. valuebuf = create_string_buffer(n)
  89. n2 = _check(libc.fgetxattr(fd, name, valuebuf, n))
  90. if n2 != n:
  91. raise Exception('fgetxattr failed')
  92. return valuebuf.raw
  93. elif sys.platform == 'darwin':
  94. libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
  95. libc.listxattr.restype = c_ssize_t
  96. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  97. libc.flistxattr.restype = c_ssize_t
  98. libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  99. libc.setxattr.restype = c_int
  100. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  101. libc.fsetxattr.restype = c_int
  102. libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  103. libc.getxattr.restype = c_ssize_t
  104. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  105. libc.fgetxattr.restype = c_ssize_t
  106. XATTR_NOFOLLOW = 0x0001
  107. def llistxattr(path):
  108. path = os.fsencode(path)
  109. n = _check(libc.listxattr(path, None, 0, XATTR_NOFOLLOW), path)
  110. if n == 0:
  111. []
  112. namebuf = create_string_buffer(n)
  113. n2 = _check(libc.listxattr(path, namebuf, n, XATTR_NOFOLLOW))
  114. if n2 != n:
  115. raise Exception('llistxattr failed')
  116. return namebuf.raw.split(b'\0')[:-1]
  117. def flistxattr(fd):
  118. n = _check(libc.flistxattr(fd, None, 0, 0))
  119. if n == 0:
  120. []
  121. namebuf = create_string_buffer(n)
  122. n2 = _check(libc.flistxattr(fd, namebuf, n, 0))
  123. if n2 != n:
  124. raise Exception('flistxattr failed')
  125. return namebuf.raw.split(b'\0')[:-1]
  126. def lsetxattr(path, name, value, flags=XATTR_NOFOLLOW):
  127. _check(libc.setxattr(os.fsencode(path), name, value, len(value), 0, flags), path)
  128. def fsetxattr(fd, name, value, flags=0):
  129. _check(libc.fsetxattr(fd, name, value, len(value), 0, flags))
  130. def lgetxattr(path, name):
  131. path = os.fsencode(path)
  132. n = _check(libc.getxattr(path, name, None, 0, 0, XATTR_NOFOLLOW), path)
  133. if n == 0:
  134. return None
  135. valuebuf = create_string_buffer(n)
  136. n2 = _check(libc.getxattr(path, name, valuebuf, n, 0, XATTR_NOFOLLOW))
  137. if n2 != n:
  138. raise Exception('getxattr failed')
  139. return valuebuf.raw
  140. def fgetxattr(fd, name):
  141. n = _check(libc.fgetxattr(fd, name, None, 0, 0, 0))
  142. if n == 0:
  143. return None
  144. valuebuf = create_string_buffer(n)
  145. n2 = _check(libc.fgetxattr(fd, name, valuebuf, n, 0, 0))
  146. if n2 != n:
  147. Exception('fgetxattr failed')
  148. return valuebuf.raw
  149. else:
  150. raise Exception('Unsupported platform: %s' % sys.platform)