# platform_linux.pyx

  1. import os
  2. import re
  3. import subprocess
  4. from stat import S_ISLNK
  5. from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode
  6. API_VERSION = '1.0_01'
  7. cdef extern from "sys/types.h":
  8. int ACL_TYPE_ACCESS
  9. int ACL_TYPE_DEFAULT
  10. cdef extern from "sys/acl.h":
  11. ctypedef struct _acl_t:
  12. pass
  13. ctypedef _acl_t *acl_t
  14. int acl_free(void *obj)
  15. acl_t acl_get_file(const char *path, int type)
  16. acl_t acl_set_file(const char *path, int type, acl_t acl)
  17. acl_t acl_from_text(const char *buf)
  18. char *acl_to_text(acl_t acl, ssize_t *len)
  19. cdef extern from "acl/libacl.h":
  20. int acl_extended_file(const char *path)
  21. _comment_re = re.compile(' *#.*', re.M)
  22. def acl_use_local_uid_gid(acl):
  23. """Replace the user/group field with the local uid/gid if possible
  24. """
  25. entries = []
  26. for entry in safe_decode(acl).split('\n'):
  27. if entry:
  28. fields = entry.split(':')
  29. if fields[0] == 'user' and fields[1]:
  30. fields[1] = str(user2uid(fields[1], fields[3]))
  31. elif fields[0] == 'group' and fields[1]:
  32. fields[1] = str(group2gid(fields[1], fields[3]))
  33. entries.append(':'.join(fields[:3]))
  34. return safe_encode('\n'.join(entries))
  35. cdef acl_append_numeric_ids(acl):
  36. """Extend the "POSIX 1003.1e draft standard 17" format with an additional uid/gid field
  37. """
  38. entries = []
  39. for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
  40. if entry:
  41. type, name, permission = entry.split(':')
  42. if name and type == 'user':
  43. entries.append(':'.join([type, name, permission, str(user2uid(name, name))]))
  44. elif name and type == 'group':
  45. entries.append(':'.join([type, name, permission, str(group2gid(name, name))]))
  46. else:
  47. entries.append(entry)
  48. return safe_encode('\n'.join(entries))
  49. cdef acl_numeric_ids(acl):
  50. """Replace the "POSIX 1003.1e draft standard 17" user/group field with uid/gid
  51. """
  52. entries = []
  53. for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
  54. if entry:
  55. type, name, permission = entry.split(':')
  56. if name and type == 'user':
  57. uid = str(user2uid(name, name))
  58. entries.append(':'.join([type, uid, permission, uid]))
  59. elif name and type == 'group':
  60. gid = str(group2gid(name, name))
  61. entries.append(':'.join([type, gid, permission, gid]))
  62. else:
  63. entries.append(entry)
  64. return safe_encode('\n'.join(entries))
  65. def acl_get(path, item, st, numeric_owner=False):
  66. """Saves ACL Entries
  67. If `numeric_owner` is True the user/group field is not preserved only uid/gid
  68. """
  69. cdef acl_t default_acl = NULL
  70. cdef acl_t access_acl = NULL
  71. cdef char *default_text = NULL
  72. cdef char *access_text = NULL
  73. p = <bytes>os.fsencode(path)
  74. if S_ISLNK(st.st_mode) or acl_extended_file(p) <= 0:
  75. return
  76. if numeric_owner:
  77. converter = acl_numeric_ids
  78. else:
  79. converter = acl_append_numeric_ids
  80. try:
  81. access_acl = acl_get_file(p, ACL_TYPE_ACCESS)
  82. if access_acl:
  83. access_text = acl_to_text(access_acl, NULL)
  84. if access_text:
  85. item[b'acl_access'] = converter(access_text)
  86. default_acl = acl_get_file(p, ACL_TYPE_DEFAULT)
  87. if default_acl:
  88. default_text = acl_to_text(default_acl, NULL)
  89. if default_text:
  90. item[b'acl_default'] = converter(default_text)
  91. finally:
  92. acl_free(default_text)
  93. acl_free(default_acl)
  94. acl_free(access_text)
  95. acl_free(access_acl)
  96. def acl_set(path, item, numeric_owner=False):
  97. """Restore ACL Entries
  98. If `numeric_owner` is True the stored uid/gid is used instead
  99. of the user/group names
  100. """
  101. cdef acl_t access_acl = NULL
  102. cdef acl_t default_acl = NULL
  103. p = <bytes>os.fsencode(path)
  104. if numeric_owner:
  105. converter = posix_acl_use_stored_uid_gid
  106. else:
  107. converter = acl_use_local_uid_gid
  108. access_text = item.get(b'acl_access')
  109. default_text = item.get(b'acl_default')
  110. if access_text:
  111. try:
  112. access_acl = acl_from_text(<bytes>converter(access_text))
  113. if access_acl:
  114. acl_set_file(p, ACL_TYPE_ACCESS, access_acl)
  115. finally:
  116. acl_free(access_acl)
  117. if default_text:
  118. try:
  119. default_acl = acl_from_text(<bytes>converter(default_text))
  120. if default_acl:
  121. acl_set_file(p, ACL_TYPE_DEFAULT, default_acl)
  122. finally:
  123. acl_free(default_acl)
  124. def umount(mountpoint):
  125. return subprocess.call(['fusermount', '-u', mountpoint])