Przeglądaj źródła

fuse: Separate creation of filesystem from implementation of llfuse funcs (#3042)

fuse: Separate creation of filesystem from implementation of llfuse funcs
Simon Frei 8 lat temu
rodzic
commit
b148a366fe
1 zmienionych plików z 121 dodań i 114 usunięć
  1. 121 114
      src/borg/fuse.py

+ 121 - 114
src/borg/fuse.py

@@ -202,27 +202,21 @@ class ItemCache:
         self.write_offset = write_offset
 
 
-class FuseOperations(llfuse.Operations):
-    """Export archive as a FUSE filesystem
+class FuseBackend(object):
+    """Virtual filesystem based on archive(s) to provide information to fuse
     """
-    # mount options
-    allow_damaged_files = False
-    versions = False
 
-    def __init__(self, key, repository, manifest, args, decrypted_repository):
-        super().__init__()
+    def __init__(self, key, manifest, repository, args, decrypted_repository):
         self.repository_uncached = repository
-        self.decrypted_repository = decrypted_repository
-        self.args = args
-        self.manifest = manifest
+        self._args = args
+        self._manifest = manifest
         self.key = key
-        # Maps inode numbers to Item instances. This is used for synthetic inodes,
-        # i.e. file-system objects that are made up by FuseOperations and are not contained
-        # in the archives. For example archive directories or intermediate directories
+        # Maps inode numbers to Item instances. This is used for synthetic inodes, i.e. file-system objects that are
+        # made up and are not contained in the archives. For example archive directories or intermediate directories
         # not contained in archives.
-        self.items = {}
-        # _inode_count is the current count of synthetic inodes, i.e. those in self.items
-        self._inode_count = 0
+        self._items = {}
+        # _inode_count is the current count of synthetic inodes, i.e. those in self._items
+        self.inode_count = 0
         # Maps inode numbers to the inode number of the parent
         self.parent = {}
         # Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
@@ -231,100 +225,70 @@ class FuseOperations(llfuse.Operations):
         self.default_uid = os.getuid()
         self.default_gid = os.getgid()
         self.default_dir = Item(mode=0o40755, mtime=int(time.time() * 1e9), uid=self.default_uid, gid=self.default_gid)
+        # Archives to be loaded when first accessed, mapped by their placeholder inode
         self.pending_archives = {}
         self.cache = ItemCache(decrypted_repository)
-        data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
-        logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
-        self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
+        self.allow_damaged_files = False
+        self.versions = False
 
     def _create_filesystem(self):
         self._create_dir(parent=1)  # first call, create root dir (inode == 1)
-        if self.args.location.archive:
-            self.process_archive(self.args.location.archive)
+        if self._args.location.archive:
+            self._process_archive(self._args.location.archive)
         else:
             self.versions_index = FuseVersionsIndex()
-            for archive in self.manifest.archives.list_considering(self.args):
+            for archive in self._manifest.archives.list_considering(self._args):
                 if self.versions:
                     # process archives immediately
-                    self.process_archive(archive.name)
+                    self._process_archive(archive.name)
                 else:
                     # lazily load archives, create archive placeholder inode
                     archive_inode = self._create_dir(parent=1, mtime=int(archive.ts.timestamp() * 1e9))
                     self.contents[1][os.fsencode(archive.name)] = archive_inode
                     self.pending_archives[archive_inode] = archive.name
 
-    def sig_info_handler(self, sig_no, stack):
-        logger.debug('fuse: %d synth inodes, %d edges (%s)',
-                     self._inode_count, len(self.parent),
-                     # getsizeof is the size of the dict itself; key and value are two small-ish integers,
-                     # which are shared due to code structure (this has been verified).
-                     format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count)))
-        logger.debug('fuse: %d pending archives', len(self.pending_archives))
-        logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s',
-                     self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items,
-                     format_file_size(sys.getsizeof(self.cache.meta)),
-                     format_file_size(os.stat(self.cache.fd.fileno()).st_size))
-        logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
-                     format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
-        self.decrypted_repository.log_instrumentation()
-
-    def mount(self, mountpoint, mount_options, foreground=False):
-        """Mount filesystem on *mountpoint* with *mount_options*."""
-        options = ['fsname=borgfs', 'ro']
-        if mount_options:
-            options.extend(mount_options.split(','))
-        try:
-            options.remove('allow_damaged_files')
-            self.allow_damaged_files = True
-        except ValueError:
-            pass
+    def get_item(self, inode):
         try:
-            options.remove('versions')
-            self.versions = True
-        except ValueError:
-            pass
-        self._create_filesystem()
-        llfuse.init(self, mountpoint, options)
-        if not foreground:
-            old_id, new_id = daemonize()
-            if not isinstance(self.repository_uncached, RemoteRepository):
-                # local repo and the locking process' PID just changed, migrate it:
-                self.repository_uncached.migrate_lock(old_id, new_id)
+            return self._items[inode]
+        except KeyError:
+            return self.cache.get(inode)
 
-        # If the file system crashes, we do not want to umount because in that
-        # case the mountpoint suddenly appears to become empty. This can have
-        # nasty consequences, imagine the user has e.g. an active rsync mirror
-        # job - seeing the mountpoint empty, rsync would delete everything in the
-        # mirror.
-        umount = False
-        try:
-            with signal_handler('SIGUSR1', self.sig_info_handler), \
-                 signal_handler('SIGINFO', self.sig_info_handler):
-                signal = fuse_main()
-            # no crash and no signal (or it's ^C and we're in the foreground) -> umount request
-            umount = (signal is None or (signal == SIGINT and foreground))
-        finally:
-            llfuse.close(umount)
+    def check_pending_archive(self, inode):
+        # Check if this is an archive we need to load
+        archive_name = self.pending_archives.pop(inode, None)
+        if archive_name is not None:
+            self._process_archive(archive_name, [os.fsencode(archive_name)])
+
+    def _allocate_inode(self):
+        self.inode_count += 1
+        return self.inode_count
 
     def _create_dir(self, parent, mtime=None):
         """Create directory
         """
-        ino = self.allocate_inode()
+        ino = self._allocate_inode()
         if mtime is not None:
-            self.items[ino] = Item(**self.default_dir.as_dict())
-            self.items[ino].mtime = mtime
+            self._items[ino] = Item(**self.default_dir.as_dict())
+            self._items[ino].mtime = mtime
         else:
-            self.items[ino] = self.default_dir
+            self._items[ino] = self.default_dir
         self.parent[ino] = parent
         return ino
 
-    def process_archive(self, archive_name, prefix=[]):
+    def find_inode(self, path, prefix=[]):
+        segments = prefix + path.split(b'/')
+        inode = 1
+        for segment in segments:
+            inode = self.contents[inode][segment]
+        return inode
+
+    def _process_archive(self, archive_name, prefix=[]):
         """Build FUSE inode hierarchy from archive metadata
         """
         self.file_versions = {}  # for versions mode: original path -> version
         t0 = time.perf_counter()
-        archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name,
-                          consider_part_files=self.args.consider_part_files)
+        archive = Archive(self.repository_uncached, self.key, self._manifest, archive_name,
+                          consider_part_files=self._args.consider_part_files)
         for item_inode, item in self.cache.iter_archive_items(archive.metadata.items):
             path = os.fsencode(item.path)
             is_dir = stat.S_ISDIR(item.mode)
@@ -333,21 +297,21 @@ class FuseOperations(llfuse.Operations):
                     # This can happen if an archive was created with a command line like
                     # $ borg create ... dir1/file dir1
                     # In this case the code below will have created a default_dir inode for dir1 already.
-                    inode = self._find_inode(path, prefix)
+                    inode = self.find_inode(path, prefix)
                 except KeyError:
                     pass
                 else:
-                    self.items[inode] = item
+                    self._items[inode] = item
                     continue
             segments = prefix + path.split(b'/')
             parent = 1
             for segment in segments[:-1]:
-                parent = self.process_inner(segment, parent)
-            self.process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode)
+                parent = self._process_inner(segment, parent)
+            self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode)
         duration = time.perf_counter() - t0
-        logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name)
+        logger.debug('fuse: _process_archive completed in %.1f s for archive %s', duration, archive.name)
 
-    def process_leaf(self, name, item, parent, prefix, is_dir, item_inode):
+    def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode):
         def file_version(item, path):
             if 'chunks' in item:
                 file_id = blake2b_128(path)
@@ -373,7 +337,7 @@ class FuseOperations(llfuse.Operations):
             return name + version_enc + ext
 
         if self.versions and not is_dir:
-            parent = self.process_inner(name, parent)
+            parent = self._process_inner(name, parent)
             path = os.fsencode(item.path)
             version = file_version(item, path)
             if version is not None:
@@ -392,20 +356,20 @@ class FuseOperations(llfuse.Operations):
                 source = make_versioned_name(source, version, add_dir=True)
                 name = make_versioned_name(name, version)
             try:
-                inode = self._find_inode(source, prefix)
+                inode = self.find_inode(source, prefix)
             except KeyError:
                 logger.warning('Skipping broken hard link: %s -> %s', path, item.source)
                 return
             item = self.cache.get(inode)
             item.nlink = item.get('nlink', 1) + 1
-            self.items[inode] = item
+            self._items[inode] = item
         else:
             inode = item_inode
         self.parent[inode] = parent
         if name:
             self.contents[parent][name] = inode
 
-    def process_inner(self, name, parent_inode):
+    def _process_inner(self, name, parent_inode):
         dir = self.contents[parent_inode]
         if name in dir:
             inode = dir[name]
@@ -415,9 +379,71 @@ class FuseOperations(llfuse.Operations):
                 dir[name] = inode
         return inode
 
-    def allocate_inode(self):
-        self._inode_count += 1
-        return self._inode_count
+
+class FuseOperations(llfuse.Operations, FuseBackend):
+    """Export archive as a FUSE filesystem
+    """
+
+    def __init__(self, key, repository, manifest, args, decrypted_repository):
+        llfuse.Operations.__init__(self)
+        FuseBackend.__init__(self, key, manifest, repository, args, decrypted_repository)
+        self.decrypted_repository = decrypted_repository
+        data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
+        logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
+        self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
+
+    def sig_info_handler(self, sig_no, stack):
+        logger.debug('fuse: %d synth inodes, %d edges (%s)',
+                     self.inode_count, len(self.parent),
+                     # getsizeof is the size of the dict itself; key and value are two small-ish integers,
+                     # which are shared due to code structure (this has been verified).
+                     format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self.inode_count)))
+        logger.debug('fuse: %d pending archives', len(self.pending_archives))
+        logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s',
+                     self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items,
+                     format_file_size(sys.getsizeof(self.cache.meta)),
+                     format_file_size(os.stat(self.cache.fd.fileno()).st_size))
+        logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
+                     format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
+        self.decrypted_repository.log_instrumentation()
+
+    def mount(self, mountpoint, mount_options, foreground=False):
+        """Mount filesystem on *mountpoint* with *mount_options*."""
+        options = ['fsname=borgfs', 'ro']
+        if mount_options:
+            options.extend(mount_options.split(','))
+        try:
+            options.remove('allow_damaged_files')
+            self.allow_damaged_files = True
+        except ValueError:
+            pass
+        try:
+            options.remove('versions')
+            self.versions = True
+        except ValueError:
+            pass
+        self._create_filesystem()
+        llfuse.init(self, mountpoint, options)
+        if not foreground:
+            old_id, new_id = daemonize()
+            if not isinstance(self.repository_uncached, RemoteRepository):
+                # local repo and the locking process' PID just changed, migrate it:
+                self.repository_uncached.migrate_lock(old_id, new_id)
+
+        # If the file system crashes, we do not want to umount because in that
+        # case the mountpoint suddenly appears to become empty. This can have
+        # nasty consequences, imagine the user has e.g. an active rsync mirror
+        # job - seeing the mountpoint empty, rsync would delete everything in the
+        # mirror.
+        umount = False
+        try:
+            with signal_handler('SIGUSR1', self.sig_info_handler), \
+                 signal_handler('SIGINFO', self.sig_info_handler):
+                signal = fuse_main()
+            # no crash and no signal (or it's ^C and we're in the foreground) -> umount request
+            umount = (signal is None or (signal == SIGINT and foreground))
+        finally:
+            llfuse.close(umount)
 
     def statfs(self, ctx=None):
         stat_ = llfuse.StatvfsData()
@@ -431,19 +457,6 @@ class FuseOperations(llfuse.Operations):
         stat_.f_favail = 0
         return stat_
 
-    def get_item(self, inode):
-        try:
-            return self.items[inode]
-        except KeyError:
-            return self.cache.get(inode)
-
-    def _find_inode(self, path, prefix=[]):
-        segments = prefix + path.split(b'/')
-        inode = 1
-        for segment in segments:
-            inode = self.contents[inode][segment]
-        return inode
-
     def getattr(self, inode, ctx=None):
         item = self.get_item(inode)
         entry = llfuse.EntryAttributes()
@@ -482,14 +495,8 @@ class FuseOperations(llfuse.Operations):
         except KeyError:
             raise llfuse.FUSEError(llfuse.ENOATTR) from None
 
-    def _load_pending_archive(self, inode):
-        # Check if this is an archive we need to load
-        archive_name = self.pending_archives.pop(inode, None)
-        if archive_name:
-            self.process_archive(archive_name, [os.fsencode(archive_name)])
-
     def lookup(self, parent_inode, name, ctx=None):
-        self._load_pending_archive(parent_inode)
+        self.check_pending_archive(parent_inode)
         if name == b'.':
             inode = parent_inode
         elif name == b'..':
@@ -513,7 +520,7 @@ class FuseOperations(llfuse.Operations):
         return inode
 
     def opendir(self, inode, ctx=None):
-        self._load_pending_archive(inode)
+        self.check_pending_archive(inode)
         return inode
 
     def read(self, fh, offset, size):