from collections import defaultdict
import errno
import io
import llfuse
import os
import stat
import tempfile
import time
from signal import SIGINT
from distutils.version import LooseVersion

import msgpack

from .archive import Archive
from .helpers import daemonize, bigint_to_int
from .logger import create_logger
from .lrucache import LRUCache

logger = create_logger()

# Does this version of llfuse support ns precision?
have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')

fuse_version = LooseVersion(getattr(llfuse, '__version__', '0.1'))
if fuse_version >= '0.42':
    def fuse_main():
        return llfuse.main(workers=1)
else:
    def fuse_main():
        llfuse.main(single=True)
        return None
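
# Both variants give fuse_main() the same contract: return the signal that
# terminated the main loop, or None for a regular unmount; mount() below uses
# this to decide whether it is safe to pass umount=True to llfuse.close().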


class ItemCache:
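    """Spill archive item metadata to a temp file, addressed by synthetic inodes.

    add() returns the item's byte offset in the temp file plus a large fixed
    offset (1000000), keeping these synthetic inode numbers clear of the low
    ones handed out by FuseOperations.allocate_inode().
    """
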
    def __init__(self):
        self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
        self.offset = 1000000

    def add(self, item):
        pos = self.fd.seek(0, io.SEEK_END)
        self.fd.write(msgpack.packb(item))
        return pos + self.offset

    def get(self, inode):
        self.fd.seek(inode - self.offset, io.SEEK_SET)
        return next(msgpack.Unpacker(self.fd, read_size=1024))


class FuseOperations(llfuse.Operations):
    """Export archive as a fuse filesystem
    """

    allow_damaged_files = False

    def __init__(self, key, repository, manifest, archive, cached_repo):
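        # Core inode tables: self.items maps inode -> item metadata dict for
        # the in-memory part, self.parent maps inode -> parent inode, and
        # self.contents maps directory inode -> {name: inode}.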
        super().__init__()
        self._inode_count = 0
        self.key = key
        self.repository = cached_repo
        self.items = {}
        self.parent = {}
        self.contents = defaultdict(dict)
        self.default_uid = os.getuid()
        self.default_gid = os.getgid()
        self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9),
                            b'uid': self.default_uid, b'gid': self.default_gid}
        self.pending_archives = {}
        self.accounted_chunks = {}
        self.cache = ItemCache()
        data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
        logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
        self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
        self._create_dir(parent=1)  # first call, create root dir (inode == 1)
        if archive:
            self.process_archive(archive)
        else:
            for archive_name in manifest.archives:
                # Create archive placeholder inode
                archive_inode = self._create_dir(parent=1)
                self.contents[1][os.fsencode(archive_name)] = archive_inode
                self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)

    def mount(self, mountpoint, mount_options, foreground=False):
        """Mount filesystem on *mountpoint* with *mount_options*."""
        options = ['fsname=borgfs', 'ro']
        if mount_options:
            options.extend(mount_options.split(','))
        try:
            options.remove('allow_damaged_files')
            self.allow_damaged_files = True
        except ValueError:
            pass
        llfuse.init(self, mountpoint, options)
        if not foreground:
            daemonize()

        # If the file system crashes, we do not want to umount because in that
        # case the mountpoint suddenly appears to become empty. This can have
        # nasty consequences, imagine the user has e.g. an active rsync mirror
        # job - seeing the mountpoint empty, rsync would delete everything in the
        # mirror.
        umount = False
        try:
            signal = fuse_main()
            # no crash and no signal (or it's ^C and we're in the foreground) -> umount request
            umount = (signal is None or (signal == SIGINT and foreground))
        finally:
            llfuse.close(umount)

    def _create_dir(self, parent):
        """Create directory
        """
        ino = self.allocate_inode()
        self.items[ino] = self.default_dir
        self.parent[ino] = parent
        return ino

    def process_archive(self, archive, prefix=[]):
        """Build fuse inode hierarchy from archive metadata
        """
        unpacker = msgpack.Unpacker()
        for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
            data = self.key.decrypt(key, chunk)
            unpacker.feed(data)
            for item in unpacker:
                try:
                    # This can happen if an archive was created with a command line like
                    # $ borg create ... dir1/file dir1
                    # In this case the code below will have created a default_dir inode for dir1 already.
                    inode = self._find_inode(item[b'path'], prefix)
                except KeyError:
                    pass
                else:
                    self.items[inode] = item
                    continue
                path = item.pop(b'path')
                segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
                num_segments = len(segments)
                parent = 1
                for i, segment in enumerate(segments, 1):
                    # Leaf segment?
                    if i == num_segments:
                        if b'source' in item and stat.S_ISREG(item[b'mode']):
                            try:
                                inode = self._find_inode(item[b'source'], prefix)
                            except KeyError:
                                file = path.decode(errors='surrogateescape')
                                source = item[b'source'].decode(errors='surrogateescape')
                                logger.warning('Skipping broken hard link: %s -> %s', file, source)
                                continue
                            item = self.cache.get(inode)
                            item[b'nlink'] = item.get(b'nlink', 1) + 1
                            self.items[inode] = item
                        else:
                            inode = self.cache.add(item)
                        self.parent[inode] = parent
                        if segment:
                            self.contents[parent][segment] = inode
                    elif segment in self.contents[parent]:
                        parent = self.contents[parent][segment]
                    else:
                        inode = self._create_dir(parent)
                        if segment:
                            self.contents[parent][segment] = inode
                        parent = inode

    def allocate_inode(self):
        self._inode_count += 1
        return self._inode_count

    def statfs(self, ctx=None):
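        # The archive is mounted read-only; size and usage figures are not
        # meaningful here, so everything except the block sizes is zero.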
        stat_ = llfuse.StatvfsData()
        stat_.f_bsize = 512
        stat_.f_frsize = 512
        stat_.f_blocks = 0
        stat_.f_bfree = 0
        stat_.f_bavail = 0
        stat_.f_files = 0
        stat_.f_ffree = 0
        stat_.f_favail = 0
        return stat_

    def get_item(self, inode):
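        # Directory inodes and hard link targets live in self.items; ordinary
        # leaf items were spilled to the ItemCache temp file, whose synthetic
        # inode encodes the file offset.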
        try:
            return self.items[inode]
        except KeyError:
            return self.cache.get(inode)

    def _find_inode(self, path, prefix=[]):
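        # Resolve a path to an inode by walking the name -> inode maps from
        # the root (inode 1). Raises KeyError if a segment is missing, which
        # process_archive() relies on.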
        segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/')
        inode = 1
        for segment in segments:
            inode = self.contents[inode][segment]
        return inode

    def getattr(self, inode, ctx=None):
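        # st_size is the sum of all chunk sizes. For st_blocks, each chunk is
        # credited only to the first inode seen referencing it (tracked in
        # accounted_chunks), so deduplicated or hard-linked data is not
        # counted twice in the space usage reported to tools like du.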
        item = self.get_item(inode)
        size = 0
        dsize = 0
        try:
            for key, chunksize, _ in item[b'chunks']:
                size += chunksize
                if self.accounted_chunks.get(key, inode) == inode:
                    self.accounted_chunks[key] = inode
                    dsize += chunksize
        except KeyError:
            pass
        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300
        entry.st_mode = item[b'mode']
        entry.st_nlink = item.get(b'nlink', 1)
        entry.st_uid = item[b'uid'] if item[b'uid'] >= 0 else self.default_uid
        entry.st_gid = item[b'gid'] if item[b'gid'] >= 0 else self.default_gid
        entry.st_rdev = item.get(b'rdev', 0)
        entry.st_size = size
        entry.st_blksize = 512
        entry.st_blocks = (dsize + entry.st_blksize - 1) // entry.st_blksize
        # note: older archives only have mtime (not atime nor ctime)
        if have_fuse_xtime_ns:
            entry.st_mtime_ns = bigint_to_int(item[b'mtime'])
            if b'atime' in item:
                entry.st_atime_ns = bigint_to_int(item[b'atime'])
            else:
                entry.st_atime_ns = bigint_to_int(item[b'mtime'])
            if b'ctime' in item:
                entry.st_ctime_ns = bigint_to_int(item[b'ctime'])
            else:
                entry.st_ctime_ns = bigint_to_int(item[b'mtime'])
        else:
            entry.st_mtime = bigint_to_int(item[b'mtime']) / 1e9
            if b'atime' in item:
                entry.st_atime = bigint_to_int(item[b'atime']) / 1e9
            else:
                entry.st_atime = bigint_to_int(item[b'mtime']) / 1e9
            if b'ctime' in item:
                entry.st_ctime = bigint_to_int(item[b'ctime']) / 1e9
            else:
                entry.st_ctime = bigint_to_int(item[b'mtime']) / 1e9
        return entry

    def listxattr(self, inode, ctx=None):
        item = self.get_item(inode)
        return item.get(b'xattrs', {}).keys()

    def getxattr(self, inode, name, ctx=None):
        item = self.get_item(inode)
        try:
            return item.get(b'xattrs', {})[name] or b''
        except KeyError:
            raise llfuse.FUSEError(llfuse.ENOATTR) from None

    def _load_pending_archive(self, inode):
        # Check if this is an archive we need to load
        archive = self.pending_archives.pop(inode, None)
        if archive:
            self.process_archive(archive, [os.fsencode(archive.name)])

    def lookup(self, parent_inode, name, ctx=None):
        self._load_pending_archive(parent_inode)
        if name == b'.':
            inode = parent_inode
        elif name == b'..':
            inode = self.parent[parent_inode]
        else:
            inode = self.contents[parent_inode].get(name)
            if not inode:
                raise llfuse.FUSEError(errno.ENOENT)
        return self.getattr(inode)

    def open(self, inode, flags, ctx=None):
        if not self.allow_damaged_files:
            item = self.get_item(inode)
            if b'chunks_healthy' in item:
                # Processed archive items don't carry the path anymore; for converting the inode
                # to the path we'd either have to store the inverse of the current structure,
                # or search the entire archive. So we just don't print it. It's easy to correlate anyway.
                logger.warning('File has damaged (all-zero) chunks. Try running borg check --repair. '
                               'Mount with allow_damaged_files to read damaged files.')
                raise llfuse.FUSEError(errno.EIO)
        return inode

    def opendir(self, inode, ctx=None):
        self._load_pending_archive(inode)
        return inode

    def read(self, fh, offset, size):
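        # fh is the inode, as returned by open(). Skip whole chunks that lie
        # before offset, then collect byte ranges until size is satisfied. A
        # chunk that was only partially consumed stays in data_cache so a
        # sequential read does not have to fetch and decrypt it again.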
        parts = []
        item = self.get_item(fh)
        for id, s, csize in item[b'chunks']:
            if s < offset:
                offset -= s
                continue
            n = min(size, s - offset)
            if id in self.data_cache:
                data = self.data_cache[id]
                if offset + n == len(data):
                    # evict fully read chunk from cache
                    del self.data_cache[id]
            else:
                data = self.key.decrypt(id, self.repository.get(id))
                if offset + n < len(data):
                    # chunk was only partially read, cache it
                    self.data_cache[id] = data
            parts.append(data[offset:offset + n])
            offset = 0
            size -= n
            if not size:
                break
        return b''.join(parts)

    def readdir(self, fh, off):
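        # llfuse resumes directory listings by calling readdir() again with
        # the last offset it received, so each entry is yielded together with
        # the offset of the entry that follows it (i + 1).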
        entries = [(b'.', fh), (b'..', self.parent[fh])]
        entries.extend(self.contents[fh].items())
        for i, (name, inode) in enumerate(entries[off:], off):
            yield name, self.getattr(inode), i + 1

    def readlink(self, inode, ctx=None):
        item = self.get_item(inode)
        return os.fsencode(item[b'source'])
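

# Usage sketch (illustrative; the variable names are placeholders for objects
# with the interfaces used above, not an API provided by this module):
#
#   operations = FuseOperations(key, repository, manifest, archive, cached_repo)
#   operations.mount('/path/to/mountpoint', 'allow_damaged_files,allow_other',
#                    foreground=True)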