|
|
@@ -3,7 +3,7 @@ import re
|
|
|
import stat
|
|
|
|
|
|
from .posix import posix_acl_use_stored_uid_gid
|
|
|
-from .posix import user2uid, group2gid
|
|
|
+from . import posix_ug
|
|
|
from ..helpers import workarounds
|
|
|
from ..helpers import safe_decode, safe_encode
|
|
|
from .base import SyncFile as BaseSyncFile
|
|
|
@@ -47,11 +47,12 @@ cdef extern from "sys/acl.h":
|
|
|
int acl_set_file(const char *path, int type, acl_t acl)
|
|
|
int acl_set_fd(int fd, acl_t acl)
|
|
|
acl_t acl_from_text(const char *buf)
|
|
|
- char *acl_to_text(acl_t acl, ssize_t *len)
|
|
|
|
|
|
cdef extern from "acl/libacl.h":
|
|
|
int acl_extended_file_nofollow(const char *path)
|
|
|
int acl_extended_fd(int fd)
|
|
|
+ char *acl_to_any_text(acl_t acl, const char *prefix, char separator, int options)
|
|
|
+ int TEXT_NUMERIC_IDS
|
|
|
|
|
|
cdef extern from "linux/fs.h":
|
|
|
# ioctls
|
|
|
@@ -203,46 +204,66 @@ def acl_use_local_uid_gid(acl):
|
|
|
if entry:
|
|
|
fields = entry.split(':')
|
|
|
if fields[0] == 'user' and fields[1]:
|
|
|
- fields[1] = str(user2uid(fields[1], fields[3]))
|
|
|
+ fields[1] = str(posix_ug._user2uid(fields[1], fields[3]))
|
|
|
elif fields[0] == 'group' and fields[1]:
|
|
|
- fields[1] = str(group2gid(fields[1], fields[3]))
|
|
|
+ fields[1] = str(posix_ug._group2gid(fields[1], fields[3]))
|
|
|
entries.append(':'.join(fields[:3]))
|
|
|
return safe_encode('\n'.join(entries))
|
|
|
|
|
|
|
|
|
-cdef acl_append_numeric_ids(acl):
|
|
|
- """Extend the "POSIX 1003.1e draft standard 17" format with an additional uid/gid field
|
|
|
+def _acl_from_numeric_to_named_with_id(acl):
|
|
|
+ """Convert numeric-id ACL entries to name entries and append numeric id as 4th field.
|
|
|
+
|
|
|
+ Input format (Linux libacl): lines like 'user:1000:rwx' or 'group:100:r-x' or 'user::rwx'.
|
|
|
+ Output format: for entries with a name/id field, become 'user:uname:rwx:uid' or 'group:gname:r-x:gid'.
|
|
|
"""
|
|
|
assert isinstance(acl, bytes)
|
|
|
entries = []
|
|
|
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
|
|
|
- if entry:
|
|
|
- type, name, permission = entry.split(':')
|
|
|
- if name and type == 'user':
|
|
|
- entries.append(':'.join([type, name, permission, str(user2uid(name, name))]))
|
|
|
- elif name and type == 'group':
|
|
|
- entries.append(':'.join([type, name, permission, str(group2gid(name, name))]))
|
|
|
+ if not entry:
|
|
|
+ continue
|
|
|
+ fields = entry.split(':')
|
|
|
+ # Expected 3 fields: type, ugid_or_empty, perms
|
|
|
+ if len(fields) >= 3:
|
|
|
+ typ, ugid_str, perm = fields[0], fields[1], fields[2]
|
|
|
+ if ugid_str and typ == 'user':
|
|
|
+ try:
|
|
|
+ uid = int(ugid_str)
|
|
|
+ except ValueError:
|
|
|
+ uid = None
|
|
|
+ uname = posix_ug._uid2user(uid, ugid_str) if uid is not None else ugid_str
|
|
|
+ entries.append(':'.join([typ, uname, perm, str(uid if uid is not None else ugid_str)]))
|
|
|
+ elif ugid_str and typ == 'group':
|
|
|
+ try:
|
|
|
+ gid = int(ugid_str)
|
|
|
+ except ValueError:
|
|
|
+ gid = None
|
|
|
+ gname = posix_ug._gid2group(gid, ugid_str) if gid is not None else ugid_str
|
|
|
+ entries.append(':'.join([typ, gname, perm, str(gid if gid is not None else ugid_str)]))
|
|
|
else:
|
|
|
- entries.append(entry)
|
|
|
+ # owner, group_obj, mask, other (empty ugid_str field) stay as-is
|
|
|
+ entries.append(':'.join([typ, '', perm]))
|
|
|
+ else:
|
|
|
+ entries.append(entry)
|
|
|
return safe_encode('\n'.join(entries))
|
|
|
|
|
|
|
|
|
-cdef acl_numeric_ids(acl):
|
|
|
- """Replace the "POSIX 1003.1e draft standard 17" user/group field with uid/gid
|
|
|
- """
|
|
|
+def _acl_from_numeric_to_numeric_with_id(acl):
|
|
|
+ """Keep numeric ids in name field and append the same id as 4th field where applicable."""
|
|
|
assert isinstance(acl, bytes)
|
|
|
entries = []
|
|
|
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
|
|
|
- if entry:
|
|
|
- type, name, permission = entry.split(':')
|
|
|
- if name and type == 'user':
|
|
|
- uid = str(user2uid(name, name))
|
|
|
- entries.append(':'.join([type, uid, permission, uid]))
|
|
|
- elif name and type == 'group':
|
|
|
- gid = str(group2gid(name, name))
|
|
|
- entries.append(':'.join([type, gid, permission, gid]))
|
|
|
+ if not entry:
|
|
|
+ continue
|
|
|
+ fields = entry.split(':')
|
|
|
+ if len(fields) >= 3:
|
|
|
+ typ, ugid, perm = fields[0], fields[1], fields[2]
|
|
|
+ if ugid and (typ == 'user' or typ == 'group'):
|
|
|
+ entries.append(':'.join([typ, ugid, perm, ugid]))
|
|
|
else:
|
|
|
- entries.append(entry)
|
|
|
+ entries.append(':'.join([typ, '', perm]))
|
|
|
+ else:
|
|
|
+ entries.append(entry)
|
|
|
return safe_encode('\n'.join(entries))
|
|
|
|
|
|
|
|
|
@@ -266,9 +287,9 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
|
|
|
# note: this should also be the case for symlink fs objects, as they can not have ACLs.
|
|
|
return
|
|
|
if numeric_ids:
|
|
|
- converter = acl_numeric_ids
|
|
|
+ converter = _acl_from_numeric_to_numeric_with_id
|
|
|
else:
|
|
|
- converter = acl_append_numeric_ids
|
|
|
+ converter = _acl_from_numeric_to_named_with_id
|
|
|
try:
|
|
|
if fd is not None:
|
|
|
access_acl = acl_get_fd(fd)
|
|
|
@@ -276,7 +297,7 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
|
|
|
access_acl = acl_get_file(path, ACL_TYPE_ACCESS)
|
|
|
if access_acl == NULL:
|
|
|
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
|
|
|
- access_text = acl_to_text(access_acl, NULL)
|
|
|
+ access_text = acl_to_any_text(access_acl, NULL, '\n', TEXT_NUMERIC_IDS)
|
|
|
if access_text == NULL:
|
|
|
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
|
|
|
item['acl_access'] = converter(access_text)
|
|
|
@@ -289,7 +310,7 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
|
|
|
default_acl = acl_get_file(path, ACL_TYPE_DEFAULT)
|
|
|
if default_acl == NULL:
|
|
|
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
|
|
|
- default_text = acl_to_text(default_acl, NULL)
|
|
|
+ default_text = acl_to_any_text(default_acl, NULL, '\n', TEXT_NUMERIC_IDS)
|
|
|
if default_text == NULL:
|
|
|
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
|
|
|
item['acl_default'] = converter(default_text)
|