xattr.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. """A basic extended attributes (xattr) implementation for Linux and MacOS X
  2. On Linux only the "user." namespace is accessed
  3. """
  4. import os
  5. import sys
  6. from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
  7. from ctypes.util import find_library
  8. libc = CDLL(find_library('c'), use_errno=True)
  9. def set(path_or_fd, name, value):
  10. if isinstance(path_or_fd, int):
  11. fsetxattr(path_or_fd, name, value)
  12. else:
  13. lsetxattr(path_or_fd, name, value)
  14. def get_all(path_or_fd):
  15. """Return a dictionary with all (user) xattrs for "path_or_fd"
  16. Symbolic links are not followed
  17. """
  18. if isinstance(path_or_fd, int):
  19. return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd))
  20. else:
  21. return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd))
  22. def _check(rv, path=None):
  23. if rv < 0:
  24. raise OSError(get_errno(), path)
  25. return rv
  26. if sys.platform.startswith('linux'):
  27. libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
  28. libc.llistxattr.restype = c_ssize_t
  29. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  30. libc.flistxattr.restype = c_ssize_t
  31. libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
  32. libc.lsetxattr.restype = c_int
  33. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
  34. libc.fsetxattr.restype = c_int
  35. libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
  36. libc.lgetxattr.restype = c_ssize_t
  37. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
  38. libc.fgetxattr.restype = c_ssize_t
  39. def llistxattr(path):
  40. path = os.fsencode(path)
  41. n = _check(libc.llistxattr(path, None, 0), path)
  42. if n == 0:
  43. []
  44. namebuf = create_string_buffer(n)
  45. n2 = _check(libc.llistxattr(path, namebuf, n))
  46. if n2 != n:
  47. raise Exception('llistxattr failed')
  48. return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
  49. def flistxattr(fd):
  50. n = _check(libc.flistxattr(fd, None, 0))
  51. if n == 0:
  52. []
  53. namebuf = create_string_buffer(n)
  54. n2 = _check(libc.flistxattr(fd, namebuf, n))
  55. if n2 != n:
  56. raise Exception('flistxattr failed')
  57. return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
  58. def lsetxattr(path, name, value, flags=0):
  59. _check(libc.lsetxattr(os.fsencode(path), b'user.' + name, value, len(value), flags), path)
  60. def fsetxattr(fd, name, value, flags=0):
  61. _check(libc.fsetxattr(fd, b'user.' + name, value, len(value), flags))
  62. def lgetxattr(path, name):
  63. path = os.fsencode(path)
  64. name = b'user.' + name
  65. n = _check(libc.lgetxattr(path, name, None, 0))
  66. if n == 0:
  67. return None
  68. valuebuf = create_string_buffer(n)
  69. n2 = _check(libc.lgetxattr(path, name, valuebuf, n), path)
  70. if n2 != n:
  71. raise Exception('lgetxattr failed')
  72. return valuebuf.raw
  73. def fgetxattr(fd, name):
  74. name = b'user.' + name
  75. n = _check(libc.fgetxattr(fd, name, None, 0))
  76. if n == 0:
  77. return None
  78. valuebuf = create_string_buffer(n)
  79. n2 = _check(libc.fgetxattr(fd, name, valuebuf, n))
  80. if n2 != n:
  81. raise Exception('fgetxattr failed')
  82. return valuebuf.raw
  83. elif sys.platform == 'darwin':
  84. libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
  85. libc.listxattr.restype = c_ssize_t
  86. libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
  87. libc.flistxattr.restype = c_ssize_t
  88. libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  89. libc.setxattr.restype = c_int
  90. libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  91. libc.fsetxattr.restype = c_int
  92. libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  93. libc.getxattr.restype = c_ssize_t
  94. libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
  95. libc.fgetxattr.restype = c_ssize_t
  96. XATTR_NOFOLLOW = 0x0001
  97. def llistxattr(path):
  98. path = os.fsencode(path)
  99. n = _check(libc.listxattr(path, None, 0, XATTR_NOFOLLOW), path)
  100. if n == 0:
  101. []
  102. namebuf = create_string_buffer(n)
  103. n2 = _check(libc.listxattr(path, namebuf, n, XATTR_NOFOLLOW))
  104. if n2 != n:
  105. raise Exception('llistxattr failed')
  106. return namebuf.raw.split(b'\0')[:-1]
  107. def flistxattr(fd):
  108. n = _check(libc.flistxattr(fd, None, 0, 0))
  109. if n == 0:
  110. []
  111. namebuf = create_string_buffer(n)
  112. n2 = _check(libc.flistxattr(fd, namebuf, n, 0))
  113. if n2 != n:
  114. raise Exception('flistxattr failed')
  115. return namebuf.raw.split(b'\0')[:-1]
  116. def lsetxattr(path, name, value, flags=XATTR_NOFOLLOW):
  117. _check(libc.setxattr(os.fsencode(path), name, value, len(value), 0, flags), path)
  118. def fsetxattr(fd, name, value, flags=0):
  119. _check(libc.fsetxattr(fd, name, value, len(value), 0, flags))
  120. def lgetxattr(path, name):
  121. path = os.fsencode(path)
  122. n = _check(libc.getxattr(path, name, None, 0, 0, XATTR_NOFOLLOW), path)
  123. if n == 0:
  124. return None
  125. valuebuf = create_string_buffer(n)
  126. n2 = _check(libc.getxattr(path, name, valuebuf, n, 0, XATTR_NOFOLLOW))
  127. if n2 != n:
  128. raise Exception('getxattr failed')
  129. return valuebuf.raw
  130. def fgetxattr(fd, name):
  131. n = _check(libc.fgetxattr(fd, name, None, 0, 0, 0))
  132. if n == 0:
  133. return None
  134. valuebuf = create_string_buffer(n)
  135. n2 = _check(libc.fgetxattr(fd, name, valuebuf, n, 0, 0))
  136. if n2 != n:
  137. Exception('fgetxattr failed')
  138. return valuebuf.raw
  139. else:
  140. raise Exception('Unsupported platform: %s' % sys.platform)