2
0

fuse.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. from collections import defaultdict
  2. import errno
  3. import io
  4. import llfuse
  5. import os
  6. import stat
  7. import tempfile
  8. import time
  9. from .archive import Archive
  10. from .helpers import daemonize, bigint_to_int
  11. from distutils.version import LooseVersion
  12. import msgpack
  13. # Does this version of llfuse support ns precision?
  14. have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
  15. fuse_version = LooseVersion(getattr(llfuse, '__version__', '0.1'))
  16. if fuse_version >= '0.42':
  17. def fuse_main():
  18. return llfuse.main(workers=1)
  19. else:
  20. def fuse_main():
  21. llfuse.main(single=True)
  22. return None
  23. class ItemCache:
  24. def __init__(self):
  25. self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
  26. self.offset = 1000000
  27. def add(self, item):
  28. pos = self.fd.seek(0, io.SEEK_END)
  29. self.fd.write(msgpack.packb(item))
  30. return pos + self.offset
  31. def get(self, inode):
  32. self.fd.seek(inode - self.offset, io.SEEK_SET)
  33. return next(msgpack.Unpacker(self.fd, read_size=1024))
  34. class FuseOperations(llfuse.Operations):
  35. """Export archive as a fuse filesystem
  36. """
  37. def __init__(self, key, repository, manifest, archive, cached_repo):
  38. super().__init__()
  39. self._inode_count = 0
  40. self.key = key
  41. self.repository = cached_repo
  42. self.items = {}
  43. self.parent = {}
  44. self.contents = defaultdict(dict)
  45. self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()}
  46. self.pending_archives = {}
  47. self.accounted_chunks = {}
  48. self.cache = ItemCache()
  49. if archive:
  50. self.process_archive(archive)
  51. else:
  52. # Create root inode
  53. self.parent[1] = self.allocate_inode()
  54. self.items[1] = self.default_dir
  55. for archive_name in manifest.archives:
  56. # Create archive placeholder inode
  57. archive_inode = self.allocate_inode()
  58. self.items[archive_inode] = self.default_dir
  59. self.parent[archive_inode] = 1
  60. self.contents[1][os.fsencode(archive_name)] = archive_inode
  61. self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)
  62. def process_archive(self, archive, prefix=[]):
  63. """Build fuse inode hierarchy from archive metadata
  64. """
  65. unpacker = msgpack.Unpacker()
  66. for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
  67. data = self.key.decrypt(key, chunk)
  68. unpacker.feed(data)
  69. for item in unpacker:
  70. segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/')
  71. del item[b'path']
  72. num_segments = len(segments)
  73. parent = 1
  74. for i, segment in enumerate(segments, 1):
  75. # Insert a default root inode if needed
  76. if self._inode_count == 0 and segment:
  77. archive_inode = self.allocate_inode()
  78. self.items[archive_inode] = self.default_dir
  79. self.parent[archive_inode] = parent
  80. # Leaf segment?
  81. if i == num_segments:
  82. if b'source' in item and stat.S_ISREG(item[b'mode']):
  83. inode = self._find_inode(item[b'source'], prefix)
  84. item = self.cache.get(inode)
  85. item[b'nlink'] = item.get(b'nlink', 1) + 1
  86. self.items[inode] = item
  87. else:
  88. inode = self.cache.add(item)
  89. self.parent[inode] = parent
  90. if segment:
  91. self.contents[parent][segment] = inode
  92. elif segment in self.contents[parent]:
  93. parent = self.contents[parent][segment]
  94. else:
  95. inode = self.allocate_inode()
  96. self.items[inode] = self.default_dir
  97. self.parent[inode] = parent
  98. if segment:
  99. self.contents[parent][segment] = inode
  100. parent = inode
  101. def allocate_inode(self):
  102. self._inode_count += 1
  103. return self._inode_count
  104. def statfs(self, ctx=None):
  105. stat_ = llfuse.StatvfsData()
  106. stat_.f_bsize = 512
  107. stat_.f_frsize = 512
  108. stat_.f_blocks = 0
  109. stat_.f_bfree = 0
  110. stat_.f_bavail = 0
  111. stat_.f_files = 0
  112. stat_.f_ffree = 0
  113. stat_.f_favail = 0
  114. return stat_
  115. def get_item(self, inode):
  116. try:
  117. return self.items[inode]
  118. except KeyError:
  119. return self.cache.get(inode)
  120. def _find_inode(self, path, prefix=[]):
  121. segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
  122. inode = 1
  123. for segment in segments:
  124. inode = self.contents[inode][segment]
  125. return inode
  126. def getattr(self, inode, ctx=None):
  127. item = self.get_item(inode)
  128. size = 0
  129. dsize = 0
  130. try:
  131. for key, chunksize, _ in item[b'chunks']:
  132. size += chunksize
  133. if self.accounted_chunks.get(key, inode) == inode:
  134. self.accounted_chunks[key] = inode
  135. dsize += chunksize
  136. except KeyError:
  137. pass
  138. entry = llfuse.EntryAttributes()
  139. entry.st_ino = inode
  140. entry.generation = 0
  141. entry.entry_timeout = 300
  142. entry.attr_timeout = 300
  143. entry.st_mode = item[b'mode']
  144. entry.st_nlink = item.get(b'nlink', 1)
  145. entry.st_uid = item[b'uid']
  146. entry.st_gid = item[b'gid']
  147. entry.st_rdev = item.get(b'rdev', 0)
  148. entry.st_size = size
  149. entry.st_blksize = 512
  150. entry.st_blocks = dsize / 512
  151. # note: older archives only have mtime (not atime nor ctime)
  152. if have_fuse_xtime_ns:
  153. entry.st_mtime_ns = bigint_to_int(item[b'mtime'])
  154. if b'atime' in item:
  155. entry.st_atime_ns = bigint_to_int(item[b'atime'])
  156. else:
  157. entry.st_atime_ns = bigint_to_int(item[b'mtime'])
  158. if b'ctime' in item:
  159. entry.st_ctime_ns = bigint_to_int(item[b'ctime'])
  160. else:
  161. entry.st_ctime_ns = bigint_to_int(item[b'mtime'])
  162. else:
  163. entry.st_mtime_ns = bigint_to_int(item[b'mtime']) / 1e9
  164. if b'atime' in item:
  165. entry.st_atime_ns = bigint_to_int(item[b'atime']) / 1e9
  166. else:
  167. entry.st_atime_ns = bigint_to_int(item[b'mtime']) / 1e9
  168. if b'ctime' in item:
  169. entry.st_ctime_ns = bigint_to_int(item[b'ctime']) / 1e9
  170. else:
  171. entry.st_ctime_ns = bigint_to_int(item[b'mtime']) / 1e9
  172. return entry
  173. def listxattr(self, inode, ctx=None):
  174. item = self.get_item(inode)
  175. return item.get(b'xattrs', {}).keys()
  176. def getxattr(self, inode, name, ctx=None):
  177. item = self.get_item(inode)
  178. try:
  179. return item.get(b'xattrs', {})[name]
  180. except KeyError:
  181. raise llfuse.FUSEError(errno.ENODATA) from None
  182. def _load_pending_archive(self, inode):
  183. # Check if this is an archive we need to load
  184. archive = self.pending_archives.pop(inode, None)
  185. if archive:
  186. self.process_archive(archive, [os.fsencode(archive.name)])
  187. def lookup(self, parent_inode, name, ctx=None):
  188. self._load_pending_archive(parent_inode)
  189. if name == b'.':
  190. inode = parent_inode
  191. elif name == b'..':
  192. inode = self.parent[parent_inode]
  193. else:
  194. inode = self.contents[parent_inode].get(name)
  195. if not inode:
  196. raise llfuse.FUSEError(errno.ENOENT)
  197. return self.getattr(inode)
  198. def open(self, inode, flags, ctx=None):
  199. return inode
  200. def opendir(self, inode, ctx=None):
  201. self._load_pending_archive(inode)
  202. return inode
  203. def read(self, fh, offset, size):
  204. parts = []
  205. item = self.get_item(fh)
  206. for id, s, csize in item[b'chunks']:
  207. if s < offset:
  208. offset -= s
  209. continue
  210. n = min(size, s - offset)
  211. chunk = self.key.decrypt(id, self.repository.get(id))
  212. parts.append(chunk[offset:offset + n])
  213. offset = 0
  214. size -= n
  215. if not size:
  216. break
  217. return b''.join(parts)
  218. def readdir(self, fh, off):
  219. entries = [(b'.', fh), (b'..', self.parent[fh])]
  220. entries.extend(self.contents[fh].items())
  221. for i, (name, inode) in enumerate(entries[off:], off):
  222. yield name, self.getattr(inode), i + 1
  223. def readlink(self, inode, ctx=None):
  224. item = self.get_item(inode)
  225. return os.fsencode(item[b'source'])
  226. def mount(self, mountpoint, extra_options, foreground=False):
  227. options = ['fsname=borgfs', 'ro']
  228. if extra_options:
  229. options.extend(extra_options.split(','))
  230. llfuse.init(self, mountpoint, options)
  231. if not foreground:
  232. daemonize()
  233. # If the file system crashes, we do not want to umount because in that
  234. # case the mountpoint suddenly appears to become empty. This can have
  235. # nasty consequences, imagine the user has e.g. an active rsync mirror
  236. # job - seeing the mountpoint empty, rsync would delete everything in the
  237. # mirror.
  238. umount = False
  239. try:
  240. signal = fuse_main()
  241. umount = (signal is None) # no crash and no signal -> umount request
  242. finally:
  243. llfuse.close(umount)