fuse.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. from collections import defaultdict
  2. import errno
  3. import io
  4. import llfuse
  5. import msgpack
  6. import os
  7. import stat
  8. import tempfile
  9. import time
  10. from attic.archive import Archive
  11. from attic.helpers import daemonize
  12. from attic.remote import cache_if_remote
# Does this version of llfuse support ns precision?
# True when llfuse.EntryAttributes exposes an ``st_mtime_ns`` attribute; the
# attribute-building code uses this flag to choose between the integer
# ``st_*time_ns`` fields and the float ``st_*time`` fields.
have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  15. class ItemCache:
  16. def __init__(self):
  17. self.fd = tempfile.TemporaryFile()
  18. self.offset = 1000000
  19. def add(self, item):
  20. pos = self.fd.seek(0, io.SEEK_END)
  21. self.fd.write(msgpack.packb(item))
  22. return pos + self.offset
  23. def get(self, inode):
  24. self.fd.seek(inode - self.offset, io.SEEK_SET)
  25. return next(msgpack.Unpacker(self.fd))
  26. class FuseOperations(llfuse.Operations):
  27. """Export archive as a fuse filesystem
  28. """
  29. def __init__(self, key, repository, manifest, archive):
  30. super(AtticOperations, self).__init__()
  31. self._inode_count = 0
  32. self.key = key
  33. self.repository = cache_if_remote(repository)
  34. self.items = {}
  35. self.parent = {}
  36. self.contents = defaultdict(dict)
  37. self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()}
  38. self.pending_archives = {}
  39. self.accounted_chunks = {}
  40. self.cache = ItemCache()
  41. if archive:
  42. self.process_archive(archive)
  43. else:
  44. # Create root inode
  45. self.parent[1] = self.allocate_inode()
  46. self.items[1] = self.default_dir
  47. for archive_name in manifest.archives:
  48. # Create archive placeholder inode
  49. archive_inode = self.allocate_inode()
  50. self.items[archive_inode] = self.default_dir
  51. self.parent[archive_inode] = 1
  52. self.contents[1][os.fsencode(archive_name)] = archive_inode
  53. self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)
  54. def process_archive(self, archive, prefix=[]):
  55. """Build fuse inode hierarchy from archive metadata
  56. """
  57. unpacker = msgpack.Unpacker()
  58. for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
  59. data = self.key.decrypt(key, chunk)
  60. unpacker.feed(data)
  61. for item in unpacker:
  62. segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/')
  63. del item[b'path']
  64. num_segments = len(segments)
  65. parent = 1
  66. for i, segment in enumerate(segments, 1):
  67. # Insert a default root inode if needed
  68. if self._inode_count == 0 and segment:
  69. archive_inode = self.allocate_inode()
  70. self.items[archive_inode] = self.default_dir
  71. self.parent[archive_inode] = parent
  72. # Leaf segment?
  73. if i == num_segments:
  74. if b'source' in item and stat.S_ISREG(item[b'mode']):
  75. inode = self._find_inode(item[b'source'], prefix)
  76. item = self.cache.get(inode)
  77. item[b'nlink'] = item.get(b'nlink', 1) + 1
  78. self.items[inode] = item
  79. else:
  80. inode = self.cache.add(item)
  81. self.parent[inode] = parent
  82. if segment:
  83. self.contents[parent][segment] = inode
  84. elif segment in self.contents[parent]:
  85. parent = self.contents[parent][segment]
  86. else:
  87. inode = self.allocate_inode()
  88. self.items[inode] = self.default_dir
  89. self.parent[inode] = parent
  90. if segment:
  91. self.contents[parent][segment] = inode
  92. parent = inode
  93. def allocate_inode(self):
  94. self._inode_count += 1
  95. return self._inode_count
  96. def statfs(self):
  97. stat_ = llfuse.StatvfsData()
  98. stat_.f_bsize = 512
  99. stat_.f_frsize = 512
  100. stat_.f_blocks = 0
  101. stat_.f_bfree = 0
  102. stat_.f_bavail = 0
  103. stat_.f_files = 0
  104. stat_.f_ffree = 0
  105. stat_.f_favail = 0
  106. return stat_
  107. def get_item(self, inode):
  108. try:
  109. return self.items[inode]
  110. except KeyError:
  111. return self.cache.get(inode)
  112. def _find_inode(self, path, prefix=[]):
  113. segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
  114. inode = 1
  115. for segment in segments:
  116. inode = self.contents[inode][segment]
  117. return inode
  118. def getattr(self, inode):
  119. item = self.get_item(inode)
  120. size = 0
  121. dsize = 0
  122. try:
  123. for key, chunksize, _ in item[b'chunks']:
  124. size += chunksize
  125. if self.accounted_chunks.get(key, inode) == inode:
  126. self.accounted_chunks[key] = inode
  127. dsize += chunksize
  128. except KeyError:
  129. pass
  130. entry = llfuse.EntryAttributes()
  131. entry.st_ino = inode
  132. entry.generation = 0
  133. entry.entry_timeout = 300
  134. entry.attr_timeout = 300
  135. entry.st_mode = item[b'mode']
  136. entry.st_nlink = item.get(b'nlink', 1)
  137. entry.st_uid = item[b'uid']
  138. entry.st_gid = item[b'gid']
  139. entry.st_rdev = item.get(b'rdev', 0)
  140. entry.st_size = size
  141. entry.st_blksize = 512
  142. entry.st_blocks = dsize / 512
  143. if have_fuse_mtime_ns:
  144. entry.st_atime_ns = item[b'mtime']
  145. entry.st_mtime_ns = item[b'mtime']
  146. entry.st_ctime_ns = item[b'mtime']
  147. else:
  148. entry.st_atime = item[b'mtime'] / 1e9
  149. entry.st_mtime = item[b'mtime'] / 1e9
  150. entry.st_ctime = item[b'mtime'] / 1e9
  151. return entry
  152. def listxattr(self, inode):
  153. item = self.get_item(inode)
  154. return item.get(b'xattrs', {}).keys()
  155. def getxattr(self, inode, name):
  156. item = self.get_item(inode)
  157. try:
  158. return item.get(b'xattrs', {})[name]
  159. except KeyError:
  160. raise llfuse.FUSEError(errno.ENODATA)
  161. def _load_pending_archive(self, inode):
  162. # Check if this is an archive we need to load
  163. archive = self.pending_archives.pop(inode, None)
  164. if archive:
  165. self.process_archive(archive, [os.fsencode(archive.name)])
  166. def lookup(self, parent_inode, name):
  167. self._load_pending_archive(parent_inode)
  168. if name == b'.':
  169. inode = parent_inode
  170. elif name == b'..':
  171. inode = self.parent[parent_inode]
  172. else:
  173. inode = self.contents[parent_inode].get(name)
  174. if not inode:
  175. raise llfuse.FUSEError(errno.ENOENT)
  176. return self.getattr(inode)
  177. def open(self, inode, flags):
  178. return inode
  179. def opendir(self, inode):
  180. self._load_pending_archive(inode)
  181. return inode
  182. def read(self, fh, offset, size):
  183. parts = []
  184. item = self.get_item(fh)
  185. for id, s, csize in item[b'chunks']:
  186. if s < offset:
  187. offset -= s
  188. continue
  189. n = min(size, s - offset)
  190. chunk = self.key.decrypt(id, self.repository.get(id))
  191. parts.append(chunk[offset:offset+n])
  192. offset = 0
  193. size -= n
  194. if not size:
  195. break
  196. return b''.join(parts)
  197. def readdir(self, fh, off):
  198. entries = [(b'.', fh), (b'..', self.parent[fh])]
  199. entries.extend(self.contents[fh].items())
  200. for i, (name, inode) in enumerate(entries[off:], off):
  201. yield name, self.getattr(inode), i + 1
  202. def readlink(self, inode):
  203. item = self.get_item(inode)
  204. return os.fsencode(item[b'source'])
  205. def mount(self, mountpoint, extra_options, foreground=False):
  206. options = ['fsname=borgfs', 'ro']
  207. if extra_options:
  208. options.extend(extra_options.split(','))
  209. llfuse.init(self, mountpoint, options)
  210. if not foreground:
  211. daemonize()
  212. try:
  213. llfuse.main(single=True)
  214. finally:
  215. llfuse.close()