xattr.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
"""A basic extended attributes (xattr) implementation for Linux, macOS and FreeBSD.
"""
  3. import errno
  4. import os
  5. import sys
  6. import tempfile
  7. from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
  8. from ctypes.util import find_library
  9. def is_enabled():
  10. """Determine if xattr is enabled on the filesystem
  11. """
  12. with tempfile.NamedTemporaryFile() as fd:
  13. try:
  14. setxattr(fd.fileno(), 'user.name', b'value')
  15. except OSError:
  16. return False
  17. return getxattr(fd.fileno(), 'user.name') == b'value'
  18. def get_all(path, follow_symlinks=True):
  19. return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
  20. for name in listxattr(path, follow_symlinks=follow_symlinks))
  21. try:
  22. return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
  23. for name in listxattr(path, follow_symlinks=follow_symlinks))
  24. except OSError as e:
  25. if e.errno in (errno.ENOTSUP, errno.EPERM):
  26. return {}
# Load the C library with use_errno=True so that the errno of a failed call
# can be read back through ctypes.get_errno(), as _check() does.
libc = CDLL(find_library('c'), use_errno=True)
  28. def _check(rv, path=None):
  29. if rv < 0:
  30. raise OSError(get_errno(), path)
  31. return rv
  32. if sys.platform.startswith('linux'):
  33. libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
  34. libc.llistxattr.restype = c_ssize_t
  35. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  36. libc.flistxattr.restype = c_ssize_t
  37. libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
  38. libc.lsetxattr.restype = c_int
  39. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
  40. libc.fsetxattr.restype = c_int
  41. libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
  42. libc.lgetxattr.restype = c_ssize_t
  43. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
  44. libc.fgetxattr.restype = c_ssize_t
  45. def listxattr(path, *, follow_symlinks=True):
  46. if isinstance(path, str):
  47. path = os.fsencode(path)
  48. if isinstance(path, int):
  49. func = libc.flistxattr
  50. elif follow_symlinks:
  51. func = libc.listxattr
  52. else:
  53. func = libc.llistxattr
  54. n = _check(func(path, None, 0), path)
  55. if n == 0:
  56. return []
  57. namebuf = create_string_buffer(n)
  58. n2 = _check(func(path, namebuf, n), path)
  59. if n2 != n:
  60. raise Exception('listxattr failed')
  61. return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
  62. def getxattr(path, name, *, follow_symlinks=True):
  63. name = os.fsencode(name)
  64. if isinstance(path, str):
  65. path = os.fsencode(path)
  66. if isinstance(path, int):
  67. func = libc.fgetxattr
  68. elif follow_symlinks:
  69. func = libc.getxattr
  70. else:
  71. func = libc.lgetxattr
  72. n = _check(func(path, name, None, 0))
  73. if n == 0:
  74. return
  75. valuebuf = create_string_buffer(n)
  76. n2 = _check(func(path, name, valuebuf, n), path)
  77. if n2 != n:
  78. raise Exception('getxattr failed')
  79. return valuebuf.raw
  80. def setxattr(path, name, value, *, follow_symlinks=True):
  81. name = os.fsencode(name)
  82. value = os.fsencode(value)
  83. if isinstance(path, str):
  84. path = os.fsencode(path)
  85. if isinstance(path, int):
  86. func = libc.fsetxattr
  87. elif follow_symlinks:
  88. func = libc.setxattr
  89. else:
  90. func = libc.lsetxattr
  91. _check(func(path, name, value, len(value), 0), path)
  92. elif sys.platform == 'darwin':
  93. libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
  94. libc.listxattr.restype = c_ssize_t
  95. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  96. libc.flistxattr.restype = c_ssize_t
  97. libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  98. libc.setxattr.restype = c_int
  99. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  100. libc.fsetxattr.restype = c_int
  101. libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  102. libc.getxattr.restype = c_ssize_t
  103. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  104. libc.fgetxattr.restype = c_ssize_t
  105. XATTR_NOFOLLOW = 0x0001
  106. def listxattr(path, *, follow_symlinks=True):
  107. func = libc.listxattr
  108. flags = 0
  109. if isinstance(path, str):
  110. path = os.fsencode(path)
  111. if isinstance(path, int):
  112. func = libc.flistxattr
  113. elif not follow_symlinks:
  114. flags = XATTR_NOFOLLOW
  115. n = _check(func(path, None, 0, flags), path)
  116. if n == 0:
  117. return []
  118. namebuf = create_string_buffer(n)
  119. n2 = _check(func(path, namebuf, n, flags), path)
  120. if n2 != n:
  121. raise Exception('listxattr failed')
  122. return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
  123. def getxattr(path, name, *, follow_symlinks=True):
  124. name = os.fsencode(name)
  125. func = libc.getxattr
  126. flags = 0
  127. if isinstance(path, str):
  128. path = os.fsencode(path)
  129. if isinstance(path, int):
  130. func = libc.fgetxattr
  131. elif not follow_symlinks:
  132. flags = XATTR_NOFOLLOW
  133. n = _check(func(path, name, None, 0, 0, flags))
  134. if n == 0:
  135. return
  136. valuebuf = create_string_buffer(n)
  137. n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
  138. if n2 != n:
  139. raise Exception('getxattr failed')
  140. return valuebuf.raw
  141. def setxattr(path, name, value, *, follow_symlinks=True):
  142. name = os.fsencode(name)
  143. value = os.fsencode(value)
  144. func = libc.setxattr
  145. flags = 0
  146. if isinstance(path, str):
  147. path = os.fsencode(path)
  148. if isinstance(path, int):
  149. func = libc.fsetxattr
  150. elif not follow_symlinks:
  151. flags = XATTR_NOFOLLOW
  152. _check(func(path, name, value, len(value), 0, flags), path)
  153. elif sys.platform.startswith('freebsd'):
  154. EXTATTR_NAMESPACE_USER = 0x0001
  155. libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
  156. libc.extattr_list_fd.restype = c_ssize_t
  157. libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
  158. libc.extattr_list_link.restype = c_ssize_t
  159. libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
  160. libc.extattr_list_file.restype = c_ssize_t
  161. libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
  162. libc.extattr_get_fd.restype = c_ssize_t
  163. libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  164. libc.extattr_get_link.restype = c_ssize_t
  165. libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  166. libc.extattr_get_file.restype = c_ssize_t
  167. libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
  168. libc.extattr_set_fd.restype = c_int
  169. libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  170. libc.extattr_set_link.restype = c_int
  171. libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  172. libc.extattr_set_file.restype = c_int
  173. def listxattr(path, *, follow_symlinks=True):
  174. ns = EXTATTR_NAMESPACE_USER
  175. if isinstance(path, str):
  176. path = os.fsencode(path)
  177. if isinstance(path, int):
  178. func = libc.extattr_list_fd
  179. elif follow_symlinks:
  180. func = libc.extattr_list_file
  181. else:
  182. func = libc.extattr_list_link
  183. n = _check(func(path, ns, None, 0), path)
  184. if n == 0:
  185. return []
  186. namebuf = create_string_buffer(n)
  187. n2 = _check(func(path, ns, namebuf, n), path)
  188. if n2 != n:
  189. raise Exception('listxattr failed')
  190. names = []
  191. mv = memoryview(namebuf.raw)
  192. while mv:
  193. length = mv[0]
  194. # Python < 3.3 returns bytes instead of int
  195. if isinstance(length, bytes):
  196. length = ord(length)
  197. names.append(os.fsdecode(bytes(mv[1:1+length])))
  198. mv = mv[1+length:]
  199. return names
  200. def getxattr(path, name, *, follow_symlinks=True):
  201. name = os.fsencode(name)
  202. if isinstance(path, str):
  203. path = os.fsencode(path)
  204. if isinstance(path, int):
  205. func = libc.extattr_get_fd
  206. elif follow_symlinks:
  207. func = libc.extattr_get_file
  208. else:
  209. func = libc.extattr_get_link
  210. n = _check(func(path, EXTATTR_NAMESPACE_USER, name, None, 0))
  211. if n == 0:
  212. return
  213. valuebuf = create_string_buffer(n)
  214. n2 = _check(func(path, EXTATTR_NAMESPACE_USER, name, valuebuf, n), path)
  215. if n2 != n:
  216. raise Exception('getxattr failed')
  217. return valuebuf.raw
  218. def setxattr(path, name, value, *, follow_symlinks=True):
  219. name = os.fsencode(name)
  220. value = os.fsencode(value)
  221. if isinstance(path, str):
  222. path = os.fsencode(path)
  223. if isinstance(path, int):
  224. func = libc.extattr_set_fd
  225. elif follow_symlinks:
  226. func = libc.extattr_set_file
  227. else:
  228. func = libc.extattr_set_link
  229. _check(func(path, EXTATTR_NAMESPACE_USER, name, value, len(value)), path)
else:
    # None of the platform branches above matched, so there are no xattr
    # functions to define; fail when the module is imported.
    raise Exception('Unsupported platform: %s' % sys.platform)