xattr.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. """A basic extended attributes (xattr) implementation for Linux and MacOS X
  2. """
  3. import errno
  4. import os
  5. import sys
  6. import tempfile
  def is_enabled(path=None):
      """Determine if xattr is enabled on the filesystem

      A temporary file is created, the attribute ``user.name`` is set on its
      file descriptor and then read back.

      :param path: directory in which the probe file is created.  ``None``
          (the default) keeps the previous behaviour of using the default
          ``tempfile`` directory; pass a directory to probe the filesystem
          that directory lives on.
      :returns: ``False`` if setting the attribute raises ``OSError``,
          otherwise whether the value read back equals the value written.
      """
      with tempfile.NamedTemporaryFile(dir=path) as fd:
          try:
              setxattr(fd.fileno(), 'user.name', b'value')
          except OSError:
              return False
          return getxattr(fd.fileno(), 'user.name') == b'value'
  def get_all(path, follow_symlinks=True):
      """Return a dict mapping every attribute name of *path* to its value.

      Names come from ``listxattr`` and each value from ``getxattr``;
      *follow_symlinks* is forwarded to both calls unchanged.
      """
      attr_names = listxattr(path, follow_symlinks=follow_symlinks)
      return {
          attr: getxattr(path, attr, follow_symlinks=follow_symlinks)
          for attr in attr_names
      }
  try:
      # Currently only available on Python 3.3+ on Linux
      # NOTE(review): `listxattr2` is not one of the documented `os` xattr
      # functions, so this import presumably always raises ImportError and the
      # ctypes implementation is always used -- confirm whether that is
      # intentional before "fixing" the name.
      from os import getxattr, setxattr, listxattr2
  except ImportError:
      from ctypes import (
          CDLL,
          create_string_buffer,
          c_ssize_t,
          c_size_t,
          c_char_p,
          c_int,
          c_uint32,
          get_errno,
      )
      from ctypes.util import find_library

      # use_errno=True lets get_errno() report the errno of a failed call.
      libc = CDLL(find_library('c'), use_errno=True)
  def _check(rv, path=None):
      """Return *rv* unchanged, or raise ``OSError`` if it is negative.

      :param rv: return value of a libc call.
      :param path: path or file descriptor the call operated on; it is
          reported as the ``filename`` of the raised error.
      :raises OSError: with ``errno`` taken from ``get_errno()``, the matching
          system message as ``strerror`` and *path* as ``filename``.
      """
      if rv < 0:
          err = get_errno()
          try:
              msg = os.strerror(err)
          except ValueError:
              # Some platforms reject unknown error numbers.
              msg = ''
          # Three-argument form: (errno, strerror, filename).  Passing the path
          # as the second argument would put it in the message slot.
          raise OSError(err, msg, path)
      return rv
  30. if sys.platform.startswith('linux'):
  # Declare the C prototypes so ctypes converts arguments and return values
  # with the right widths instead of its default ``int`` conversions.
  #
  # Symlink-following path variants; these are the functions used when
  # follow_symlinks is true, so they need prototypes as well.
  libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t)
  libc.listxattr.restype = c_ssize_t
  libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
  libc.setxattr.restype = c_int
  libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
  libc.getxattr.restype = c_ssize_t
  # Variants operating on the link itself (l*) and on file descriptors (f*).
  libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
  libc.llistxattr.restype = c_ssize_t
  libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  libc.flistxattr.restype = c_ssize_t
  libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
  libc.lsetxattr.restype = c_int
  libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
  libc.fsetxattr.restype = c_int
  libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
  libc.lgetxattr.restype = c_ssize_t
  libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
  libc.fgetxattr.restype = c_ssize_t
  def listxattr(path, *, follow_symlinks=True):
      """Return the extended attribute names of *path* as a list of str.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param follow_symlinks: selects ``listxattr`` (true) or ``llistxattr``
          (false) for paths; not consulted for file descriptors.
      :raises OSError: if a libc call reports failure.
      :raises Exception: if the second call returns a different size than
          the size probe.
      """
      if isinstance(path, int):
          call = libc.flistxattr
      else:
          if isinstance(path, str):
              path = os.fsencode(path)
          call = libc.listxattr if follow_symlinks else libc.llistxattr
      # First call with no buffer only asks for the required size.
      size = _check(call(path, None, 0), path)
      if not size:
          return []
      buf = create_string_buffer(size)
      written = _check(call(path, buf, size), path)
      if written != size:
          raise Exception('listxattr failed')
      # Names are separated by NUL bytes; the piece after the last NUL is
      # dropped.
      pieces = buf.raw.split(b'\0')
      pieces.pop()
      return [os.fsdecode(piece) for piece in pieces]
  def getxattr(path, name, *, follow_symlinks=True):
      """Return the value of extended attribute *name* of *path*.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param name: attribute name, encoded with ``os.fsencode``.
      :param follow_symlinks: selects ``getxattr`` (true) or ``lgetxattr``
          (false) for paths; not consulted for file descriptors.
      :returns: the value as ``bytes``, or ``None`` when the attribute has a
          zero-length value.
      :raises OSError: if a libc call reports failure; *path* is attached to
          the error for both the size probe and the read.
      :raises Exception: if the read returns a different size than the probe.
      """
      name = os.fsencode(name)
      if isinstance(path, str):
          path = os.fsencode(path)
      if isinstance(path, int):
          func = libc.fgetxattr
      elif follow_symlinks:
          func = libc.getxattr
      else:
          func = libc.lgetxattr
      # Size probe: no buffer, length 0.  Pass `path` to _check here too so a
      # failure (e.g. missing attribute) names the file like the read below.
      n = _check(func(path, name, None, 0), path)
      if n == 0:
          return None
      valuebuf = create_string_buffer(n)
      n2 = _check(func(path, name, valuebuf, n), path)
      if n2 != n:
          raise Exception('getxattr failed')
      return valuebuf.raw
  def setxattr(path, name, value, *, follow_symlinks=True):
      """Set extended attribute *name* of *path* to *value*.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param name: attribute name, encoded with ``os.fsencode``.
      :param value: ``bytes`` or ``str`` (encoded with ``os.fsencode``).
          ``None`` is stored as an empty value, so that what ``getxattr`` in
          this module returns for a zero-length attribute can be written back.
      :param follow_symlinks: selects ``setxattr`` (true) or ``lsetxattr``
          (false) for paths; not consulted for file descriptors.
      :raises OSError: if the libc call reports failure.
      """
      name = os.fsencode(name)
      value = b'' if value is None else os.fsencode(value)
      if isinstance(path, str):
          path = os.fsencode(path)
      if isinstance(path, int):
          func = libc.fsetxattr
      elif follow_symlinks:
          func = libc.setxattr
      else:
          func = libc.lsetxattr
      # The trailing 0 is the flags argument.
      _check(func(path, name, value, len(value), 0), path)
  90. elif sys.platform == 'darwin':
  # Declare the C prototypes so ctypes converts arguments and return values
  # with the right widths.  Every call made by the functions below passes a
  # trailing integer options argument, so each prototype ends in c_int.
  libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
  libc.listxattr.restype = c_ssize_t
  # flistxattr previously lacked the options argument in its prototype even
  # though listxattr() below always passes one.
  libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t, c_int)
  libc.flistxattr.restype = c_ssize_t
  libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  libc.setxattr.restype = c_int
  libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  libc.fsetxattr.restype = c_int
  libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  libc.getxattr.restype = c_ssize_t
  libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  libc.fgetxattr.restype = c_ssize_t

  # Options flag passed when follow_symlinks is false.
  XATTR_NOFOLLOW = 0x0001
  def listxattr(path, *, follow_symlinks=True):
      """Return the extended attribute names of *path* as a list of str.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param follow_symlinks: when false and *path* is not a descriptor,
          ``XATTR_NOFOLLOW`` is passed as the options argument.
      :returns: list of names; an empty list when the size probe fails with
          ``EPERM`` or reports zero bytes.
      :raises OSError: for any other libc failure.
      :raises Exception: if the second call returns a different size than
          the size probe.
      """
      options = 0
      if isinstance(path, int):
          call = libc.flistxattr
      else:
          if isinstance(path, str):
              path = os.fsencode(path)
          call = libc.listxattr
          if not follow_symlinks:
              options = XATTR_NOFOLLOW
      try:
          # First call with no buffer only asks for the required size.
          size = _check(call(path, None, 0, options), path)
      except OSError as err:
          if err.errno != errno.EPERM:
              raise
          return []
      if not size:
          return []
      buf = create_string_buffer(size)
      written = _check(call(path, buf, size, options), path)
      if written != size:
          raise Exception('listxattr failed')
      # Names are separated by NUL bytes; the piece after the last NUL is
      # dropped.
      pieces = buf.raw.split(b'\0')
      pieces.pop()
      return [os.fsdecode(piece) for piece in pieces]
  def getxattr(path, name, *, follow_symlinks=True):
      """Return the value of extended attribute *name* of *path*.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param name: attribute name, encoded with ``os.fsencode``.
      :param follow_symlinks: when false and *path* is not a descriptor,
          ``XATTR_NOFOLLOW`` is passed as the options argument.
      :returns: the value as ``bytes``, or ``None`` when the attribute has a
          zero-length value.
      :raises OSError: if a libc call reports failure; *path* is attached to
          the error for both the size probe and the read.
      :raises Exception: if the read returns a different size than the probe.
      """
      name = os.fsencode(name)
      func = libc.getxattr
      flags = 0
      if isinstance(path, str):
          path = os.fsencode(path)
      if isinstance(path, int):
          func = libc.fgetxattr
      elif not follow_symlinks:
          flags = XATTR_NOFOLLOW
      # Size probe: no buffer, length 0.  Pass `path` to _check here too so a
      # failure names the file like the read below.
      n = _check(func(path, name, None, 0, 0, flags), path)
      if n == 0:
          return None
      valuebuf = create_string_buffer(n)
      n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
      if n2 != n:
          raise Exception('getxattr failed')
      return valuebuf.raw
  def setxattr(path, name, value, *, follow_symlinks=True):
      """Set extended attribute *name* of *path* to *value*.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param name: attribute name, encoded with ``os.fsencode``.
      :param value: ``bytes`` or ``str`` (encoded with ``os.fsencode``).
          ``None`` is stored as an empty value, so that what ``getxattr`` in
          this module returns for a zero-length attribute can be written back.
      :param follow_symlinks: when false and *path* is not a descriptor,
          ``XATTR_NOFOLLOW`` is passed as the options argument.
      :raises OSError: if the libc call reports failure.
      """
      name = os.fsencode(name)
      value = b'' if value is None else os.fsencode(value)
      func = libc.setxattr
      flags = 0
      if isinstance(path, str):
          path = os.fsencode(path)
      if isinstance(path, int):
          func = libc.fsetxattr
      elif not follow_symlinks:
          flags = XATTR_NOFOLLOW
      _check(func(path, name, value, len(value), 0, flags), path)
  156. elif sys.platform.startswith('freebsd'):
  # Namespace identifier passed to every extattr_* call below.
  EXTATTR_NAMESPACE_USER = 0x0001

  # C prototypes, grouped by what the first argument identifies.
  # --- file descriptor ---
  libc.extattr_list_fd.restype = c_ssize_t
  libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
  libc.extattr_get_fd.restype = c_ssize_t
  libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
  libc.extattr_set_fd.restype = c_int
  libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
  # --- path, used when follow_symlinks is false ---
  libc.extattr_list_link.restype = c_ssize_t
  libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
  libc.extattr_get_link.restype = c_ssize_t
  libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  libc.extattr_set_link.restype = c_int
  libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  # --- path, used when follow_symlinks is true ---
  libc.extattr_list_file.restype = c_ssize_t
  libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
  libc.extattr_get_file.restype = c_ssize_t
  libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  libc.extattr_set_file.restype = c_int
  libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
  def listxattr(path, *, follow_symlinks=True):
      """Return the user-namespace attribute names of *path* as a list of str.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param follow_symlinks: selects ``extattr_list_file`` (true) or
          ``extattr_list_link`` (false) for paths; not consulted for file
          descriptors.
      :returns: list of names; an empty list when the size probe fails with
          ``ENOTSUP`` or reports zero bytes.
      :raises OSError: for any other libc failure.
      :raises Exception: if the second call returns a different size than
          the size probe.
      """
      namespace = EXTATTR_NAMESPACE_USER
      if isinstance(path, int):
          call = libc.extattr_list_fd
      else:
          if isinstance(path, str):
              path = os.fsencode(path)
          call = libc.extattr_list_file if follow_symlinks else libc.extattr_list_link
      try:
          # First call with no buffer only asks for the required size.
          size = _check(call(path, namespace, None, 0), path)
      except OSError as err:
          if err.errno != errno.ENOTSUP:
              raise
          return []
      if not size:
          return []
      buf = create_string_buffer(size)
      written = _check(call(path, namespace, buf, size), path)
      if written != size:
          raise Exception('listxattr failed')
      # The buffer is parsed as a sequence of entries, each one length byte
      # followed by that many bytes of name.
      raw = buf.raw
      names = []
      pos = 0
      while pos < len(raw):
          start = pos + 1
          pos = start + raw[pos]
          names.append(os.fsdecode(raw[start:pos]))
      return names
  def getxattr(path, name, *, follow_symlinks=True):
      """Return the value of user-namespace attribute *name* of *path*.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param name: attribute name, encoded with ``os.fsencode``.
      :param follow_symlinks: selects ``extattr_get_file`` (true) or
          ``extattr_get_link`` (false) for paths; not consulted for file
          descriptors.
      :returns: the value as ``bytes``, or ``None`` when the attribute has a
          zero-length value.
      :raises OSError: if a libc call reports failure; *path* is attached to
          the error for both the size probe and the read.
      :raises Exception: if the read returns a different size than the probe.
      """
      name = os.fsencode(name)
      if isinstance(path, str):
          path = os.fsencode(path)
      if isinstance(path, int):
          func = libc.extattr_get_fd
      elif follow_symlinks:
          func = libc.extattr_get_file
      else:
          func = libc.extattr_get_link
      # Size probe: no buffer, length 0.  Pass `path` to _check here too so a
      # failure names the file like the read below.
      n = _check(func(path, EXTATTR_NAMESPACE_USER, name, None, 0), path)
      if n == 0:
          return None
      valuebuf = create_string_buffer(n)
      n2 = _check(func(path, EXTATTR_NAMESPACE_USER, name, valuebuf, n), path)
      if n2 != n:
          raise Exception('getxattr failed')
      return valuebuf.raw
  def setxattr(path, name, value, *, follow_symlinks=True):
      """Set user-namespace attribute *name* of *path* to *value*.

      :param path: ``str`` (encoded with ``os.fsencode``), ``bytes`` or an
          integer file descriptor.
      :param name: attribute name, encoded with ``os.fsencode``.
      :param value: ``bytes`` or ``str`` (encoded with ``os.fsencode``).
          ``None`` is stored as an empty value, so that what ``getxattr`` in
          this module returns for a zero-length attribute can be written back.
      :param follow_symlinks: selects ``extattr_set_file`` (true) or
          ``extattr_set_link`` (false) for paths; not consulted for file
          descriptors.
      :raises OSError: if the libc call reports failure.
      """
      name = os.fsencode(name)
      value = b'' if value is None else os.fsencode(value)
      if isinstance(path, str):
          path = os.fsencode(path)
      if isinstance(path, int):
          func = libc.extattr_set_fd
      elif follow_symlinks:
          func = libc.extattr_set_file
      else:
          func = libc.extattr_set_link
      _check(func(path, EXTATTR_NAMESPACE_USER, name, value, len(value)), path)
  238. else:
  239. raise Exception('Unsupported platform: %s' % sys.platform)