瀏覽代碼

Merge pull request #2676 from enkore/f/fusecache

fuse: redo ItemCache
enkore 8 年之前
父節點
當前提交
b6a4cf19bc
共有 4 個文件被更改,包括 212 次插入47 次删除
  1. 1 1
      src/borg/archiver.py
  2. 197 46
      src/borg/fuse.py
  3. 11 0
      src/borg/lrucache.py
  4. 3 0
      src/borg/testsuite/lrucache.py

+ 1 - 1
src/borg/archiver.py

@@ -1279,7 +1279,7 @@ class Archiver:
     def _do_mount(self, args, repository, manifest, key):
     def _do_mount(self, args, repository, manifest, key):
         from .fuse import FuseOperations
         from .fuse import FuseOperations
 
 
-        with cache_if_remote(repository) as cached_repo:
+        with cache_if_remote(repository, decrypted_cache=key) as cached_repo:
             operations = FuseOperations(key, repository, manifest, args, cached_repo)
             operations = FuseOperations(key, repository, manifest, args, cached_repo)
             logger.info("Mounting filesystem")
             logger.info("Mounting filesystem")
             try:
             try:

+ 197 - 46
src/borg/fuse.py

@@ -2,6 +2,7 @@ import errno
 import io
 import io
 import os
 import os
 import stat
 import stat
+import struct
 import sys
 import sys
 import tempfile
 import tempfile
 import time
 import time
@@ -35,22 +36,168 @@ else:
 
 
 
 
 class ItemCache:
 class ItemCache:
-    def __init__(self):
-        self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
+    """
+    This is the "meat" of the file system's metadata storage.
+
+    This class generates inode numbers that efficiently index items in archives,
+    and retrieves items from these inode numbers.
+    """
+
+    # 2 MiB are approximately ~230000 items (depends on the average number of items per metadata chunk).
+    #
+    # Since growing a bytearray has to copy it, growing it will converge to O(n^2), however,
+    # this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
+    # use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
+    # to resize it in the first place; that's free).
+    GROW_META_BY = 2 * 1024 * 1024
+
+    indirect_entry_struct = struct.Struct('=cII')
+    assert indirect_entry_struct.size == 9
+
+    def __init__(self, decrypted_repository):
+        self.decrypted_repository = decrypted_repository
+        # self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
+        # It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
+        # unices did this).
+        # The meta-array contains chunk IDs and item entries (described in iter_archive_items).
+        # The chunk IDs are referenced by item entries through relative offsets,
+        # which are bounded by the metadata chunk size.
+        self.meta = bytearray()
+        # The current write offset in self.meta
+        self.write_offset = 0
+
+        # Offset added to meta-indices, resulting in inodes,
+        # or subtracted from inodes, resulting in meta-indices.
+        # XXX: Merge FuseOperations.items and ItemCache to avoid
+        #      this implicit limitation / hack (on the number of synthetic inodes, degenerate
+        #      cases can inflate their number far beyond the number of archives).
         self.offset = 1000000
         self.offset = 1000000
 
 
-    def add(self, item):
-        pos = self.fd.seek(0, io.SEEK_END)
-        self.fd.write(msgpack.packb(item.as_dict()))
-        return pos + self.offset
+        # A temporary file that contains direct items, i.e. items directly cached in this layer.
+        # These are items that span more than one chunk and thus cannot be efficiently cached
+        # by the object cache (self.decrypted_repository), which would require variable-length structures;
+        # possible but not worth the effort, see iter_archive_items.
+        self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
+
+        # A small LRU cache for chunks requested by ItemCache.get() from the object cache,
+        # this significantly speeds up directory traversal and similar operations which
+        # tend to re-read the same chunks over and over.
+        # The capacity is kept low because increasing it does not provide any significant advantage,
+        # but makes LRUCache's square behaviour noticeable and consumes more memory.
+        self.chunks = LRUCache(capacity=10, dispose=lambda _: None)
+
+        # Instrumentation
+        # Count of indirect items, i.e. data is cached in the object cache, not directly in this cache
+        self.indirect_items = 0
+        # Count of direct items, i.e. data is in self.fd
+        self.direct_items = 0
 
 
     def get(self, inode):
     def get(self, inode):
         offset = inode - self.offset
         offset = inode - self.offset
         if offset < 0:
         if offset < 0:
             raise ValueError('ItemCache.get() called with an invalid inode number')
             raise ValueError('ItemCache.get() called with an invalid inode number')
-        self.fd.seek(offset, io.SEEK_SET)
-        item = next(msgpack.Unpacker(self.fd, read_size=1024))
-        return Item(internal_dict=item)
+        if self.meta[offset] == ord(b'I'):
+            _, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
+            chunk_id_offset = offset - chunk_id_relative_offset
+            # bytearray slices are bytearrays as well, explicitly convert to bytes()
+            chunk_id = bytes(self.meta[chunk_id_offset:chunk_id_offset + 32])
+            chunk = self.chunks.get(chunk_id)
+            if not chunk:
+                csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
+                self.chunks[chunk_id] = chunk
+            data = memoryview(chunk)[chunk_offset:]
+            unpacker = msgpack.Unpacker()
+            unpacker.feed(data)
+            return Item(internal_dict=next(unpacker))
+        elif self.meta[offset] == ord(b'S'):
+            fd_offset = int.from_bytes(self.meta[offset + 1:offset + 9], 'little')
+            self.fd.seek(fd_offset, io.SEEK_SET)
+            return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
+        else:
+            raise ValueError('Invalid entry type in self.meta')
+
+    def iter_archive_items(self, archive_item_ids):
+        unpacker = msgpack.Unpacker()
+
+        # Current offset in the metadata stream, which consists of all metadata chunks glued together
+        stream_offset = 0
+        # Offset of the current chunk in the metadata stream
+        chunk_begin = 0
+        # Length of the chunk preciding the current chunk
+        last_chunk_length = 0
+        msgpacked_bytes = b''
+
+        write_offset = self.write_offset
+        meta = self.meta
+        pack_indirect_into = self.indirect_entry_struct.pack_into
+
+        def write_bytes(append_msgpacked_bytes):
+            # XXX: Future versions of msgpack include an Unpacker.tell() method that provides this for free.
+            nonlocal msgpacked_bytes
+            nonlocal stream_offset
+            msgpacked_bytes += append_msgpacked_bytes
+            stream_offset += len(append_msgpacked_bytes)
+
+        for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
+            # Store the chunk ID in the meta-array
+            if write_offset + 32 >= len(meta):
+                self.meta = meta = meta + bytes(self.GROW_META_BY)
+            meta[write_offset:write_offset + 32] = key
+            current_id_offset = write_offset
+            write_offset += 32
+
+            # The chunk boundaries cannot be tracked through write_bytes, because the unpack state machine
+            # *can* and *will* consume partial items, so calls to write_bytes are unrelated to chunk boundaries.
+            chunk_begin += last_chunk_length
+            last_chunk_length = len(data)
+
+            unpacker.feed(data)
+            while True:
+                try:
+                    item = unpacker.unpack(write_bytes)
+                except msgpack.OutOfData:
+                    # Need more data, feed the next chunk
+                    break
+
+                current_item = msgpacked_bytes
+                current_item_length = len(current_item)
+                current_spans_chunks = stream_offset - current_item_length < chunk_begin
+                msgpacked_bytes = b''
+
+                if write_offset + 9 >= len(meta):
+                    self.meta = meta = meta + bytes(self.GROW_META_BY)
+
+                # item entries in the meta-array come in two different flavours, both nine bytes long.
+                # (1) for items that span chunks:
+                #
+                #     'S' + 8 byte offset into the self.fd file, where the msgpacked item starts.
+                #
+                # (2) for items that are completely contained in one chunk, which usually is the great majority
+                #     (about 700:1 for system backups)
+                #
+                #     'I' + 4 byte offset where the chunk ID is + 4 byte offset in the chunk
+                #     where the msgpacked items starts
+                #
+                #     The chunk ID offset is the number of bytes _back_ from the start of the entry, i.e.:
+                #
+                #     |Chunk ID| ....          |S1234abcd|
+                #      ^------ offset ----------^
+
+                if current_spans_chunks:
+                    pos = self.fd.seek(0, io.SEEK_END)
+                    self.fd.write(current_item)
+                    meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
+                    self.direct_items += 1
+                else:
+                    item_offset = stream_offset - current_item_length - chunk_begin
+                    pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
+                    self.indirect_items += 1
+                inode = write_offset + self.offset
+                write_offset += 9
+
+                yield inode, Item(internal_dict=item)
+
+        self.write_offset = write_offset
 
 
 
 
 class FuseOperations(llfuse.Operations):
 class FuseOperations(llfuse.Operations):
@@ -60,22 +207,30 @@ class FuseOperations(llfuse.Operations):
     allow_damaged_files = False
     allow_damaged_files = False
     versions = False
     versions = False
 
 
-    def __init__(self, key, repository, manifest, args, cached_repo):
+    def __init__(self, key, repository, manifest, args, decrypted_repository):
         super().__init__()
         super().__init__()
         self.repository_uncached = repository
         self.repository_uncached = repository
-        self.repository = cached_repo
+        self.decrypted_repository = decrypted_repository
         self.args = args
         self.args = args
         self.manifest = manifest
         self.manifest = manifest
         self.key = key
         self.key = key
-        self._inode_count = 0
+        # Maps inode numbers to Item instances. This is used for synthetic inodes,
+        # i.e. file-system objects that are made up by FuseOperations and are not contained
+        # in the archives. For example archive directories or intermediate directories
+        # not contained in archives.
         self.items = {}
         self.items = {}
+        # _inode_count is the current count of synthetic inodes, i.e. those in self.items
+        self._inode_count = 0
+        # Maps inode numbers to the inode number of the parent
         self.parent = {}
         self.parent = {}
+        # Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
+        # i.e. this contains all dirents of everything that is mounted. (It becomes really big).
         self.contents = defaultdict(dict)
         self.contents = defaultdict(dict)
         self.default_uid = os.getuid()
         self.default_uid = os.getuid()
         self.default_gid = os.getgid()
         self.default_gid = os.getgid()
         self.default_dir = Item(mode=0o40755, mtime=int(time.time() * 1e9), uid=self.default_uid, gid=self.default_gid)
         self.default_dir = Item(mode=0o40755, mtime=int(time.time() * 1e9), uid=self.default_uid, gid=self.default_gid)
         self.pending_archives = {}
         self.pending_archives = {}
-        self.cache = ItemCache()
+        self.cache = ItemCache(decrypted_repository)
         data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
         data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
         logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
         logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
         self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
         self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
@@ -97,18 +252,19 @@ class FuseOperations(llfuse.Operations):
                     self.pending_archives[archive_inode] = archive_name
                     self.pending_archives[archive_inode] = archive_name
 
 
     def sig_info_handler(self, sig_no, stack):
     def sig_info_handler(self, sig_no, stack):
-        logger.debug('fuse: %d inodes, %d synth inodes, %d edges (%s)',
-                     self._inode_count, len(self.items), len(self.parent),
+        logger.debug('fuse: %d synth inodes, %d edges (%s)',
+                     self._inode_count, len(self.parent),
                      # getsizeof is the size of the dict itself; key and value are two small-ish integers,
                      # getsizeof is the size of the dict itself; key and value are two small-ish integers,
                      # which are shared due to code structure (this has been verified).
                      # which are shared due to code structure (this has been verified).
                      format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count)))
                      format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count)))
         logger.debug('fuse: %d pending archives', len(self.pending_archives))
         logger.debug('fuse: %d pending archives', len(self.pending_archives))
-        logger.debug('fuse: ItemCache %d entries, %s',
-                     self._inode_count - len(self.items),
+        logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s',
+                     self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items,
+                     format_file_size(sys.getsizeof(self.cache.meta)),
                      format_file_size(os.stat(self.cache.fd.fileno()).st_size))
                      format_file_size(os.stat(self.cache.fd.fileno()).st_size))
         logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
         logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
                      format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
                      format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
-        self.repository.log_instrumentation()
+        self.decrypted_repository.log_instrumentation()
 
 
     def mount(self, mountpoint, mount_options, foreground=False):
     def mount(self, mountpoint, mount_options, foreground=False):
         """Mount filesystem on *mountpoint* with *mount_options*."""
         """Mount filesystem on *mountpoint* with *mount_options*."""
@@ -158,36 +314,31 @@ class FuseOperations(llfuse.Operations):
         """
         """
         self.file_versions = {}  # for versions mode: original path -> version
         self.file_versions = {}  # for versions mode: original path -> version
         t0 = time.perf_counter()
         t0 = time.perf_counter()
-        unpacker = msgpack.Unpacker()
         archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name,
         archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name,
                           consider_part_files=self.args.consider_part_files)
                           consider_part_files=self.args.consider_part_files)
-        for key, chunk in zip(archive.metadata.items, self.repository.get_many(archive.metadata.items)):
-            data = self.key.decrypt(key, chunk)
-            unpacker.feed(data)
-            for item in unpacker:
-                item = Item(internal_dict=item)
-                path = os.fsencode(os.path.normpath(item.path))
-                is_dir = stat.S_ISDIR(item.mode)
-                if is_dir:
-                    try:
-                        # This can happen if an archive was created with a command line like
-                        # $ borg create ... dir1/file dir1
-                        # In this case the code below will have created a default_dir inode for dir1 already.
-                        inode = self._find_inode(path, prefix)
-                    except KeyError:
-                        pass
-                    else:
-                        self.items[inode] = item
-                        continue
-                segments = prefix + path.split(b'/')
-                parent = 1
-                for segment in segments[:-1]:
-                    parent = self.process_inner(segment, parent)
-                self.process_leaf(segments[-1], item, parent, prefix, is_dir)
+        for item_inode, item in self.cache.iter_archive_items(archive.metadata.items):
+            path = os.fsencode(item.path)
+            is_dir = stat.S_ISDIR(item.mode)
+            if is_dir:
+                try:
+                    # This can happen if an archive was created with a command line like
+                    # $ borg create ... dir1/file dir1
+                    # In this case the code below will have created a default_dir inode for dir1 already.
+                    inode = self._find_inode(path, prefix)
+                except KeyError:
+                    pass
+                else:
+                    self.items[inode] = item
+                    continue
+            segments = prefix + path.split(b'/')
+            parent = 1
+            for segment in segments[:-1]:
+                parent = self.process_inner(segment, parent)
+            self.process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode)
         duration = time.perf_counter() - t0
         duration = time.perf_counter() - t0
         logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name)
         logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name)
 
 
-    def process_leaf(self, name, item, parent, prefix, is_dir):
+    def process_leaf(self, name, item, parent, prefix, is_dir, item_inode):
         def file_version(item):
         def file_version(item):
             if 'chunks' in item:
             if 'chunks' in item:
                 ident = 0
                 ident = 0
@@ -208,14 +359,14 @@ class FuseOperations(llfuse.Operations):
             if version is not None:
             if version is not None:
                 # regular file, with contents - maybe a hardlink master
                 # regular file, with contents - maybe a hardlink master
                 name = make_versioned_name(name, version)
                 name = make_versioned_name(name, version)
-                path = os.fsencode(os.path.normpath(item.path))
+                path = os.fsencode(item.path)
                 self.file_versions[path] = version
                 self.file_versions[path] = version
 
 
         path = item.path
         path = item.path
         del item.path  # safe some space
         del item.path  # safe some space
         if 'source' in item and hardlinkable(item.mode):
         if 'source' in item and hardlinkable(item.mode):
             # a hardlink, no contents, <source> is the hardlink master
             # a hardlink, no contents, <source> is the hardlink master
-            source = os.fsencode(os.path.normpath(item.source))
+            source = os.fsencode(item.source)
             if self.versions:
             if self.versions:
                 # adjust source name with version
                 # adjust source name with version
                 version = self.file_versions[source]
                 version = self.file_versions[source]
@@ -230,7 +381,7 @@ class FuseOperations(llfuse.Operations):
             item.nlink = item.get('nlink', 1) + 1
             item.nlink = item.get('nlink', 1) + 1
             self.items[inode] = item
             self.items[inode] = item
         else:
         else:
-            inode = self.cache.add(item)
+            inode = item_inode
         self.parent[inode] = parent
         self.parent[inode] = parent
         if name:
         if name:
             self.contents[parent][name] = inode
             self.contents[parent][name] = inode

+ 11 - 0
src/borg/lrucache.py

@@ -1,3 +1,6 @@
+sentinel = object()
+
+
 class LRUCache:
 class LRUCache:
     def __init__(self, capacity, dispose):
     def __init__(self, capacity, dispose):
         self._cache = {}
         self._cache = {}
@@ -28,6 +31,14 @@ class LRUCache:
     def __contains__(self, key):
     def __contains__(self, key):
         return key in self._cache
         return key in self._cache
 
 
+    def get(self, key, default=None):
+        value = self._cache.get(key, sentinel)
+        if value is sentinel:
+            return default
+        self._lru.remove(key)
+        self._lru.append(key)
+        return value
+
     def clear(self):
     def clear(self):
         for value in self._cache.values():
         for value in self._cache.values():
             self._dispose(value)
             self._dispose(value)

+ 3 - 0
src/borg/testsuite/lrucache.py

@@ -19,7 +19,10 @@ class TestLRUCache:
         assert 'b' in c
         assert 'b' in c
         with pytest.raises(KeyError):
         with pytest.raises(KeyError):
             c['a']
             c['a']
+        assert c.get('a') is None
+        assert c.get('a', 'foo') == 'foo'
         assert c['b'] == 1
         assert c['b'] == 1
+        assert c.get('b') == 1
         assert c['c'] == 2
         assert c['c'] == 2
         c['d'] = 3
         c['d'] = 3
         assert len(c) == 2
         assert len(c) == 2