Selaa lähdekoodia

Implement storage quotas

Marian Beermann 8 vuotta sitten
vanhempi
sitoutus
4edf77788d

+ 61 - 0
docs/internals/data-structures.rst

@@ -185,6 +185,67 @@ commit logic) showing the principal operation of compaction:
 (The actual algorithm is more complex to avoid various consistency issues, refer to
 the ``borg.repository`` module for more comments and documentation on these issues.)
 
+.. _internals_storage_quota:
+
+Storage quotas
+~~~~~~~~~~~~~~
+
+Quotas are implemented at the Repository level. The active quota of a repository
+is determined by the ``storage_quota`` ``config`` entry or a run-time override (via :ref:`borg_serve`).
+The currently used quota is stored in the hints file. Operations (PUT and DELETE) during
+a transaction modify the currently used quota:
+
+- A PUT adds the size of the *log entry* to the quota,
+  i.e. the length of the data plus the 41 byte header.
+- A DELETE subtracts the size of the deleted log entry from the quota,
+  which includes the header.
+
+Thus, PUT and DELETE are symmetric and cancel each other out precisely.
+
+The quota does not track on-disk size overheads (due to conditional compaction
+or append-only mode). In normal operation the inclusion of the log entry headers
+in the quota acts as a faithful proxy for index and hints overheads.
+
+By tracking effective content size, the client can *always* recover from a full quota
+by deleting archives. This would not be possible if the quota tracked on-disk size,
+since journaling DELETEs requires extra disk space before space is freed.
+Tracking effective size on the other hand accounts DELETEs immediately as freeing quota.
+
+.. rubric:: Enforcing the quota
+
+The storage quota is meant as a robust mechanism for service providers, therefore
+:ref:`borg_serve` has to enforce it without loopholes (e.g. modified clients).
+
+The quota is enforceable only if *all* :ref:`borg_serve` versions
+accessible to clients support quotas (see next section). Further, quota is
+per repository. Therefore, ensure clients can only access a defined set of repositories
+with their quotas set, using ``--restrict-to-path``.
+
+If the client exceeds the storage quota, the ``StorageQuotaExceeded`` exception is
+raised. Normally a client could ignore such an exception and just send a ``commit()``
+command anyway, circumventing the quota. However, when ``StorageQuotaExceeded`` is raised,
+it is stored in the ``transaction_doomed`` attribute of the repository.
+If the transaction is doomed, then commit will re-raise this exception, aborting the commit.
+
+The ``transaction_doomed`` indicator is reset on a rollback (which erases the quota-exceeding
+state).
+
+.. rubric:: Compatibility with older servers and enabling quota after-the-fact
+
+If no quota data is stored in the hints file, Borg assumes that the quota usage is zero.
+Thus, if a repository with an enabled quota is written to with an older version
+that does not understand quotas, then the quota usage will be erased.
+
+A similar situation arises when upgrading from a Borg release that did not have quotas.
+Borg will start tracking quota use from the time of the upgrade, starting at zero.
+
+If the quota must be enforced accurately in these cases, either
+
+- delete the ``index.N`` and ``hints.N`` files, forcing Borg to rebuild both,
+  re-acquiring quota data in the process, or
+- edit the msgpacked ``hints.N`` file (not recommended and thus not
+  documented further).
+
 .. _manifest:
 
 The manifest

+ 2 - 0
docs/usage/init.rst.inc

@@ -17,6 +17,8 @@ optional arguments
         | select encryption key mode **(required)**
     ``-a``, ``--append-only``
         | create an append-only mode repository
+    ``--storage-quota``
+        | Set storage quota of the new repository (e.g. 5G, 1.5T). Default: no quota.
 
 `Common options`_
     |

+ 2 - 0
docs/usage/serve.rst.inc

@@ -13,6 +13,8 @@ optional arguments
         | restrict repository access to PATH. Can be specified multiple times to allow the client access to several directories. Access to all sub-directories is granted implicitly; PATH doesn't need to directly point to a repository.
     ``--append-only``
         | only allow appending to repository segment files
+    ``--storage-quota``
+        | Override storage quota of the repository (e.g. 5G, 1.5T). When a new repository is initialized, sets the storage quota on the new repository as well. Default: no quota.
 
 `Common options`_
     |

+ 21 - 2
src/borg/archiver.py

@@ -46,7 +46,7 @@ from .helpers import Error, NoManifestError, set_ec
 from .helpers import location_validator, archivename_validator, ChunkerParams
 from .helpers import PrefixSpec, SortBySpec, HUMAN_SORT_KEYS
 from .helpers import BaseFormatter, ItemFormatter, ArchiveFormatter
-from .helpers import format_timedelta, format_file_size, format_archive
+from .helpers import format_timedelta, format_file_size, parse_file_size, format_archive
 from .helpers import safe_encode, remove_surrogates, bin_to_hex, prepare_dump_dict
 from .helpers import prune_within, prune_split
 from .helpers import timestamp
@@ -142,6 +142,13 @@ def with_archive(method):
     return wrapper
 
 
+def parse_storage_quota(storage_quota):
+    parsed = parse_file_size(storage_quota)
+    if parsed < parse_file_size('10M'):
+        raise argparse.ArgumentTypeError('quota is too small (%s). At least 10M are required.' % storage_quota)
+    return parsed
+
+
 class Archiver:
 
     def __init__(self, lock_wait=None, prog=None):
@@ -206,7 +213,11 @@ class Archiver:
 
     def do_serve(self, args):
         """Start in server mode. This command is usually not used manually."""
-        return RepositoryServer(restrict_to_paths=args.restrict_to_paths, append_only=args.append_only).serve()
+        return RepositoryServer(
+            restrict_to_paths=args.restrict_to_paths,
+            append_only=args.append_only,
+            storage_quota=args.storage_quota,
+        ).serve()
 
     @with_repository(create=True, exclusive=True, manifest=False)
     def do_init(self, args, repository):
@@ -2330,6 +2341,11 @@ class Archiver:
                                                     'Access to all sub-directories is granted implicitly; PATH doesn\'t need to directly point to a repository.')
         subparser.add_argument('--append-only', dest='append_only', action='store_true',
                                help='only allow appending to repository segment files')
+        subparser.add_argument('--storage-quota', dest='storage_quota', default=None,
+                               type=parse_storage_quota,
+                               help='Override storage quota of the repository (e.g. 5G, 1.5T). '
+                                    'When a new repository is initialized, sets the storage quota on the new '
+                                    'repository as well. Default: no quota.')
 
         init_epilog = process_epilog("""
         This command initializes an empty repository. A repository is a filesystem
@@ -2420,6 +2436,9 @@ class Archiver:
                                help='select encryption key mode **(required)**')
         subparser.add_argument('-a', '--append-only', dest='append_only', action='store_true',
                                help='create an append-only mode repository')
+        subparser.add_argument('--storage-quota', dest='storage_quota', default=None,
+                               type=parse_storage_quota,
+                               help='Set storage quota of the new repository (e.g. 5G, 1.5T). Default: no quota.')
 
         check_epilog = process_epilog("""
         The check command verifies the consistency of a repository and the corresponding archives.

+ 6 - 1
src/borg/remote.py

@@ -178,7 +178,7 @@ class RepositoryServer:  # pragma: no cover
         'inject_exception',
     )
 
-    def __init__(self, restrict_to_paths, append_only):
+    def __init__(self, restrict_to_paths, append_only, storage_quota):
         self.repository = None
         self.restrict_to_paths = restrict_to_paths
         # This flag is parsed from the serve command line via Archiver.do_serve,
@@ -186,6 +186,7 @@ class RepositoryServer:  # pragma: no cover
         # whatever the client wants, except when initializing a new repository
         # (see RepositoryServer.open below).
         self.append_only = append_only
+        self.storage_quota = storage_quota
         self.client_version = parse_version('1.0.8')  # fallback version if client is too old to send version information
 
     def positional_to_named(self, method, argv):
@@ -360,6 +361,7 @@ class RepositoryServer:  # pragma: no cover
         append_only = (not create and self.append_only) or append_only
         self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
                                      append_only=append_only,
+                                     storage_quota=self.storage_quota,
                                      exclusive=exclusive)
         self.repository.__enter__()  # clean exit handled by serve() method
         return self.repository.id
@@ -671,6 +673,9 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                     topic = 'borg.debug.' + topic
                 if 'repository' in topic:
                     opts.append('--debug-topic=%s' % topic)
+
+            if 'storage_quota' in args and args.storage_quota:
+                opts.append('--storage-quota=%s' % args.storage_quota)
         env_vars = []
         if not hostname_is_unique():
             env_vars.append('BORG_HOSTNAME_IS_UNIQUE=no')

+ 42 - 3
src/borg/repository.py

@@ -107,10 +107,14 @@ class Repository:
     class InsufficientFreeSpaceError(Error):
         """Insufficient free space to complete transaction (required: {}, available: {})."""
 
-    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False):
+    class StorageQuotaExceeded(Error):
+        """The storage quota ({}) has been exceeded ({}). Try deleting some archives."""
+
+    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True,
+                 append_only=False, storage_quota=None):
         self.path = os.path.abspath(path)
         self._location = Location('file://%s' % self.path)
-        self.io = None
+        self.io = None  # type: LoggedIO
         self.lock = None
         self.index = None
         # This is an index of shadowed log entries during this transaction. Consider the following sequence:
@@ -124,6 +128,9 @@ class Repository:
         self.created = False
         self.exclusive = exclusive
         self.append_only = append_only
+        self.storage_quota = storage_quota
+        self.storage_quota_use = 0
+        self.transaction_doomed = None
 
     def __del__(self):
         if self.lock:
@@ -209,6 +216,10 @@ class Repository:
         config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
         config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
         config.set('repository', 'append_only', str(int(self.append_only)))
+        if self.storage_quota:
+            config.set('repository', 'storage_quota', str(self.storage_quota))
+        else:
+            config.set('repository', 'storage_quota', '0')
         config.set('repository', 'additional_free_space', '0')
         config.set('repository', 'id', bin_to_hex(os.urandom(32)))
         self.save_config(path, config)
@@ -331,6 +342,9 @@ class Repository:
         # append_only can be set in the constructor
         # it shouldn't be overridden (True -> False) here
         self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
+        if self.storage_quota is None:
+            # self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
+            self.storage_quota = self.config.getint('repository', 'storage_quota', fallback=0)
         self.id = unhexlify(self.config.get('repository', 'id').strip())
         self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
 
@@ -346,7 +360,12 @@ class Repository:
         """Commit transaction
         """
         # save_space is not used anymore, but stays for RPC/API compatibility.
+        if self.transaction_doomed:
+            exception = self.transaction_doomed
+            self.rollback()
+            raise exception
         self.check_free_space()
+        self.log_storage_quota()
         self.io.write_commit()
         if not self.append_only:
             self.compact_segments()
@@ -398,6 +417,7 @@ class Repository:
         if transaction_id is None:
             self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
             self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
+            self.storage_quota_use = 0
             self.shadow_index.clear()
         else:
             if do_cleanup:
@@ -420,6 +440,7 @@ class Repository:
                 logger.debug('Upgrading from v1 hints.%d', transaction_id)
                 self.segments = hints[b'segments']
                 self.compact = FreeSpace()
+                self.storage_quota_use = 0
                 for segment in sorted(hints[b'compact']):
                     logger.debug('Rebuilding sparse info for segment %d', segment)
                     self._rebuild_sparse(segment)
@@ -429,6 +450,8 @@ class Repository:
             else:
                 self.segments = hints[b'segments']
                 self.compact = FreeSpace(hints[b'compact'])
+                self.storage_quota_use = hints.get(b'storage_quota_use', 0)
+            self.log_storage_quota()
             # Drop uncommitted segments in the shadow index
             for key, shadowed_segments in self.shadow_index.items():
                 for segment in list(shadowed_segments):
@@ -438,7 +461,8 @@ class Repository:
     def write_index(self):
         hints = {b'version': 2,
                  b'segments': self.segments,
-                 b'compact': self.compact}
+                 b'compact': self.compact,
+                 b'storage_quota_use': self.storage_quota_use, }
         transaction_id = self.io.get_segments_transaction_id()
         assert transaction_id is not None
         hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
@@ -515,6 +539,11 @@ class Repository:
             formatted_free = format_file_size(free_space)
             raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)
 
+    def log_storage_quota(self):
+        if self.storage_quota:
+            logger.info('Storage quota: %s out of %s used.',
+                        format_file_size(self.storage_quota_use), format_file_size(self.storage_quota))
+
     def compact_segments(self):
         """Compact sparse segments by copying data into new segments
         """
@@ -672,6 +701,7 @@ class Repository:
                     pass
                 self.index[key] = segment, offset
                 self.segments[segment] += 1
+                self.storage_quota_use += size
             elif tag == TAG_DELETE:
                 try:
                     # if the deleted PUT is not in the index, there is nothing to clean up
@@ -684,6 +714,7 @@ class Repository:
                         # is already gone, then it was already compacted.
                         self.segments[s] -= 1
                         size = self.io.read(s, offset, key, read_data=False)
+                        self.storage_quota_use -= size
                         self.compact[s] += size
             elif tag == TAG_COMMIT:
                 continue
@@ -821,6 +852,7 @@ class Repository:
             self.io.cleanup(self.io.get_segments_transaction_id())
         self.index = None
         self._active_txn = False
+        self.transaction_doomed = None
 
     def rollback(self):
         # note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time.
@@ -915,14 +947,20 @@ class Repository:
         else:
             self.segments[segment] -= 1
             size = self.io.read(segment, offset, id, read_data=False)
+            self.storage_quota_use -= size
             self.compact[segment] += size
             segment, size = self.io.write_delete(id)
             self.compact[segment] += size
             self.segments.setdefault(segment, 0)
         segment, offset = self.io.write_put(id, data)
+        self.storage_quota_use += len(data) + self.io.put_header_fmt.size
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
         self.index[id] = segment, offset
+        if self.storage_quota and self.storage_quota_use > self.storage_quota:
+            self.transaction_doomed = self.StorageQuotaExceeded(
+                format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
+            raise self.transaction_doomed
 
     def delete(self, id, wait=True):
         """delete a repo object
@@ -939,6 +977,7 @@ class Repository:
         self.shadow_index.setdefault(id, []).append(segment)
         self.segments[segment] -= 1
         size = self.io.read(segment, offset, id, read_data=False)
+        self.storage_quota_use -= size
         self.compact[segment] += size
         segment, size = self.io.write_delete(id)
         self.compact[segment] += size

+ 48 - 0
src/borg/testsuite/repository.py

@@ -415,6 +415,43 @@ class RepositoryFreeSpaceTestCase(RepositoryTestCaseBase):
         assert not os.path.exists(self.repository.path)
 
 
+class QuotaTestCase(RepositoryTestCaseBase):
+    def test_tracking(self):
+        assert self.repository.storage_quota_use == 0
+        self.repository.put(H(1), bytes(1234))
+        assert self.repository.storage_quota_use == 1234 + 41
+        self.repository.put(H(2), bytes(5678))
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41
+        self.repository.delete(H(1))
+        assert self.repository.storage_quota_use == 5678 + 41
+        self.repository.commit()
+        self.reopen()
+        with self.repository:
+            # Open new transaction; hints and thus quota data is not loaded unless needed.
+            self.repository.put(H(3), b'')
+            self.repository.delete(H(3))
+            assert self.repository.storage_quota_use == 5678 + 41
+
+    def test_exceed_quota(self):
+        assert self.repository.storage_quota_use == 0
+        self.repository.storage_quota = 50
+        self.repository.put(H(1), b'')
+        assert self.repository.storage_quota_use == 41
+        self.repository.commit()
+        with pytest.raises(Repository.StorageQuotaExceeded):
+            self.repository.put(H(2), b'')
+        assert self.repository.storage_quota_use == 82
+        with pytest.raises(Repository.StorageQuotaExceeded):
+            self.repository.commit()
+        assert self.repository.storage_quota_use == 82
+        self.reopen()
+        with self.repository:
+            self.repository.storage_quota = 50
+            # Open new transaction; hints and thus quota data is not loaded unless needed.
+            self.repository.put(H(1), b'')
+            assert self.repository.storage_quota_use == 41
+
+
 class NonceReservation(RepositoryTestCaseBase):
     def test_get_free_nonce_asserts(self):
         self.reopen(exclusive=False)
@@ -641,6 +678,7 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
 
 @pytest.mark.skipif(sys.platform == 'cygwin', reason='remote is broken on cygwin and hangs')
 class RemoteRepositoryTestCase(RepositoryTestCase):
+    repository = None  # type: RemoteRepository
 
     def open(self, create=False):
         return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
@@ -716,6 +754,10 @@ class RemoteRepositoryTestCase(RepositoryTestCase):
             umask = 0o077
             debug_topics = []
 
+            def __contains__(self, item):
+                # To behave like argparse.Namespace
+                return hasattr(self, item)
+
         assert self.repository.borg_cmd(None, testing=True) == [sys.executable, '-m', 'borg.archiver', 'serve']
         args = MockArgs()
         # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
@@ -727,6 +769,12 @@ class RemoteRepositoryTestCase(RepositoryTestCase):
         args.debug_topics = ['something_client_side', 'repository_compaction']
         assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info',
                                                                  '--debug-topic=borg.debug.repository_compaction']
+        args = MockArgs()
+        args.storage_quota = 0
+        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info']
+        args.storage_quota = 314159265
+        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info',
+                                                                 '--storage-quota=314159265']
 
 
 class RemoteLegacyFree(RepositoryTestCaseBase):