Browse Source

fuse: implement versions view

All archives and all of their items are read to build a unified view.
Each file is represented by a directory of the same name, which contains the versions of that file.
A filename suffix computed via adler32 over the file's chunk ids is used to disambiguate the versions.

Also: refactor the code a little, creating separate methods for processing leaves and inner nodes.
Thomas Waldmann 9 years ago
parent
commit
1f04820d9d
5 changed files with 105 additions and 33 deletions
  1. 5 1
      docs/changes.rst
  2. 11 2
      docs/usage.rst
  3. 2 0
      src/borg/archiver.py
  4. 72 30
      src/borg/fuse.py
  5. 15 0
      src/borg/testsuite/archiver.py

+ 5 - 1
docs/changes.rst

@@ -108,6 +108,11 @@ New features:
 - borg info:
 
   - output is now more similar to borg create --stats, #977
+- borg mount:
+
+  - provide "borgfs" wrapper for borg mount, enables usage via fstab, #743
+  - "versions" mount option - when used with a repository mount, this gives
+    a merged, versioned view of the files in all archives, #729
 - repository:
 
   - added progress information to commit/compaction phase (often takes some time when deleting/pruning), #1519
@@ -123,7 +128,6 @@ New features:
 - options that imply output (--show-rc, --show-version, --list, --stats,
   --progress) don't need -v/--info to have that output displayed, #865
 - add archive comments (via borg (re)create --comment), #842
-- provide "borgfs" wrapper for borg mount, enables usage via fstab, #743
 - borg list/prune/delete: also output archive id, #731
 - --show-version: shows/logs the borg version, #725
 - added --debug-topic for granular debug logging, #1447

+ 11 - 2
docs/usage.rst

@@ -499,8 +499,8 @@ Examples
 
 Examples
 ~~~~~~~~
-borg mount/borgfs
-+++++++++++++++++
+borg mount
+++++++++++
 ::
 
     $ borg mount /path/to/repo::root-2016-02-15 /tmp/mymountpoint
@@ -508,6 +508,15 @@ borg mount/borgfs
     bin  boot  etc	home  lib  lib64  lost+found  media  mnt  opt  root  sbin  srv  tmp  usr  var
     $ fusermount -u /tmp/mymountpoint
 
+::
+
+    $ borg mount -o versions /path/to/repo /tmp/mymountpoint
+    $ ls -l /tmp/mymountpoint/home/user/doc.txt/
+    total 24
+    -rw-rw-r-- 1 user group 12357 Aug 26 21:19 doc.txt.cda00bc9
+    -rw-rw-r-- 1 user group 12204 Aug 26 21:04 doc.txt.fa760f28
+    $ fusermount -u /tmp/mymountpoint
+
 borgfs
 ++++++
 ::

+ 2 - 0
src/borg/archiver.py

@@ -1835,6 +1835,8 @@ class Archiver:
         For mount options, see the fuse(8) manual page. Additional mount options
         supported by borg:
 
+        - versions: when used with a repository mount, this gives a merged, versioned
+          view of the files in the archives. EXPERIMENTAL, layout may change in future.
         - allow_damaged_files: by default damaged files (where missing chunks were
           replaced with runs of zeros by borg check --repair) are not readable and
           return EIO (I/O error). Set this option to read such files.

+ 72 - 30
src/borg/fuse.py

@@ -6,12 +6,12 @@ import tempfile
 import time
 from collections import defaultdict
 from distutils.version import LooseVersion
+from zlib import adler32
 
 import llfuse
 import msgpack
 
 from .logger import create_logger
-from .lrucache import LRUCache
 logger = create_logger()
 
 from .archive import Archive
@@ -51,14 +51,18 @@ class ItemCache:
 class FuseOperations(llfuse.Operations):
     """Export archive as a fuse filesystem
     """
-
+    # mount options
     allow_damaged_files = False
+    versions = False
 
     def __init__(self, key, repository, manifest, archive, cached_repo):
         super().__init__()
-        self._inode_count = 0
-        self.key = key
+        self.repository_uncached = repository
         self.repository = cached_repo
+        self.archive = archive
+        self.manifest = manifest
+        self.key = key
+        self._inode_count = 0
         self.items = {}
         self.parent = {}
         self.contents = defaultdict(dict)
@@ -69,15 +73,22 @@ class FuseOperations(llfuse.Operations):
         data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
         logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
         self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
+
+    def _create_filesystem(self):
         self._create_dir(parent=1)  # first call, create root dir (inode == 1)
-        if archive:
-            self.process_archive(archive)
+        if self.archive:
+            self.process_archive(self.archive)
         else:
-            for name in manifest.archives:
-                # Create archive placeholder inode
-                archive_inode = self._create_dir(parent=1)
-                self.contents[1][os.fsencode(name)] = archive_inode
-                self.pending_archives[archive_inode] = Archive(repository, key, manifest, name)
+            for name in self.manifest.archives:
+                archive = Archive(self.repository_uncached, self.key, self.manifest, name)
+                if self.versions:
+                    # process archives immediately
+                    self.process_archive(archive)
+                else:
+                    # lazy load archives, create archive placeholder inode
+                    archive_inode = self._create_dir(parent=1)
+                    self.contents[1][os.fsencode(name)] = archive_inode
+                    self.pending_archives[archive_inode] = archive
 
     def mount(self, mountpoint, mount_options, foreground=False):
         """Mount filesystem on *mountpoint* with *mount_options*."""
@@ -89,6 +100,12 @@ class FuseOperations(llfuse.Operations):
             self.allow_damaged_files = True
         except ValueError:
             pass
+        try:
+            options.remove('versions')
+            self.versions = True
+        except ValueError:
+            pass
+        self._create_filesystem()
         llfuse.init(self, mountpoint, options)
         if not foreground:
             daemonize()
@@ -122,11 +139,16 @@ class FuseOperations(llfuse.Operations):
             unpacker.feed(data)
             for item in unpacker:
                 item = Item(internal_dict=item)
+                is_dir = stat.S_ISDIR(item.mode)
                 try:
                     # This can happen if an archive was created with a command line like
                     # $ borg create ... dir1/file dir1
                     # In this case the code below will have created a default_dir inode for dir1 already.
-                    inode = self._find_inode(safe_encode(item.path), prefix)
+                    path = safe_encode(item.path)
+                    if not is_dir:
+                        # not a directory -> no lookup needed
+                        raise KeyError
+                    inode = self._find_inode(path, prefix)
                 except KeyError:
                     pass
                 else:
@@ -137,25 +159,46 @@ class FuseOperations(llfuse.Operations):
                 num_segments = len(segments)
                 parent = 1
                 for i, segment in enumerate(segments, 1):
-                    # Leaf segment?
                     if i == num_segments:
-                        if 'source' in item and stat.S_ISREG(item.mode):
-                            inode = self._find_inode(item.source, prefix)
-                            item = self.cache.get(inode)
-                            item.nlink = item.get('nlink', 1) + 1
-                            self.items[inode] = item
-                        else:
-                            inode = self.cache.add(item)
-                        self.parent[inode] = parent
-                        if segment:
-                            self.contents[parent][segment] = inode
-                    elif segment in self.contents[parent]:
-                        parent = self.contents[parent][segment]
+                        self.process_leaf(segment, item, parent, prefix, is_dir)
                     else:
-                        inode = self._create_dir(parent)
-                        if segment:
-                            self.contents[parent][segment] = inode
-                        parent = inode
+                        parent = self.process_inner(segment, parent)
+
+    def process_leaf(self, name, item, parent, prefix, is_dir):
+        def version_name(name, item):
+            if 'chunks' in item:
+                ident = 0
+                for chunkid, _, _ in item.chunks:
+                    ident = adler32(chunkid, ident)
+                name = name + safe_encode('.%08x' % ident)
+            return name
+
+        if self.versions and not is_dir:
+            parent = self.process_inner(name, parent)
+            name = version_name(name, item)
+        self.process_real_leaf(name, item, parent, prefix)
+
+    def process_real_leaf(self, name, item, parent, prefix):
+        if 'source' in item and stat.S_ISREG(item.mode):
+            inode = self._find_inode(item.source, prefix)
+            item = self.cache.get(inode)
+            item.nlink = item.get('nlink', 1) + 1
+            self.items[inode] = item
+        else:
+            inode = self.cache.add(item)
+        self.parent[inode] = parent
+        if name:
+            self.contents[parent][name] = inode
+
+    def process_inner(self, name, parent):
+        if name in self.contents[parent]:
+            parent = self.contents[parent][name]
+        else:
+            inode = self._create_dir(parent)
+            if name:
+                self.contents[parent][name] = inode
+            parent = inode
+        return parent
 
     def allocate_inode(self):
         self._inode_count += 1
@@ -280,7 +323,6 @@ class FuseOperations(llfuse.Operations):
                     # evict fully read chunk from cache
                     del self.data_cache[id]
             else:
-                # XXX
                 _, data = self.key.decrypt(id, self.repository.get(id))
                 if offset + n < len(data):
                     # chunk was only partially read, cache it

+ 15 - 0
src/borg/testsuite/archiver.py

@@ -1441,6 +1441,21 @@ class ArchiverTestCase(ArchiverTestCaseBase):
                 sto = os.stat(out_fn)
                 assert stat.S_ISFIFO(sto.st_mode)
 
+    @unittest.skipUnless(has_llfuse, 'llfuse not installed')
+    def test_fuse_versions_view(self):
+        self.cmd('init', self.repository_location)
+        self.create_regular_file('test', contents=b'first')
+        self.cmd('create', self.repository_location + '::archive1', 'input')
+        self.create_regular_file('test', contents=b'second')
+        self.cmd('create', self.repository_location + '::archive2', 'input')
+        mountpoint = os.path.join(self.tmpdir, 'mountpoint')
+        # mount the whole repository, archive contents shall show up in versioned view:
+        with self.fuse_mount(self.repository_location, mountpoint, 'versions'):
+            path = os.path.join(mountpoint, 'input', 'test')  # filename shows up as directory ...
+            files = os.listdir(path)
+            assert all(f.startswith('test.') for f in files)  # ... with files test.xxxxxxxx in there
+            assert {b'first', b'second'} == {open(os.path.join(path, f), 'rb').read() for f in files}
+
     @unittest.skipUnless(has_llfuse, 'llfuse not installed')
     def test_fuse_allow_damaged_files(self):
         self.cmd('init', self.repository_location)