浏览代码

Merge pull request #2648 from textshell/feature/mandatory-features-master

Add minimal version of in repository mandatory feature flags. (master)
textshell 8 年之前
父节点
当前提交
86363dcd4b
共有 7 个文件被更改,包括 299 次插入45 次删除
  1. 30 6
      conftest.py
  2. 2 0
      src/borg/__init__.py
  3. 1 1
      src/borg/archive.py
  4. 37 23
      src/borg/archiver.py
  5. 49 1
      src/borg/cache.py
  6. 66 2
      src/borg/helpers.py
  7. 114 12
      src/borg/testsuite/archiver.py

+ 30 - 6
conftest.py

@@ -2,10 +2,20 @@ import os
 
 import pytest
 
+# IMPORTANT keep this above all other borg imports to avoid inconsistent values
+# for `from borg.constants import PBKDF2_ITERATIONS` (or star import) usages before
+# this is executed
+from borg import constants
+# no fixture-based monkey-patching since star-imports are used for the constants module
+constants.PBKDF2_ITERATIONS = 1
+
+
 # needed to get pretty assertion failures in unit tests:
 if hasattr(pytest, 'register_assert_rewrite'):
     pytest.register_assert_rewrite('borg.testsuite')
 
+
+import borg.cache
 from borg.logger import setup_logging
 
 # Ensure that the loggers exist for all tests
@@ -14,12 +24,7 @@ setup_logging()
 from borg.testsuite import has_lchflags, has_llfuse
 from borg.testsuite import are_symlinks_supported, are_hardlinks_supported, is_utime_fully_supported
 from borg.testsuite.platform import fakeroot_detected, are_acls_working
-from borg import xattr, constants
-
-
-def pytest_configure(config):
-    # no fixture-based monkey-patching since star-imports are used for the constants module
-    constants.PBKDF2_ITERATIONS = 1
+from borg import xattr
 
 
 @pytest.fixture(autouse=True)
@@ -53,3 +58,22 @@ def pytest_report_header(config, startdir):
     output = "Tests enabled: " + ", ".join(enabled) + "\n"
     output += "Tests disabled: " + ", ".join(disabled)
     return output
+
+
+class DefaultPatches:
+    def __init__(self, request):
+        self.org_cache_wipe_cache = borg.cache.Cache.wipe_cache
+
+        def wipe_should_not_be_called(*a, **kw):
+            raise AssertionError("Cache wipe was triggered, if this is part of the test add @pytest.mark.allow_cache_wipe")
+        if 'allow_cache_wipe' not in request.keywords:
+            borg.cache.Cache.wipe_cache = wipe_should_not_be_called
+        request.addfinalizer(self.undo)
+
+    def undo(self):
+        borg.cache.Cache.wipe_cache = self.org_cache_wipe_cache
+
+
+@pytest.fixture(autouse=True)
+def default_patches(request):
+    return DefaultPatches(request)

+ 2 - 0
src/borg/__init__.py

@@ -1,5 +1,7 @@
 from distutils.version import LooseVersion
 
+# IMPORTANT keep imports from borg here to a minimum because our testsuite depends on
+# beeing able to import borg.constants and then monkey patching borg.constants.PBKDF2_ITERATIONS
 from ._version import version as __version__
 
 

+ 1 - 1
src/borg/archive.py

@@ -1152,7 +1152,7 @@ class ArchiveChecker:
             self.manifest = self.rebuild_manifest()
         else:
             try:
-                self.manifest, _ = Manifest.load(repository, key=self.key)
+                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
             except IntegrityError as exc:
                 logger.error('Repository manifest is corrupted: %s', exc)
                 self.error_found = True

+ 37 - 23
src/borg/archiver.py

@@ -87,8 +87,9 @@ def argument(args, str_or_bool):
     return str_or_bool
 
 
-def with_repository(fake=False, invert_fake=False, create=False, lock=True, exclusive=False, manifest=True, cache=False,
-                    secure=True):
+def with_repository(fake=False, invert_fake=False, create=False, lock=True,
+                    exclusive=False, manifest=True, cache=False, secure=True,
+                    compatibility=None):
     """
     Method decorator for subcommand-handling methods: do_XYZ(self, args, repository, …)
 
@@ -100,7 +101,20 @@ def with_repository(fake=False, invert_fake=False, create=False, lock=True, excl
     :param manifest: load manifest and key, pass them as keyword arguments
     :param cache: open cache, pass it as keyword argument (implies manifest)
     :param secure: do assert_secure after loading manifest
+    :param compatibility: mandatory if not create and (manifest or cache), specifies mandatory feature categories to check
     """
+
+    if not create and (manifest or cache):
+        if compatibility is None:
+            raise AssertionError("with_repository decorator used without compatibility argument")
+        if type(compatibility) is not tuple:
+            raise AssertionError("with_repository decorator compatibility argument must be of type tuple")
+    else:
+        if compatibility is not None:
+            raise AssertionError("with_repository called with compatibility argument but would not check" + repr(compatibility))
+        if create:
+            compatibility = Manifest.NO_OPERATION_CHECK
+
     def decorator(method):
         @functools.wraps(method)
         def wrapper(self, args, **kwargs):
@@ -117,7 +131,7 @@ def with_repository(fake=False, invert_fake=False, create=False, lock=True, excl
                                         append_only=append_only)
             with repository:
                 if manifest or cache:
-                    kwargs['manifest'], kwargs['key'] = Manifest.load(repository)
+                    kwargs['manifest'], kwargs['key'] = Manifest.load(repository, compatibility)
                     if 'compression' in args:
                         kwargs['key'].compressor = args.compression.compressor
                     if secure:
@@ -276,7 +290,7 @@ class Archiver:
             return EXIT_WARNING
         return EXIT_SUCCESS
 
-    @with_repository()
+    @with_repository(compatibility=(Manifest.Operation.CHECK,))
     def do_change_passphrase(self, args, repository, manifest, key):
         """Change repository key file passphrase"""
         if not hasattr(key, 'change_passphrase'):
@@ -413,7 +427,7 @@ class Archiver:
             print(fmt % ('U', msg, total_size_MB / dt_update, count, file_size_formatted, content, dt_update))
             print(fmt % ('D', msg, total_size_MB / dt_delete, count, file_size_formatted, content, dt_delete))
 
-    @with_repository(fake='dry_run', exclusive=True)
+    @with_repository(fake='dry_run', exclusive=True, compatibility=(Manifest.Operation.WRITE,))
     def do_create(self, args, repository, manifest=None, key=None):
         """Create new archive"""
         matcher = PatternMatcher(fallback=True)
@@ -632,7 +646,7 @@ class Archiver:
                 return matched
         return item_filter
 
-    @with_repository()
+    @with_repository(compatibility=(Manifest.Operation.READ,))
     @with_archive
     def do_extract(self, args, repository, manifest, key, archive):
         """Extract archive contents"""
@@ -714,7 +728,7 @@ class Archiver:
             pi.finish()
         return self.exit_code
 
-    @with_repository()
+    @with_repository(compatibility=(Manifest.Operation.READ,))
     @with_archive
     def do_export_tar(self, args, repository, manifest, key, archive):
         """Export archive contents as a tarball"""
@@ -927,7 +941,7 @@ class Archiver:
             self.print_warning("Include pattern '%s' never matched.", pattern)
         return self.exit_code
 
-    @with_repository()
+    @with_repository(compatibility=(Manifest.Operation.READ,))
     @with_archive
     def do_diff(self, args, repository, manifest, key, archive):
         """Diff contents of two archives"""
@@ -1140,7 +1154,7 @@ class Archiver:
 
         return self.exit_code
 
-    @with_repository(exclusive=True, cache=True)
+    @with_repository(exclusive=True, cache=True, compatibility=(Manifest.Operation.CHECK,))
     @with_archive
     def do_rename(self, args, repository, manifest, key, cache, archive):
         """Rename an existing archive"""
@@ -1161,7 +1175,7 @@ class Archiver:
 
     def _delete_archives(self, args, repository):
         """Delete archives"""
-        manifest, key = Manifest.load(repository)
+        manifest, key = Manifest.load(repository, (Manifest.Operation.DELETE,))
 
         if args.location.archive:
             archive_names = (args.location.archive,)
@@ -1219,7 +1233,7 @@ class Archiver:
         if not args.cache_only:
             msg = []
             try:
-                manifest, key = Manifest.load(repository)
+                manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             except NoManifestError:
                 msg.append("You requested to completely DELETE the repository *including* all archives it may "
                            "contain.")
@@ -1258,7 +1272,7 @@ class Archiver:
 
         return self._do_mount(args)
 
-    @with_repository()
+    @with_repository(compatibility=(Manifest.Operation.READ,))
     def _do_mount(self, args, repository, manifest, key):
         from .fuse import FuseOperations
 
@@ -1276,7 +1290,7 @@ class Archiver:
         """un-mount the FUSE filesystem"""
         return umount(args.mountpoint)
 
-    @with_repository()
+    @with_repository(compatibility=(Manifest.Operation.READ,))
     def do_list(self, args, repository, manifest, key):
         """List archive or repository contents"""
         if not hasattr(sys.stdout, 'buffer'):
@@ -1348,7 +1362,7 @@ class Archiver:
 
         return self.exit_code
 
-    @with_repository(cache=True)
+    @with_repository(cache=True, compatibility=(Manifest.Operation.READ,))
     def do_info(self, args, repository, manifest, key, cache):
         """Show archive details such as disk space used"""
         if any((args.location.archive, args.first, args.last, args.prefix)):
@@ -1438,7 +1452,7 @@ class Archiver:
             print(str(cache))
         return self.exit_code
 
-    @with_repository(exclusive=True)
+    @with_repository(exclusive=True, compatibility=(Manifest.Operation.DELETE,))
     def do_prune(self, args, repository, manifest, key):
         """Prune repository archives according to specified rules"""
         if not any((args.secondly, args.minutely, args.hourly, args.daily,
@@ -1517,7 +1531,7 @@ class Archiver:
     def do_upgrade(self, args, repository, manifest=None, key=None):
         """upgrade a repository from a previous version"""
         if args.tam:
-            manifest, key = Manifest.load(repository, force_tam_not_required=args.force)
+            manifest, key = Manifest.load(repository, (Manifest.Operation.CHECK,), force_tam_not_required=args.force)
 
             if not hasattr(key, 'change_passphrase'):
                 print('This repository is not encrypted, cannot enable TAM.')
@@ -1542,7 +1556,7 @@ class Archiver:
                 open(tam_file, 'w').close()
                 print('Updated security database')
         elif args.disable_tam:
-            manifest, key = Manifest.load(repository, force_tam_not_required=True)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK, force_tam_not_required=True)
             if tam_required(repository):
                 os.unlink(tam_required_file(repository))
             if key.tam_required:
@@ -1570,7 +1584,7 @@ class Archiver:
                 print("warning: %s" % e)
         return self.exit_code
 
-    @with_repository(cache=True, exclusive=True)
+    @with_repository(cache=True, exclusive=True, compatibility=(Manifest.Operation.CHECK,))
     def do_recreate(self, args, repository, manifest, key, cache):
         """Re-create archives"""
         msg = ("recreate is an experimental feature.\n"
@@ -1647,7 +1661,7 @@ class Archiver:
         print('Process ID:', get_process_id())
         return EXIT_SUCCESS
 
-    @with_repository()
+    @with_repository(compatibility=Manifest.NO_OPERATION_CHECK)
     def do_debug_dump_archive_items(self, args, repository, manifest, key):
         """dump (decrypted, decompressed) archive items metadata (not: data)"""
         archive = Archive(repository, key, manifest, args.location.archive,
@@ -1661,7 +1675,7 @@ class Archiver:
         print('Done.')
         return EXIT_SUCCESS
 
-    @with_repository()
+    @with_repository(compatibility=Manifest.NO_OPERATION_CHECK)
     def do_debug_dump_archive(self, args, repository, manifest, key):
         """dump decoded archive metadata (not: data)"""
 
@@ -1714,7 +1728,7 @@ class Archiver:
                 output(fd)
         return EXIT_SUCCESS
 
-    @with_repository()
+    @with_repository(compatibility=Manifest.NO_OPERATION_CHECK)
     def do_debug_dump_manifest(self, args, repository, manifest, key):
         """dump decoded repository manifest"""
 
@@ -1729,7 +1743,7 @@ class Archiver:
                 json.dump(meta, fd, indent=4)
         return EXIT_SUCCESS
 
-    @with_repository()
+    @with_repository(compatibility=Manifest.NO_OPERATION_CHECK)
     def do_debug_dump_repo_objs(self, args, repository, manifest, key):
         """dump (decrypted, decompressed) repo objects"""
         marker = None
@@ -1803,7 +1817,7 @@ class Archiver:
         print('Done.')
         return EXIT_SUCCESS
 
-    @with_repository(manifest=False, exclusive=True, cache=True)
+    @with_repository(manifest=False, exclusive=True, cache=True, compatibility=Manifest.NO_OPERATION_CHECK)
     def do_debug_refcount_obj(self, args, repository, manifest, key, cache):
         """display refcounts for the objects with the given IDs"""
         for hex_id in args.ids:

+ 49 - 1
src/borg/cache.py

@@ -15,8 +15,9 @@ from .constants import CACHE_README
 from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
 from .helpers import Location
 from .helpers import Error
+from .helpers import Manifest
 from .helpers import get_cache_dir, get_security_dir
-from .helpers import int_to_bigint, bigint_to_int, bin_to_hex
+from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, parse_stringified_list
 from .helpers import format_file_size
 from .helpers import safe_ns
 from .helpers import yes, hostname_is_unique
@@ -257,6 +258,8 @@ class CacheConfig:
         self.manifest_id = unhexlify(self._config.get('cache', 'manifest'))
         self.timestamp = self._config.get('cache', 'timestamp', fallback=None)
         self.key_type = self._config.get('cache', 'key_type', fallback=None)
+        self.ignored_features = set(parse_stringified_list(self._config.get('cache', 'ignored_features', fallback='')))
+        self.mandatory_features = set(parse_stringified_list(self._config.get('cache', 'mandatory_features', fallback='')))
         try:
             self.integrity = dict(self._config.items('integrity'))
             if self._config.get('cache', 'manifest') != self.integrity.pop('manifest'):
@@ -281,6 +284,8 @@ class CacheConfig:
         if manifest:
             self._config.set('cache', 'manifest', manifest.id_str)
             self._config.set('cache', 'timestamp', manifest.timestamp)
+            self._config.set('cache', 'ignored_features', ','.join(self.ignored_features))
+            self._config.set('cache', 'mandatory_features', ','.join(self.mandatory_features))
             if not self._config.has_section('integrity'):
                 self._config.add_section('integrity')
             for file, integrity_data in self.integrity.items():
@@ -370,6 +375,12 @@ class Cache:
         self.open()
         try:
             self.security_manager.assert_secure(manifest, key, cache_config=self.cache_config)
+
+            if not self.check_cache_compatibility():
+                self.wipe_cache()
+
+            self.update_compatibility()
+
             if sync and self.manifest.id != self.cache_config.manifest_id:
                 self.sync()
                 self.commit()
@@ -718,6 +729,43 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             self.do_cache = os.path.isdir(archive_path)
             self.chunks = create_master_idx(self.chunks)
 
+    def check_cache_compatibility(self):
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        if self.cache_config.ignored_features & my_features:
+            # The cache might not contain references of chunks that need a feature that is mandatory for some operation
+            # and which this version supports. To avoid corruption while executing that operation force rebuild.
+            return False
+        if not self.cache_config.mandatory_features <= my_features:
+            # The cache was build with consideration to at least one feature that this version does not understand.
+            # This client might misinterpret the cache. Thus force a rebuild.
+            return False
+        return True
+
+    def wipe_cache(self):
+        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
+        archive_path = os.path.join(self.path, 'chunks.archive.d')
+        if os.path.isdir(archive_path):
+            shutil.rmtree(os.path.join(self.path, 'chunks.archive.d'))
+            os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
+        self.chunks = ChunkIndex()
+        with open(os.path.join(self.path, 'files'), 'wb'):
+            pass  # empty file
+        self.cache_config.manifest_id = ''
+        self.cache_config._config.set('cache', 'manifest', '')
+
+        self.cache_config.ignored_features = set()
+        self.cache_config.mandatory_features = set()
+
+    def update_compatibility(self):
+        operation_to_features_map = self.manifest.get_all_mandatory_features()
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        repo_features = set()
+        for operation, features in operation_to_features_map.items():
+            repo_features.update(features)
+
+        self.cache_config.ignored_features.update(repo_features - my_features)
+        self.cache_config.mandatory_features.update(repo_features & my_features)
+
     def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
         if not self.txn_active:
             self.begin_txn()

+ 66 - 2
src/borg/helpers.py

@@ -1,6 +1,7 @@
 import argparse
 import contextlib
 import collections
+import enum
 import grp
 import hashlib
 import logging
@@ -123,6 +124,10 @@ def check_python():
         raise PythonLibcTooOld
 
 
+class MandatoryFeatureUnsupported(Error):
+    """Unsupported repository feature(s) {}. A newer version of borg is required to access this repository."""
+
+
 def check_extension_modules():
     from . import platform, compress, item
     if hashindex.API_VERSION != '1.1_03':
@@ -222,6 +227,34 @@ class Archives(abc.MutableMapping):
 
 class Manifest:
 
+    @enum.unique
+    class Operation(enum.Enum):
+        # The comments here only roughly describe the scope of each feature. In the end, additions need to be
+        # based on potential problems older clients could produce when accessing newer repositories and the
+        # tradeofs of locking version out or still allowing access. As all older versions and their exact
+        # behaviours are known when introducing new features sometimes this might not match the general descriptions
+        # below.
+
+        # The READ operation describes which features are needed to safely list and extract the archives in the
+        # repository.
+        READ = 'read'
+        # The CHECK operation is for all operations that need either to understand every detail
+        # of the repository (for consistency checks and repairs) or are seldom used functions that just
+        # should use the most restrictive feature set because more fine grained compatibility tracking is
+        # not needed.
+        CHECK = 'check'
+        # The WRITE operation is for adding archives. Features here ensure that older clients don't add archives
+        # in an old format, or is used to lock out clients that for other reasons can no longer safely add new
+        # archives.
+        WRITE = 'write'
+        # The DELETE operation is for all operations (like archive deletion) that need a 100% correct reference
+        # count and the need to be able to find all (directly and indirectly) referenced chunks of a given archive.
+        DELETE = 'delete'
+
+    NO_OPERATION_CHECK = tuple()
+
+    SUPPORTED_REPO_FEATURES = frozenset([])
+
     MANIFEST_ID = b'\0' * 32
 
     def __init__(self, key, repository, item_keys=None):
@@ -242,7 +275,7 @@ class Manifest:
         return datetime.strptime(self.timestamp, "%Y-%m-%dT%H:%M:%S.%f")
 
     @classmethod
-    def load(cls, repository, key=None, force_tam_not_required=False):
+    def load(cls, repository, operations, key=None, force_tam_not_required=False):
         from .item import ManifestItem
         from .crypto.key import key_factory, tam_required_file, tam_required
         from .repository import Repository
@@ -257,7 +290,7 @@ class Manifest:
         manifest_dict, manifest.tam_verified = key.unpack_and_verify_manifest(data, force_tam_not_required=force_tam_not_required)
         m = ManifestItem(internal_dict=manifest_dict)
         manifest.id = key.id_hash(data)
-        if m.get('version') != 1:
+        if m.get('version') not in (1, 2):
             raise ValueError('Invalid manifest version')
         manifest.archives.set_raw_dict(m.archives)
         manifest.timestamp = m.get('timestamp')
@@ -275,8 +308,34 @@ class Manifest:
             if not manifest_required and security_required:
                 logger.debug('Manifest is TAM verified and says TAM is *not* required, updating security database...')
                 os.unlink(tam_required_file(repository))
+        manifest.check_repository_compatibility(operations)
         return manifest, key
 
+    def check_repository_compatibility(self, operations):
+        for operation in operations:
+            assert isinstance(operation, self.Operation)
+            feature_flags = self.config.get(b'feature_flags', None)
+            if feature_flags is None:
+                return
+            if operation.value.encode() not in feature_flags:
+                continue
+            requirements = feature_flags[operation.value.encode()]
+            if b'mandatory' in requirements:
+                unsupported = set(requirements[b'mandatory']) - self.SUPPORTED_REPO_FEATURES
+                if unsupported:
+                    raise MandatoryFeatureUnsupported([f.decode() for f in unsupported])
+
+    def get_all_mandatory_features(self):
+        result = {}
+        feature_flags = self.config.get(b'feature_flags', None)
+        if feature_flags is None:
+            return result
+
+        for operation, requirements in feature_flags.items():
+            if b'mandatory' in requirements:
+                result[operation.decode()] = set([feature.decode() for feature in requirements[b'mandatory']])
+        return result
+
     def write(self):
         from .item import ManifestItem
         if self.key.tam_required:
@@ -775,6 +834,11 @@ def bin_to_hex(binary):
     return hexlify(binary).decode('ascii')
 
 
+def parse_stringified_list(s):
+    l = re.split(" *, *", s)
+    return [item for item in l if item != '']
+
+
 class Location:
     """Object representing a repository / archive location
     """

+ 114 - 12
src/borg/testsuite/archiver.py

@@ -41,7 +41,7 @@ from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMReq
 from ..crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
 from ..crypto.file_integrity import FileIntegrityError
 from ..helpers import Location, get_security_dir
-from ..helpers import Manifest
+from ..helpers import Manifest, MandatoryFeatureUnsupported
 from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
 from ..helpers import bin_to_hex
 from ..helpers import MAX_S
@@ -289,7 +289,7 @@ class ArchiverTestCaseBase(BaseTestCase):
     def open_archive(self, name):
         repository = Repository(self.repository_path, exclusive=True)
         with repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             archive = Archive(repository, key, manifest, name)
         return archive, repository
 
@@ -1248,7 +1248,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('extract', '--dry-run', self.repository_location + '::test.4')
         # Make sure both archives have been renamed
         with Repository(self.repository_path) as repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         self.assert_equal(len(manifest.archives), 2)
         self.assert_in('test.3', manifest.archives)
         self.assert_in('test.4', manifest.archives)
@@ -1349,7 +1349,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('init', '--encryption=none', self.repository_location)
         self.create_src_archive('test')
         with Repository(self.repository_path, exclusive=True) as repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             archive = Archive(repository, key, manifest, 'test')
             for item in archive.iter_items():
                 if 'chunks' in item:
@@ -1367,7 +1367,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('init', '--encryption=none', self.repository_location)
         self.create_src_archive('test')
         with Repository(self.repository_path, exclusive=True) as repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             archive = Archive(repository, key, manifest, 'test')
             id = archive.metadata.items[0]
             repository.put(id, b'corrupted items metadata stream chunk')
@@ -1417,9 +1417,111 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('create', '--dry-run', self.repository_location + '::test', 'input')
         # Make sure no archive has been created
         with Repository(self.repository_path) as repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         self.assert_equal(len(manifest.archives), 0)
 
+    def add_unknown_feature(self, operation):
+        with Repository(self.repository_path, exclusive=True) as repository:
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
+            manifest.config[b'feature_flags'] = {operation.value.encode(): {b'mandatory': [b'unknown-feature']}}
+            manifest.write()
+            repository.commit()
+
+    def cmd_raises_unknown_feature(self, args):
+        if self.FORK_DEFAULT:
+            self.cmd(*args, exit_code=EXIT_ERROR)
+        else:
+            with pytest.raises(MandatoryFeatureUnsupported) as excinfo:
+                self.cmd(*args)
+            assert excinfo.value.args == (['unknown-feature'],)
+
+    def test_unknown_feature_on_create(self):
+        print(self.cmd('init', '--encryption=repokey', self.repository_location))
+        self.add_unknown_feature(Manifest.Operation.WRITE)
+        self.cmd_raises_unknown_feature(['create', self.repository_location + '::test', 'input'])
+
+    def test_unknown_feature_on_change_passphrase(self):
+        print(self.cmd('init', '--encryption=repokey', self.repository_location))
+        self.add_unknown_feature(Manifest.Operation.CHECK)
+        self.cmd_raises_unknown_feature(['change-passphrase', self.repository_location])
+
+    def test_unknown_feature_on_read(self):
+        print(self.cmd('init', '--encryption=repokey', self.repository_location))
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.add_unknown_feature(Manifest.Operation.READ)
+        with changedir('output'):
+            self.cmd_raises_unknown_feature(['extract', self.repository_location + '::test'])
+
+        self.cmd_raises_unknown_feature(['list', self.repository_location])
+        self.cmd_raises_unknown_feature(['info', self.repository_location + '::test'])
+
+    def test_unknown_feature_on_rename(self):
+        print(self.cmd('init', '--encryption=repokey', self.repository_location))
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.add_unknown_feature(Manifest.Operation.CHECK)
+        self.cmd_raises_unknown_feature(['rename', self.repository_location + '::test', 'other'])
+
+    def test_unknown_feature_on_delete(self):
+        print(self.cmd('init', '--encryption=repokey', self.repository_location))
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.add_unknown_feature(Manifest.Operation.DELETE)
+        # delete of an archive raises
+        self.cmd_raises_unknown_feature(['delete', self.repository_location + '::test'])
+        self.cmd_raises_unknown_feature(['prune', '--keep-daily=3', self.repository_location])
+        # delete of the whole repository ignores features
+        self.cmd('delete', self.repository_location)
+
+    @unittest.skipUnless(has_llfuse, 'llfuse not installed')
+    def test_unknown_feature_on_mount(self):
+        self.cmd('init', '--encryption=repokey', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.add_unknown_feature(Manifest.Operation.READ)
+        mountpoint = os.path.join(self.tmpdir, 'mountpoint')
+        os.mkdir(mountpoint)
+        # XXX this might hang if it doesn't raise an error
+        self.cmd_raises_unknown_feature(['mount', self.repository_location + '::test', mountpoint])
+
+    @pytest.mark.allow_cache_wipe
+    def test_unknown_mandatory_feature_in_cache(self):
+        if self.prefix:
+            path_prefix = 'ssh://__testsuite__'
+        else:
+            path_prefix = ''
+
+        print(self.cmd('init', '--encryption=repokey', self.repository_location))
+
+        with Repository(self.repository_path, exclusive=True) as repository:
+            if path_prefix:
+                repository._location = Location(self.repository_location)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
+            with Cache(repository, key, manifest) as cache:
+                cache.begin_txn()
+                cache.cache_config.mandatory_features = set(['unknown-feature'])
+                cache.commit()
+
+        if self.FORK_DEFAULT:
+            self.cmd('create', self.repository_location + '::test', 'input')
+        else:
+            called = False
+            wipe_cache_safe = Cache.wipe_cache
+
+            def wipe_wrapper(*args):
+                nonlocal called
+                called = True
+                wipe_cache_safe(*args)
+
+            with patch.object(Cache, 'wipe_cache', wipe_wrapper):
+                self.cmd('create', self.repository_location + '::test', 'input')
+
+            assert called
+
+        with Repository(self.repository_path, exclusive=True) as repository:
+            if path_prefix:
+                repository._location = Location(self.repository_location)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
+            with Cache(repository, key, manifest) as cache:
+                assert cache.cache_config.mandatory_features == set([])
+
     def test_progress_on(self):
         self.create_regular_file('file1', size=1024 * 80)
         self.cmd('init', '--encryption=repokey', self.repository_location)
@@ -2091,7 +2193,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('check', self.repository_location)
         # Then check that the cache on disk matches exactly what's in the repo.
         with self.open_repository() as repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             with Cache(repository, key, manifest, sync=False) as cache:
                 original_chunks = cache.chunks
             cache.destroy(repository)
@@ -2112,7 +2214,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('init', '--encryption=repokey', self.repository_location)
         self.cmd('create', self.repository_location + '::test', 'input')
         with self.open_repository() as repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             with Cache(repository, key, manifest, sync=False) as cache:
                 cache.begin_txn()
                 cache.chunks.incref(list(cache.chunks.iteritems())[0][0])
@@ -2750,7 +2852,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
 
         archive, repository = self.open_archive('archive1')
         with repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             with Cache(repository, key, manifest) as cache:
                 archive = Archive(repository, key, manifest, '0.13', cache=cache, create=True)
                 archive.items_buffer.add(Attic013Item)
@@ -2762,7 +2864,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
 class ManifestAuthenticationTest(ArchiverTestCaseBase):
     def spoof_manifest(self, repository):
         with repository:
-            _, key = Manifest.load(repository)
+            _, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             repository.put(Manifest.MANIFEST_ID, key.encrypt(msgpack.packb({
                 'version': 1,
                 'archives': {},
@@ -2775,7 +2877,7 @@ class ManifestAuthenticationTest(ArchiverTestCaseBase):
         self.cmd('init', '--encryption=repokey', self.repository_location)
         repository = Repository(self.repository_path, exclusive=True)
         with repository:
-            manifest, key = Manifest.load(repository)
+            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             repository.put(Manifest.MANIFEST_ID, key.encrypt(msgpack.packb({
                 'version': 1,
                 'archives': {},
@@ -2792,7 +2894,7 @@ class ManifestAuthenticationTest(ArchiverTestCaseBase):
         repository = Repository(self.repository_path, exclusive=True)
         with repository:
             shutil.rmtree(get_security_dir(bin_to_hex(repository.id)))
-            _, key = Manifest.load(repository)
+            _, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             key.tam_required = False
             key.change_passphrase(key._passphrase)