xattr.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """A basic extended attributes (xattr) implementation for Linux, FreeBSD and Mac OS X
  2. """
  3. import errno
  4. import os
  5. import sys
  6. import tempfile
  7. def is_enabled():
  8. """Determine if xattr is enabled on the filesystem
  9. """
  10. with tempfile.NamedTemporaryFile() as fd:
  11. try:
  12. setxattr(fd.fileno(), 'user.name', b'value')
  13. except OSError:
  14. return False
  15. return getxattr(fd.fileno(), 'user.name') == b'value'
  16. def get_all(path, follow_symlinks=True):
  17. try:
  18. return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
  19. for name in listxattr(path, follow_symlinks=follow_symlinks))
  20. except OSError as e:
  21. if e.errno in (errno.ENOTSUP, errno.EPERM):
  22. return {}
try:
    # Prefer the implementation shipped with Python itself.
    # Currently only available on Python 3.3+ on Linux
    from os import getxattr, setxattr, listxattr
except ImportError:
    # No native support: getxattr/setxattr/listxattr are implemented below by
    # calling the C library through ctypes.
    from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
    from ctypes.util import find_library
    # use_errno=True lets the errno of a failed call be read with get_errno()
    libc = CDLL(find_library('c'), use_errno=True)
  30. def _check(rv, path=None):
  31. if rv < 0:
  32. raise OSError(get_errno(), path)
  33. return rv
  34. if sys.platform.startswith('linux'):
  35. libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
  36. libc.llistxattr.restype = c_ssize_t
  37. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  38. libc.flistxattr.restype = c_ssize_t
  39. libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
  40. libc.lsetxattr.restype = c_int
  41. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
  42. libc.fsetxattr.restype = c_int
  43. libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
  44. libc.lgetxattr.restype = c_ssize_t
  45. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
  46. libc.fgetxattr.restype = c_ssize_t
  47. def listxattr(path, *, follow_symlinks=True):
  48. if isinstance(path, str):
  49. path = os.fsencode(path)
  50. if isinstance(path, int):
  51. func = libc.flistxattr
  52. elif follow_symlinks:
  53. func = libc.listxattr
  54. else:
  55. func = libc.llistxattr
  56. n = _check(func(path, None, 0), path)
  57. if n == 0:
  58. return []
  59. namebuf = create_string_buffer(n)
  60. n2 = _check(func(path, namebuf, n), path)
  61. if n2 != n:
  62. raise Exception('listxattr failed')
  63. return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
  64. def getxattr(path, name, *, follow_symlinks=True):
  65. name = os.fsencode(name)
  66. if isinstance(path, str):
  67. path = os.fsencode(path)
  68. if isinstance(path, int):
  69. func = libc.fgetxattr
  70. elif follow_symlinks:
  71. func = libc.getxattr
  72. else:
  73. func = libc.lgetxattr
  74. n = _check(func(path, name, None, 0))
  75. if n == 0:
  76. return
  77. valuebuf = create_string_buffer(n)
  78. n2 = _check(func(path, name, valuebuf, n), path)
  79. if n2 != n:
  80. raise Exception('getxattr failed')
  81. return valuebuf.raw
  82. def setxattr(path, name, value, *, follow_symlinks=True):
  83. name = os.fsencode(name)
  84. value = os.fsencode(value)
  85. if isinstance(path, str):
  86. path = os.fsencode(path)
  87. if isinstance(path, int):
  88. func = libc.fsetxattr
  89. elif follow_symlinks:
  90. func = libc.setxattr
  91. else:
  92. func = libc.lsetxattr
  93. _check(func(path, name, value, len(value), 0), path)
  94. elif sys.platform == 'darwin':
  95. libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
  96. libc.listxattr.restype = c_ssize_t
  97. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  98. libc.flistxattr.restype = c_ssize_t
  99. libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  100. libc.setxattr.restype = c_int
  101. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  102. libc.fsetxattr.restype = c_int
  103. libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  104. libc.getxattr.restype = c_ssize_t
  105. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  106. libc.fgetxattr.restype = c_ssize_t
  107. XATTR_NOFOLLOW = 0x0001
  108. def listxattr(path, *, follow_symlinks=True):
  109. func = libc.listxattr
  110. flags = 0
  111. if isinstance(path, str):
  112. path = os.fsencode(path)
  113. if isinstance(path, int):
  114. func = libc.flistxattr
  115. elif not follow_symlinks:
  116. flags = XATTR_NOFOLLOW
  117. n = _check(func(path, None, 0, flags), path)
  118. if n == 0:
  119. return []
  120. namebuf = create_string_buffer(n)
  121. n2 = _check(func(path, namebuf, n, flags), path)
  122. if n2 != n:
  123. raise Exception('listxattr failed')
  124. return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
  125. def getxattr(path, name, *, follow_symlinks=True):
  126. name = os.fsencode(name)
  127. func = libc.getxattr
  128. flags = 0
  129. if isinstance(path, str):
  130. path = os.fsencode(path)
  131. if isinstance(path, int):
  132. func = libc.fgetxattr
  133. elif not follow_symlinks:
  134. flags = XATTR_NOFOLLOW
  135. n = _check(func(path, name, None, 0, 0, flags))
  136. if n == 0:
  137. return
  138. valuebuf = create_string_buffer(n)
  139. n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
  140. if n2 != n:
  141. raise Exception('getxattr failed')
  142. return valuebuf.raw
  143. def setxattr(path, name, value, *, follow_symlinks=True):
  144. name = os.fsencode(name)
  145. value = os.fsencode(value)
  146. func = libc.setxattr
  147. flags = 0
  148. if isinstance(path, str):
  149. path = os.fsencode(path)
  150. if isinstance(path, int):
  151. func = libc.fsetxattr
  152. elif not follow_symlinks:
  153. flags = XATTR_NOFOLLOW
  154. _check(func(path, name, value, len(value), 0, flags), path)
  155. elif sys.platform.startswith('freebsd'):
# Attribute namespace passed to every extattr_* call made by this branch.
EXTATTR_NAMESPACE_USER = 0x0001
# ctypes prototypes for the FreeBSD extattr calls. Each operation has three
# variants: *_fd takes a file descriptor, *_file and *_link take a path
# (the functions below pick *_link when follow_symlinks is false).
libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
libc.extattr_list_fd.restype = c_ssize_t
libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
libc.extattr_list_link.restype = c_ssize_t
libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
libc.extattr_list_file.restype = c_ssize_t
libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_get_fd.restype = c_ssize_t
libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_get_link.restype = c_ssize_t
libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_get_file.restype = c_ssize_t
# NOTE(review): the set calls are declared as returning int while the list
# and get calls use ssize_t -- confirm against the extattr man page.
libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_set_fd.restype = c_int
libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_set_link.restype = c_int
libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_set_file.restype = c_int
  175. def listxattr(path, *, follow_symlinks=True):
  176. ns = EXTATTR_NAMESPACE_USER
  177. if isinstance(path, str):
  178. path = os.fsencode(path)
  179. if isinstance(path, int):
  180. func = libc.extattr_list_fd
  181. elif follow_symlinks:
  182. func = libc.extattr_list_file
  183. else:
  184. func = libc.extattr_list_link
  185. n = _check(func(path, ns, None, 0), path)
  186. if n == 0:
  187. return []
  188. namebuf = create_string_buffer(n)
  189. n2 = _check(func(path, ns, namebuf, n), path)
  190. if n2 != n:
  191. raise Exception('listxattr failed')
  192. names = []
  193. mv = memoryview(namebuf.raw)
  194. while mv:
  195. length = mv[0]
  196. # Python < 3.3 returns bytes instead of int
  197. if isinstance(length, bytes):
  198. length = ord(length)
  199. names.append(os.fsdecode(bytes(mv[1:1+length])))
  200. mv = mv[1+length:]
  201. return names
  202. def getxattr(path, name, *, follow_symlinks=True):
  203. name = os.fsencode(name)
  204. if isinstance(path, str):
  205. path = os.fsencode(path)
  206. if isinstance(path, int):
  207. func = libc.extattr_get_fd
  208. elif follow_symlinks:
  209. func = libc.extattr_get_file
  210. else:
  211. func = libc.extattr_get_link
  212. n = _check(func(path, EXTATTR_NAMESPACE_USER, name, None, 0))
  213. if n == 0:
  214. return
  215. valuebuf = create_string_buffer(n)
  216. n2 = _check(func(path, EXTATTR_NAMESPACE_USER, name, valuebuf, n), path)
  217. if n2 != n:
  218. raise Exception('getxattr failed')
  219. return valuebuf.raw
  220. def setxattr(path, name, value, *, follow_symlinks=True):
  221. name = os.fsencode(name)
  222. value = os.fsencode(value)
  223. if isinstance(path, str):
  224. path = os.fsencode(path)
  225. if isinstance(path, int):
  226. func = libc.extattr_set_fd
  227. elif follow_symlinks:
  228. func = libc.extattr_set_file
  229. else:
  230. func = libc.extattr_set_link
  231. _check(func(path, EXTATTR_NAMESPACE_USER, name, value, len(value)), path)
  232. else:
  233. raise Exception('Unsupported platform: %s' % sys.platform)