fuse.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. from collections import defaultdict
  2. import errno
  3. import io
  4. import llfuse
  5. import os
  6. import stat
  7. import tempfile
  8. import time
  9. from .archive import Archive
  10. from .helpers import daemonize
  11. import msgpack
  12. # Does this version of llfuse support ns precision?
  13. have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  14. class ItemCache:
  15. def __init__(self):
  16. self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
  17. self.offset = 1000000
  18. def add(self, item):
  19. pos = self.fd.seek(0, io.SEEK_END)
  20. self.fd.write(msgpack.packb(item))
  21. return pos + self.offset
  22. def get(self, inode):
  23. self.fd.seek(inode - self.offset, io.SEEK_SET)
  24. return next(msgpack.Unpacker(self.fd, read_size=1024))
  25. class FuseOperations(llfuse.Operations):
  26. """Export archive as a fuse filesystem
  27. """
  28. def __init__(self, key, repository, manifest, archive, cached_repo):
  29. super().__init__()
  30. self._inode_count = 0
  31. self.key = key
  32. self.repository = cached_repo
  33. self.items = {}
  34. self.parent = {}
  35. self.contents = defaultdict(dict)
  36. self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()}
  37. self.pending_archives = {}
  38. self.accounted_chunks = {}
  39. self.cache = ItemCache()
  40. if archive:
  41. self.process_archive(archive)
  42. else:
  43. # Create root inode
  44. self.parent[1] = self.allocate_inode()
  45. self.items[1] = self.default_dir
  46. for archive_name in manifest.archives:
  47. # Create archive placeholder inode
  48. archive_inode = self.allocate_inode()
  49. self.items[archive_inode] = self.default_dir
  50. self.parent[archive_inode] = 1
  51. self.contents[1][os.fsencode(archive_name)] = archive_inode
  52. self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)
  53. def process_archive(self, archive, prefix=[]):
  54. """Build fuse inode hierarchy from archive metadata
  55. """
  56. unpacker = msgpack.Unpacker()
  57. for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
  58. data = self.key.decrypt(key, chunk)
  59. unpacker.feed(data)
  60. for item in unpacker:
  61. segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/')
  62. del item[b'path']
  63. num_segments = len(segments)
  64. parent = 1
  65. for i, segment in enumerate(segments, 1):
  66. # Insert a default root inode if needed
  67. if self._inode_count == 0 and segment:
  68. archive_inode = self.allocate_inode()
  69. self.items[archive_inode] = self.default_dir
  70. self.parent[archive_inode] = parent
  71. # Leaf segment?
  72. if i == num_segments:
  73. if b'source' in item and stat.S_ISREG(item[b'mode']):
  74. inode = self._find_inode(item[b'source'], prefix)
  75. item = self.cache.get(inode)
  76. item[b'nlink'] = item.get(b'nlink', 1) + 1
  77. self.items[inode] = item
  78. else:
  79. inode = self.cache.add(item)
  80. self.parent[inode] = parent
  81. if segment:
  82. self.contents[parent][segment] = inode
  83. elif segment in self.contents[parent]:
  84. parent = self.contents[parent][segment]
  85. else:
  86. inode = self.allocate_inode()
  87. self.items[inode] = self.default_dir
  88. self.parent[inode] = parent
  89. if segment:
  90. self.contents[parent][segment] = inode
  91. parent = inode
  92. def allocate_inode(self):
  93. self._inode_count += 1
  94. return self._inode_count
  95. def statfs(self):
  96. stat_ = llfuse.StatvfsData()
  97. stat_.f_bsize = 512
  98. stat_.f_frsize = 512
  99. stat_.f_blocks = 0
  100. stat_.f_bfree = 0
  101. stat_.f_bavail = 0
  102. stat_.f_files = 0
  103. stat_.f_ffree = 0
  104. stat_.f_favail = 0
  105. return stat_
  106. def get_item(self, inode):
  107. try:
  108. return self.items[inode]
  109. except KeyError:
  110. return self.cache.get(inode)
  111. def _find_inode(self, path, prefix=[]):
  112. segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
  113. inode = 1
  114. for segment in segments:
  115. inode = self.contents[inode][segment]
  116. return inode
  117. def getattr(self, inode):
  118. item = self.get_item(inode)
  119. size = 0
  120. dsize = 0
  121. try:
  122. for key, chunksize, _ in item[b'chunks']:
  123. size += chunksize
  124. if self.accounted_chunks.get(key, inode) == inode:
  125. self.accounted_chunks[key] = inode
  126. dsize += chunksize
  127. except KeyError:
  128. pass
  129. entry = llfuse.EntryAttributes()
  130. entry.st_ino = inode
  131. entry.generation = 0
  132. entry.entry_timeout = 300
  133. entry.attr_timeout = 300
  134. entry.st_mode = item[b'mode']
  135. entry.st_nlink = item.get(b'nlink', 1)
  136. entry.st_uid = item[b'uid']
  137. entry.st_gid = item[b'gid']
  138. entry.st_rdev = item.get(b'rdev', 0)
  139. entry.st_size = size
  140. entry.st_blksize = 512
  141. entry.st_blocks = dsize / 512
  142. # note: older archives only have mtime (not atime nor ctime)
  143. if have_fuse_xtime_ns:
  144. entry.st_atime_ns = item.get(b'atime') or item[b'mtime']
  145. entry.st_mtime_ns = item[b'mtime']
  146. entry.st_ctime_ns = item.get(b'ctime') or item[b'mtime']
  147. else:
  148. entry.st_atime = (item.get(b'atime') or item[b'mtime']) / 1e9
  149. entry.st_mtime = item[b'mtime'] / 1e9
  150. entry.st_ctime = (item.get(b'ctime') or item[b'mtime']) / 1e9
  151. return entry
  152. def listxattr(self, inode):
  153. item = self.get_item(inode)
  154. return item.get(b'xattrs', {}).keys()
  155. def getxattr(self, inode, name):
  156. item = self.get_item(inode)
  157. try:
  158. return item.get(b'xattrs', {})[name]
  159. except KeyError:
  160. raise llfuse.FUSEError(errno.ENODATA) from None
  161. def _load_pending_archive(self, inode):
  162. # Check if this is an archive we need to load
  163. archive = self.pending_archives.pop(inode, None)
  164. if archive:
  165. self.process_archive(archive, [os.fsencode(archive.name)])
  166. def lookup(self, parent_inode, name):
  167. self._load_pending_archive(parent_inode)
  168. if name == b'.':
  169. inode = parent_inode
  170. elif name == b'..':
  171. inode = self.parent[parent_inode]
  172. else:
  173. inode = self.contents[parent_inode].get(name)
  174. if not inode:
  175. raise llfuse.FUSEError(errno.ENOENT)
  176. return self.getattr(inode)
  177. def open(self, inode, flags):
  178. return inode
  179. def opendir(self, inode):
  180. self._load_pending_archive(inode)
  181. return inode
  182. def read(self, fh, offset, size):
  183. parts = []
  184. item = self.get_item(fh)
  185. for id, s, csize in item[b'chunks']:
  186. if s < offset:
  187. offset -= s
  188. continue
  189. n = min(size, s - offset)
  190. chunk = self.key.decrypt(id, self.repository.get(id))
  191. parts.append(chunk[offset:offset + n])
  192. offset = 0
  193. size -= n
  194. if not size:
  195. break
  196. return b''.join(parts)
  197. def readdir(self, fh, off):
  198. entries = [(b'.', fh), (b'..', self.parent[fh])]
  199. entries.extend(self.contents[fh].items())
  200. for i, (name, inode) in enumerate(entries[off:], off):
  201. yield name, self.getattr(inode), i + 1
  202. def readlink(self, inode):
  203. item = self.get_item(inode)
  204. return os.fsencode(item[b'source'])
  205. def mount(self, mountpoint, extra_options, foreground=False):
  206. options = ['fsname=borgfs', 'ro']
  207. if extra_options:
  208. options.extend(extra_options.split(','))
  209. llfuse.init(self, mountpoint, options)
  210. if not foreground:
  211. daemonize()
  212. try:
  213. llfuse.main(single=True)
  214. finally:
  215. llfuse.close()