from collections import defaultdict
import errno
import io
import llfuse
import os
import stat
import tempfile
import time
from distutils.version import LooseVersion

import msgpack

from .archive import Archive
from .helpers import daemonize, bigint_to_int
from .logger import create_logger
from .lrucache import LRUCache

logger = create_logger()


# Does this version of llfuse support ns precision?
have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')

fuse_version = LooseVersion(getattr(llfuse, '__version__', '0.1'))
if fuse_version >= '0.42':
    def fuse_main():
        return llfuse.main(workers=1)
else:
    def fuse_main():
        llfuse.main(single=True)
        return None
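
# Note: llfuse 0.42 renamed main(single=True) to main(workers=1); the wrapper
# above also normalizes the return value, which mount() below interprets as a
# regular umount request when it is None.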


class ItemCache:
    def __init__(self):
        self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
        self.offset = 1000000

    def add(self, item):
        pos = self.fd.seek(0, io.SEEK_END)
        self.fd.write(msgpack.packb(item))
        return pos + self.offset

    def get(self, inode):
        self.fd.seek(inode - self.offset, io.SEEK_SET)
        return next(msgpack.Unpacker(self.fd, read_size=1024))
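
# A minimal usage sketch: the inodes handed out by ItemCache.add() are byte
# offsets into the temporary file, shifted by self.offset (1000000) so they
# cannot collide with the low inode numbers from FuseOperations.allocate_inode().
#
#   cache = ItemCache()
#   inode = cache.add({b'foo': b'bar'})   # inode >= 1000000
#   assert cache.get(inode) == {b'foo': b'bar'}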


class FuseOperations(llfuse.Operations):
    """Export archive as a fuse filesystem
    """

    allow_damaged_files = False

    def __init__(self, key, repository, manifest, archive, cached_repo):
        super().__init__()
        self._inode_count = 0
        self.key = key
        self.repository = cached_repo
        self.items = {}
        self.parent = {}
        self.contents = defaultdict(dict)
        self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()}
        self.pending_archives = {}
        self.accounted_chunks = {}
        self.cache = ItemCache()
        data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
        logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
        self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
        self._create_dir(parent=1)  # first call, create root dir (inode == 1)
        if archive:
            self.process_archive(archive)
        else:
            for archive_name in manifest.archives:
                # Create archive placeholder inode
                archive_inode = self._create_dir(parent=1)
                self.contents[1][os.fsencode(archive_name)] = archive_inode
                self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)
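
    # Archives are loaded lazily: above we only created a placeholder directory
    # inode per archive name; the real hierarchy is built by process_archive()
    # on the first lookup()/opendir() of that inode, see _load_pending_archive().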

    def mount(self, mountpoint, mount_options, foreground=False):
        """Mount filesystem on *mountpoint* with *mount_options*."""
        options = ['fsname=borgfs', 'ro']
        if mount_options:
            options.extend(mount_options.split(','))
        try:
            options.remove('allow_damaged_files')
            self.allow_damaged_files = True
        except ValueError:
            pass
        llfuse.init(self, mountpoint, options)
        if not foreground:
            daemonize()

        # If the file system crashes, we do not want to umount because in that
        # case the mountpoint suddenly appears to become empty. This can have
        # nasty consequences, imagine the user has e.g. an active rsync mirror
        # job - seeing the mountpoint empty, rsync would delete everything in the
        # mirror.
        umount = False
        try:
            signal = fuse_main()
            umount = (signal is None)  # no crash and no signal -> umount request
        finally:
            llfuse.close(umount)

    def _create_dir(self, parent):
        """Create directory
        """
        ino = self.allocate_inode()
        self.items[ino] = self.default_dir
        self.parent[ino] = parent
        return ino
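
    # Note that every directory created here shares the single self.default_dir
    # dict; process_archive() replaces it with the real item if the archive
    # actually contains an entry for that directory.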

    def process_archive(self, archive, prefix=[]):
        """Build fuse inode hierarchy from archive metadata
        """
        unpacker = msgpack.Unpacker()
        for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
            data = self.key.decrypt(key, chunk)
            unpacker.feed(data)
            for item in unpacker:
                try:
                    # This can happen if an archive was created with a command line like
                    # $ borg create ... dir1/file dir1
                    # In this case the code below will have created a default_dir inode for dir1 already.
                    inode = self._find_inode(item[b'path'], prefix)
                except KeyError:
                    pass
                else:
                    self.items[inode] = item
                    continue
                segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/')
                del item[b'path']
                num_segments = len(segments)
                parent = 1
                for i, segment in enumerate(segments, 1):
                    # Leaf segment?
                    if i == num_segments:
                        if b'source' in item and stat.S_ISREG(item[b'mode']):
                            inode = self._find_inode(item[b'source'], prefix)
                            item = self.cache.get(inode)
                            item[b'nlink'] = item.get(b'nlink', 1) + 1
                            self.items[inode] = item
                        else:
                            inode = self.cache.add(item)
                        self.parent[inode] = parent
                        if segment:
                            self.contents[parent][segment] = inode
                    elif segment in self.contents[parent]:
                        parent = self.contents[parent][segment]
                    else:
                        inode = self._create_dir(parent)
                        if segment:
                            self.contents[parent][segment] = inode
                        parent = inode
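
    # A worked example (sketch): an item with path b'dir1/file' and an empty
    # prefix yields segments [b'dir1', b'file']. b'dir1' gets a default_dir
    # inode via _create_dir() unless one already exists; b'file' is the leaf,
    # and its ItemCache inode is registered in self.contents[<dir1 inode>].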

    def allocate_inode(self):
        self._inode_count += 1
        return self._inode_count

    def statfs(self, ctx=None):
        stat_ = llfuse.StatvfsData()
        stat_.f_bsize = 512
        stat_.f_frsize = 512
        stat_.f_blocks = 0
        stat_.f_bfree = 0
        stat_.f_bavail = 0
        stat_.f_files = 0
        stat_.f_ffree = 0
        stat_.f_favail = 0
        return stat_

    def get_item(self, inode):
        try:
            return self.items[inode]
        except KeyError:
            return self.cache.get(inode)
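
    # The inode space is partitioned: inodes below ItemCache.offset come from
    # allocate_inode() and live in self.items (directories, hardlink targets);
    # larger inodes are offsets into the ItemCache temporary file.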

    def _find_inode(self, path, prefix=[]):
        segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
        inode = 1
        for segment in segments:
            inode = self.contents[inode][segment]
        return inode
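
    # E.g. _find_inode(b'dir1/file') walks self.contents from the root inode:
    # contents[1][b'dir1'] -> contents[<dir1>][b'file']. A missing segment
    # raises KeyError, which process_archive() above catches on purpose.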

    def getattr(self, inode, ctx=None):
        item = self.get_item(inode)
        size = 0
        dsize = 0
        try:
            for key, chunksize, _ in item[b'chunks']:
                size += chunksize
                if self.accounted_chunks.get(key, inode) == inode:
                    self.accounted_chunks[key] = inode
                    dsize += chunksize
        except KeyError:
            pass
        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300
        entry.st_mode = item[b'mode']
        entry.st_nlink = item.get(b'nlink', 1)
        entry.st_uid = item[b'uid']
        entry.st_gid = item[b'gid']
        entry.st_rdev = item.get(b'rdev', 0)
        entry.st_size = size
        entry.st_blksize = 512
        entry.st_blocks = dsize // 512  # integer division: st_blocks must be an int
        # note: older archives only have mtime (not atime nor ctime)
        if have_fuse_xtime_ns:
            entry.st_mtime_ns = bigint_to_int(item[b'mtime'])
            if b'atime' in item:
                entry.st_atime_ns = bigint_to_int(item[b'atime'])
            else:
                entry.st_atime_ns = bigint_to_int(item[b'mtime'])
            if b'ctime' in item:
                entry.st_ctime_ns = bigint_to_int(item[b'ctime'])
            else:
                entry.st_ctime_ns = bigint_to_int(item[b'mtime'])
        else:
            entry.st_mtime = bigint_to_int(item[b'mtime']) / 1e9
            if b'atime' in item:
                entry.st_atime = bigint_to_int(item[b'atime']) / 1e9
            else:
                entry.st_atime = bigint_to_int(item[b'mtime']) / 1e9
            if b'ctime' in item:
                entry.st_ctime = bigint_to_int(item[b'ctime']) / 1e9
            else:
                entry.st_ctime = bigint_to_int(item[b'mtime']) / 1e9
        return entry
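
    # st_size is the logical file size, while st_blocks reflects deduplicated
    # usage: accounted_chunks credits each chunk to the first inode seen using
    # it, so a chunk shared by several files is only billed once.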

    def listxattr(self, inode, ctx=None):
        item = self.get_item(inode)
        return item.get(b'xattrs', {}).keys()

    def getxattr(self, inode, name, ctx=None):
        item = self.get_item(inode)
        try:
            return item.get(b'xattrs', {})[name]
        except KeyError:
            raise llfuse.FUSEError(llfuse.ENOATTR) from None

    def _load_pending_archive(self, inode):
        # Check if this is an archive we need to load
        archive = self.pending_archives.pop(inode, None)
        if archive:
            self.process_archive(archive, [os.fsencode(archive.name)])

    def lookup(self, parent_inode, name, ctx=None):
        self._load_pending_archive(parent_inode)
        if name == b'.':
            inode = parent_inode
        elif name == b'..':
            inode = self.parent[parent_inode]
        else:
            inode = self.contents[parent_inode].get(name)
            if not inode:
                raise llfuse.FUSEError(errno.ENOENT)
        return self.getattr(inode)

    def open(self, inode, flags, ctx=None):
        if not self.allow_damaged_files:
            item = self.get_item(inode)
            if b'chunks_healthy' in item:
                # Processed archive items don't carry the path anymore; for converting the inode
                # to the path we'd either have to store the inverse of the current structure,
                # or search the entire archive. So we just don't print it. It's easy to correlate anyway.
                logger.warning('File has damaged (all-zero) chunks. Try running borg check --repair. '
                               'Mount with allow_damaged_files to read damaged files.')
                raise llfuse.FUSEError(errno.EIO)
        return inode

    def opendir(self, inode, ctx=None):
        self._load_pending_archive(inode)
        return inode

    def read(self, fh, offset, size):
        parts = []
        item = self.get_item(fh)
        for id, s, csize in item[b'chunks']:
            if s < offset:
                offset -= s
                continue
            n = min(size, s - offset)
            if id in self.data_cache:
                data = self.data_cache[id]
                if offset + n == len(data):
                    # evict fully read chunk from cache
                    del self.data_cache[id]
            else:
                data = self.key.decrypt(id, self.repository.get(id))
                if offset + n < len(data):
                    # chunk was only partially read, cache it
                    self.data_cache[id] = data
            parts.append(data[offset:offset + n])
            offset = 0
            size -= n
            if not size:
                break
        return b''.join(parts)
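
    # The loop above skips whole chunks until the requested offset falls inside
    # one, then copies n bytes per chunk; a partially consumed chunk stays in
    # data_cache so the likely sequential follow-up read() need not decrypt it
    # again.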

    def readdir(self, fh, off):
        entries = [(b'.', fh), (b'..', self.parent[fh])]
        entries.extend(self.contents[fh].items())
        for i, (name, inode) in enumerate(entries[off:], off):
            yield name, self.getattr(inode), i + 1

    def readlink(self, inode, ctx=None):
        item = self.get_item(inode)
        return os.fsencode(item[b'source'])
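

# Example usage (a minimal sketch; assumes key, repository, manifest, archive
# and cached_repo were obtained elsewhere, e.g. while handling a `borg mount`
# command):
#
#   operations = FuseOperations(key, repository, manifest, archive, cached_repo)
#   operations.mount('/mnt/borg', 'allow_damaged_files,allow_other', foreground=True)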