
fuse.py

from collections import defaultdict
import errno
import io
import llfuse
import msgpack
import os
import stat
import tempfile
import time

from attic.archive import Archive
from attic.helpers import daemonize
from attic.remote import cache_if_remote

# Does this version of llfuse support ns precision?
have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')


class ItemCache:
    """Spill archive item metadata to a temporary file and hand out
    synthetic inode numbers that double as offsets into that file."""
    def __init__(self):
        self.fd = tempfile.TemporaryFile()
        # Cached-item inodes start at 1000000 so they stay clear of the
        # small inode numbers handed out by allocate_inode()
        self.offset = 1000000

    def add(self, item):
        pos = self.fd.seek(0, io.SEEK_END)
        self.fd.write(msgpack.packb(item))
        return pos + self.offset

    def get(self, inode):
        self.fd.seek(inode - self.offset, io.SEEK_SET)
        return next(msgpack.Unpacker(self.fd))


class AtticOperations(llfuse.Operations):
    """Export Attic archive as a fuse filesystem
    """
    def __init__(self, key, repository, manifest, archive):
        super(AtticOperations, self).__init__()
        self._inode_count = 0
        self.key = key
        self.repository = cache_if_remote(repository)
        self.items = {}
        self.parent = {}
        self.contents = defaultdict(dict)
        self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()}
        self.pending_archives = {}
        self.cache = ItemCache()
        if archive:
            self.process_archive(archive)
        else:
            # Create root inode
            self.parent[1] = self.allocate_inode()
            self.items[1] = self.default_dir
            for archive_name in manifest.archives:
                # Create archive placeholder inode
                archive_inode = self.allocate_inode()
                self.items[archive_inode] = self.default_dir
                self.parent[archive_inode] = 1
                self.contents[1][os.fsencode(archive_name)] = archive_inode
                self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)

    def process_archive(self, archive, prefix=[]):
        """Build fuse inode hierarchy from archive metadata
        """
        unpacker = msgpack.Unpacker()
        for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
            data = self.key.decrypt(key, chunk)
            unpacker.feed(data)
            for item in unpacker:
                segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/')
                del item[b'path']
                num_segments = len(segments)
                parent = 1
                for i, segment in enumerate(segments, 1):
                    # Insert a default root inode if needed
                    if self._inode_count == 0 and segment:
                        archive_inode = self.allocate_inode()
                        self.items[archive_inode] = self.default_dir
                        self.parent[archive_inode] = parent
                    # Leaf segment?
                    if i == num_segments:
                        if b'source' in item and stat.S_ISREG(item[b'mode']):
                            # Hard link: reuse the inode of the link target and bump its link count
                            inode = self._find_inode(item[b'source'], prefix)
                            item = self.cache.get(inode)
                            item[b'nlink'] = item.get(b'nlink', 1) + 1
                            self.items[inode] = item
                        else:
                            inode = self.cache.add(item)
                        self.parent[inode] = parent
                        if segment:
                            self.contents[parent][segment] = inode
                    elif segment in self.contents[parent]:
                        parent = self.contents[parent][segment]
                    else:
                        inode = self.allocate_inode()
                        self.items[inode] = self.default_dir
                        self.parent[inode] = parent
                        if segment:
                            self.contents[parent][segment] = inode
                        parent = inode

    def allocate_inode(self):
        self._inode_count += 1
        return self._inode_count

    def statfs(self):
        stat_ = llfuse.StatvfsData()
        stat_.f_bsize = 512
        stat_.f_frsize = 512
        stat_.f_blocks = 0
        stat_.f_bfree = 0
        stat_.f_bavail = 0
        stat_.f_files = 0
        stat_.f_ffree = 0
        stat_.f_favail = 0
        return stat_

    def get_item(self, inode):
        try:
            return self.items[inode]
        except KeyError:
            return self.cache.get(inode)

    def _find_inode(self, path, prefix=[]):
        segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
        inode = 1
        for segment in segments:
            inode = self.contents[inode][segment]
        return inode

    def getattr(self, inode):
        item = self.get_item(inode)
        size = 0
        try:
            size = sum(size for _, size, _ in item[b'chunks'])
        except KeyError:
            pass
        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300
        entry.st_mode = item[b'mode']
        entry.st_nlink = item.get(b'nlink', 1)
        entry.st_uid = item[b'uid']
        entry.st_gid = item[b'gid']
        entry.st_rdev = item.get(b'rdev', 0)
        entry.st_size = size
        entry.st_blksize = 512
        entry.st_blocks = 1
        if have_fuse_mtime_ns:
            entry.st_atime_ns = item[b'mtime']
            entry.st_mtime_ns = item[b'mtime']
            entry.st_ctime_ns = item[b'mtime']
        else:
            entry.st_atime = item[b'mtime'] / 1e9
            entry.st_mtime = item[b'mtime'] / 1e9
            entry.st_ctime = item[b'mtime'] / 1e9
        return entry

    def listxattr(self, inode):
        item = self.get_item(inode)
        return item.get(b'xattrs', {}).keys()

    def getxattr(self, inode, name):
        item = self.get_item(inode)
        try:
            return item.get(b'xattrs', {})[name]
        except KeyError:
            raise llfuse.FUSEError(errno.ENODATA)

    def _load_pending_archive(self, inode):
        # Check if this is an archive we need to load
        archive = self.pending_archives.pop(inode, None)
        if archive:
            self.process_archive(archive, [os.fsencode(archive.name)])

    def lookup(self, parent_inode, name):
        self._load_pending_archive(parent_inode)
        if name == b'.':
            inode = parent_inode
        elif name == b'..':
            inode = self.parent[parent_inode]
        else:
            inode = self.contents[parent_inode].get(name)
            if not inode:
                raise llfuse.FUSEError(errno.ENOENT)
        return self.getattr(inode)

    def open(self, inode, flags):
        return inode

    def opendir(self, inode):
        self._load_pending_archive(inode)
        return inode

    def read(self, fh, offset, size):
        parts = []
        item = self.get_item(fh)
        for id, s, csize in item[b'chunks']:
            # Skip whole chunks that lie before the requested offset
            if s < offset:
                offset -= s
                continue
            n = min(size, s - offset)
            chunk = self.key.decrypt(id, self.repository.get(id))
            parts.append(chunk[offset:offset+n])
            offset = 0
            size -= n
            if not size:
                break
        return b''.join(parts)

    def readdir(self, fh, off):
        entries = [(b'.', fh), (b'..', self.parent[fh])]
        entries.extend(self.contents[fh].items())
        for i, (name, inode) in enumerate(entries[off:], off):
            yield name, self.getattr(inode), i + 1

    def readlink(self, inode):
        item = self.get_item(inode)
        return os.fsencode(item[b'source'])

    def mount(self, mountpoint, extra_options, foreground=False):
        options = ['fsname=atticfs', 'ro']
        if extra_options:
            options.extend(extra_options.split(','))
        llfuse.init(self, mountpoint, options)
        if not foreground:
            daemonize()
        try:
            llfuse.main(single=True)
        except:
            llfuse.close()
            raise
        llfuse.close()
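

# Usage sketch (illustrative only; assumes `key`, `repository`, `manifest` and
# `archive` have been obtained from attic's usual repository/manifest setup,
# and that a mountpoint directory such as `/mnt/attic` already exists):
#
#     operations = AtticOperations(key, repository, manifest, archive)
#     operations.mount('/mnt/attic', extra_options=None, foreground=True)
#
# Passing archive=None exposes every archive in the manifest as a top-level
# directory that is loaded lazily on first lookup/opendir.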