Browse Source

prune/delete --checkpoint-interval=1800 and ctrl-c/SIGINT support, fixes #6284

manifest, repo and cache are committed every checkpoint interval.

also, when ctrl-c is pressed, finish deleting the current archive, commit and then terminate.
Thomas Waldmann 2 years ago
parent
commit
f8e33cfdf1
2 changed files with 54 additions and 12 deletions
  1. 1 1
      setup.cfg
  2. 53 11
      src/borg/archiver.py

+ 1 - 1
setup.cfg

@@ -44,7 +44,7 @@ ignore = E226, W503
 per_file_ignores =
     docs/conf.py:E121,E126,E265,E305,E401,E402
     src/borg/archive.py:E122,E125,E127,E402,E501,F401,F405,W504
-    src/borg/archiver.py:E126,E127,E128,E501,E722,E731,E741,F401,F405,W504
+    src/borg/archiver.py:E125,E126,E127,E128,E501,E722,E731,E741,F401,F405,W504
     src/borg/cache.py:E127,E128,E402,E501,E722,W504
     src/borg/fuse.py:E402,E501,E722,W504
     src/borg/fuse_impl.py:F811

+ 53 - 11
src/borg/archiver.py

@@ -232,6 +232,7 @@ class Archiver:
         self.exit_code = EXIT_SUCCESS
         self.lock_wait = lock_wait
         self.prog = prog
+        self.last_checkpoint = time.monotonic()
 
     def print_error(self, msg, *args):
         msg = args and msg % args or msg
@@ -1158,6 +1159,20 @@ class Archiver:
         cache.commit()
         return self.exit_code
 
+    def maybe_checkpoint(self, *, checkpoint_func, checkpoint_interval):
+        checkpointed = False
+        sig_int_triggered = sig_int and sig_int.action_triggered()
+        if sig_int_triggered or checkpoint_interval and time.monotonic() - self.last_checkpoint > checkpoint_interval:
+            if sig_int_triggered:
+                logger.info('checkpoint requested: starting checkpoint creation...')
+            checkpoint_func()
+            checkpointed = True
+            self.last_checkpoint = time.monotonic()
+            if sig_int_triggered:
+                sig_int.action_completed()
+                logger.info('checkpoint requested: finished checkpoint creation!')
+        return checkpointed
+
     @with_repository(exclusive=True, manifest=False)
     def do_delete(self, args, repository):
         """Delete an existing repository or archives"""
@@ -1217,11 +1232,18 @@ class Archiver:
 
         stats = Statistics(iec=args.iec)
         with Cache(repository, key, manifest, progress=args.progress, lock_wait=self.lock_wait, iec=args.iec) as cache:
+            def checkpoint_func():
+                manifest.write()
+                repository.commit(compact=False, save_space=args.save_space)
+                cache.commit()
+
             msg_delete = 'Would delete archive: {} ({}/{})' if dry_run else 'Deleting archive: {} ({}/{})'
             msg_not_found = 'Archive {} not found ({}/{}).'
             logger_list = logging.getLogger('borg.output.list')
-            delete_count = 0
+            uncommitted_deletes = 0
             for i, archive_name in enumerate(archive_names, 1):
+                if sig_int and sig_int.action_done():
+                    break
                 try:
                     archive_info = manifest.archives[archive_name]
                 except KeyError:
@@ -1234,12 +1256,14 @@ class Archiver:
                         archive = Archive(repository, key, manifest, archive_name, cache=cache,
                                           consider_part_files=args.consider_part_files)
                         archive.delete(stats, progress=args.progress, forced=args.forced)
-                        delete_count += 1
-            if delete_count > 0:
-                # only write/commit if we actually changed something, see #6060.
-                manifest.write()
-                repository.commit(compact=False, save_space=args.save_space)
-                cache.commit()
+                        checkpointed = self.maybe_checkpoint(checkpoint_func=checkpoint_func,
+                                                             checkpoint_interval=args.checkpoint_interval)
+                        uncommitted_deletes = 0 if checkpointed else (uncommitted_deletes + 1)
+            if sig_int:
+                # Ctrl-C / SIGINT: do not checkpoint (commit) again, we already have a checkpoint in this case.
+                self.print_error("Got Ctrl-C / SIGINT.")
+            elif uncommitted_deletes > 0:
+                checkpoint_func()
             if args.stats:
                 log_multi(DASHES,
                           STATS_HEADER,
@@ -1541,12 +1565,20 @@ class Archiver:
         to_delete = (set(archives) | checkpoints) - (set(keep) | set(keep_checkpoints))
         stats = Statistics(iec=args.iec)
         with Cache(repository, key, manifest, lock_wait=self.lock_wait, iec=args.iec) as cache:
+            def checkpoint_func():
+                manifest.write()
+                repository.commit(compact=False, save_space=args.save_space)
+                cache.commit()
+
             list_logger = logging.getLogger('borg.output.list')
             # set up counters for the progress display
             to_delete_len = len(to_delete)
             archives_deleted = 0
+            uncommitted_deletes = 0
             pi = ProgressIndicatorPercent(total=len(to_delete), msg='Pruning archives %3.0f%%', msgid='prune')
             for archive in archives_checkpoints:
+                if sig_int and sig_int.action_done():
+                    break
                 if archive in to_delete:
                     pi.show()
                     if args.dry_run:
@@ -1557,6 +1589,9 @@ class Archiver:
                         archive = Archive(repository, key, manifest, archive.name, cache,
                                           consider_part_files=args.consider_part_files)
                         archive.delete(stats, forced=args.forced)
+                        checkpointed = self.maybe_checkpoint(checkpoint_func=checkpoint_func,
+                                                             checkpoint_interval=args.checkpoint_interval)
+                        uncommitted_deletes = 0 if checkpointed else (uncommitted_deletes + 1)
                 else:
                     if is_checkpoint(archive.name):
                         log_message = 'Keeping checkpoint archive:'
@@ -1569,10 +1604,11 @@ class Archiver:
                         message=log_message, archive=format_archive(archive)
                     ))
             pi.finish()
-            if to_delete and not args.dry_run:
-                manifest.write()
-                repository.commit(compact=False, save_space=args.save_space)
-                cache.commit()
+            if sig_int:
+                # Ctrl-C / SIGINT: do not checkpoint (commit) again, we already have a checkpoint in this case.
+                self.print_error("Got Ctrl-C / SIGINT.")
+            elif uncommitted_deletes > 0:
+                checkpoint_func()
             if args.stats:
                 log_multi(DASHES,
                           STATS_HEADER,
@@ -3820,6 +3856,9 @@ class Archiver:
                                help='keep the local security info when deleting a repository')
         subparser.add_argument('--save-space', dest='save_space', action='store_true',
                                help='work slower, but using less space')
+        subparser.add_argument('-c', '--checkpoint-interval', metavar='SECONDS', dest='checkpoint_interval',
+                                type=int, default=1800,
+                                help='write checkpoint every SECONDS seconds (Default: 1800)')
         subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='',
                                type=location_validator(),
                                help='repository or archive to delete')
@@ -4496,6 +4535,9 @@ class Archiver:
         define_archive_filters_group(subparser, sort_by=False, first_last=False)
         subparser.add_argument('--save-space', dest='save_space', action='store_true',
                                help='work slower, but using less space')
+        subparser.add_argument('-c', '--checkpoint-interval', metavar='SECONDS', dest='checkpoint_interval',
+                                   type=int, default=1800,
+                                   help='write checkpoint every SECONDS seconds (Default: 1800)')
         subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='',
                                type=location_validator(archive=False),
                                help='repository to prune')