
recreate: remove interruption blah, autocommit blah, resuming blah

Marian Beermann 8 years ago
commit 44935aa8ea
3 changed files with 29 additions and 257 deletions
  1. src/borg/archive.py: +9 -127
  2. src/borg/archiver.py: +20 -30
  3. src/borg/testsuite/archiver.py: +0 -100

+ 9 - 127
src/borg/archive.py

@@ -1371,9 +1371,6 @@ class ArchiveChecker:
 
 
 class ArchiveRecreater:
-    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
-    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""
-
     class FakeTargetArchive:
         def __init__(self):
             self.stats = Statistics()
@@ -1409,9 +1406,6 @@ class ArchiveRecreater:
                                                          compression_files or [])
         key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
 
-        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
-        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
-
         self.dry_run = dry_run
         self.stats = stats
         self.progress = progress
@@ -1423,20 +1417,17 @@ class ArchiveRecreater:
     def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
         archive = self.open_archive(archive_name)
-        target, resume_from = self.create_target_or_resume(archive, target_name)
+        target = self.create_target(archive, target_name)
         if self.exclude_if_present or self.exclude_caches:
             self.matcher_add_tagged_dirs(archive)
         if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
             logger.info("Skipping archive %s, nothing to do", archive_name)
             return True
-        try:
-            self.process_items(archive, target, resume_from)
-        except self.Interrupted as e:
-            return self.save(archive, target, completed=False, metadata=e.metadata)
+        self.process_items(archive, target)
         replace_original = target_name is None
         return self.save(archive, target, comment, replace_original=replace_original)
 
-    def process_items(self, archive, target, resume_from=None):
+    def process_items(self, archive, target):
         matcher = self.matcher
         target_is_subset = not matcher.empty()
         hardlink_masters = {} if target_is_subset else None
@@ -1450,15 +1441,8 @@ class ArchiveRecreater:
 
         for item in archive.iter_items():
             if item_is_hardlink_master(item):
-                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
                 hardlink_masters[item.path] = (item.get('chunks'), None)
                 continue
-            if resume_from:
-                # Fast forward to after the last processed file
-                if item.path == resume_from:
-                    logger.info('Fast-forwarded to %s', remove_surrogates(item.path))
-                    resume_from = None
-                continue
             if not matcher.match(item.path):
                 self.print_file_status('x', item.path)
                 continue
@@ -1476,12 +1460,7 @@ class ArchiveRecreater:
             if self.dry_run:
                 self.print_file_status('-', item.path)
             else:
-                try:
-                    self.process_item(archive, target, item)
-                except self.Interrupted:
-                    if self.progress:
-                        target.stats.show_progress(final=True)
-                    raise
+                self.process_item(archive, target, item)
         if self.progress:
             target.stats.show_progress(final=True)
 
@@ -1491,8 +1470,6 @@ class ArchiveRecreater:
             target.stats.nfiles += 1
         target.add_item(item)
         self.print_file_status(file_status(item.mode), item.path)
-        if self.interrupt:
-            raise self.Interrupted
 
     def process_chunks(self, archive, target, item):
         """Return new chunk ID list for 'item'."""
@@ -1500,9 +1477,8 @@ class ArchiveRecreater:
             for chunk_id, size, csize in item.chunks:
                 self.cache.chunk_incref(chunk_id, target.stats)
             return item.chunks
-        new_chunks = self.process_partial_chunks(target)
+        new_chunks = []
         chunk_iterator = self.create_chunk_iterator(archive, target, item)
-        consume(chunk_iterator, len(new_chunks))
         compress = self.compression_decider1.decide(item.path)
         for chunk in chunk_iterator:
             chunk.meta['compress'] = compress
@@ -1521,20 +1497,8 @@ class ArchiveRecreater:
                 chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
                 new_chunks.append((chunk_id, size, csize))
                 self.seen_chunks.add(chunk_id)
-                if self.recompress and self.cache.seen_chunk(chunk_id) == 1:
-                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
-                    # existing chunks.
-                    target.recreate_uncomitted_bytes += csize
-                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
-                        # Issue commits to limit additional space usage when recompressing chunks
-                        target.recreate_uncomitted_bytes = 0
-                        self.repository.commit()
             if self.progress:
                 target.stats.show_progress(item=item, dt=0.2)
-            if self.interrupt:
-                raise self.Interrupted({
-                    'recreate_partial_chunks': new_chunks,
-                })
         return new_chunks
 
     def create_chunk_iterator(self, archive, target, item):
@@ -1552,19 +1516,6 @@ class ArchiveRecreater:
             chunk_iterator = _chunk_iterator()
         return chunk_iterator
 
-    def process_partial_chunks(self, target):
-        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
-        if not target.recreate_partial_chunks:
-            return []
-        # No incref, create_target_or_resume already did that before to deleting the old target archive
-        # So just copy these over
-        partial_chunks = target.recreate_partial_chunks
-        target.recreate_partial_chunks = None
-        for chunk_id, size, csize in partial_chunks:
-            self.seen_chunks.add(chunk_id)
-        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
-        return partial_chunks
-
     def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True):
         """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
         if self.dry_run:
@@ -1631,84 +1582,15 @@ class ArchiveRecreater:
         matcher.add(tag_files, True)
         matcher.add(tagged_dirs, False)
 
-    def create_target_or_resume(self, archive, target_name=None):
-        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
+    def create_target(self, archive, target_name=None):
+        """Create target archive."""
         if self.dry_run:
             return self.FakeTargetArchive(), None
         target_name = target_name or archive.name + '.recreate'
-        resume = target_name in self.manifest.archives
-        target, resume_from = None, None
-        if resume:
-            target, resume_from = self.try_resume(archive, target_name)
-        if not target:
-            target = self.create_target_archive(target_name)
+        target = self.create_target_archive(target_name)
         # If the archives use the same chunker params, then don't rechunkify
         target.recreate_rechunkify = tuple(archive.metadata.get('chunker_params', [])) != self.chunker_params
-        return target, resume_from
-
-    def try_resume(self, archive, target_name):
-        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
-        logger.info('Found %s, will resume interrupted operation', target_name)
-        old_target = self.open_archive(target_name)
-        if not self.can_resume(archive, old_target, target_name):
-            return None, None
-        target = self.create_target_archive(target_name + '.temp')
-        logger.info('Replaying items from interrupted operation...')
-        last_old_item = self.copy_items(old_target, target)
-        resume_from = getattr(last_old_item, 'path', None)
-        self.incref_partial_chunks(old_target, target)
-        old_target.delete(Statistics(), progress=self.progress)
-        logger.info('Done replaying items')
-        return target, resume_from
-
-    def incref_partial_chunks(self, source_archive, target_archive):
-        target_archive.recreate_partial_chunks = source_archive.metadata.get('recreate_partial_chunks', [])
-        for chunk_id, size, csize in target_archive.recreate_partial_chunks:
-            if not self.cache.seen_chunk(chunk_id):
-                try:
-                    # Repository has __contains__, RemoteRepository doesn't
-                    # `chunk_id in repo` doesn't read the data though, so we try to use that if possible.
-                    get_or_in = getattr(self.repository, '__contains__', self.repository.get)
-                    if get_or_in(chunk_id) is False:
-                        raise Repository.ObjectNotFound(chunk_id, self.repository)
-                except Repository.ObjectNotFound:
-                    # delete/prune/check between invocations: these chunks are gone.
-                    target_archive.recreate_partial_chunks = None
-                    break
-                # fast-lane insert into chunks cache
-                self.cache.chunks[chunk_id] = (1, size, csize)
-                target_archive.stats.update(size, csize, True)
-                continue
-            # incref now, otherwise a source_archive.delete() might delete these chunks
-            self.cache.chunk_incref(chunk_id, target_archive.stats)
-
-    def copy_items(self, source_archive, target_archive):
-        item = None
-        for item in source_archive.iter_items():
-            if 'chunks' in item:
-                for chunk in item.chunks:
-                    self.cache.chunk_incref(chunk.id, target_archive.stats)
-                    target_archive.stats.nfiles += 1
-            target_archive.add_item(item)
-        if self.progress:
-            target_archive.stats.show_progress(final=True)
-        return item
-
-    def can_resume(self, archive, old_target, target_name):
-        resume_id = old_target.metadata.recreate_source_id
-        resume_args = [safe_decode(arg) for arg in old_target.metadata.recreate_args]
-        if resume_id != archive.id:
-            logger.warning('Source archive changed, will discard %s and start over', target_name)
-            logger.warning('Saved fingerprint:   %s', bin_to_hex(resume_id))
-            logger.warning('Current fingerprint: %s', archive.fpr)
-            old_target.delete(Statistics(), progress=self.progress)
-            return False
-        if resume_args != sys.argv[1:]:
-            logger.warning('Command line changed, this might lead to inconsistencies')
-            logger.warning('Saved:   %s', repr(resume_args))
-            logger.warning('Current: %s', repr(sys.argv[1:]))
-            # Just warn in this case, don't start over
-        return True
+        return target
 
     def create_target_archive(self, name):
         target = Archive(self.repository, self.key, self.manifest, name, create=True,

+ 20 - 30
src/borg/archiver.py

@@ -1058,13 +1058,6 @@ class Archiver:
     @with_repository(cache=True, exclusive=True)
     def do_recreate(self, args, repository, manifest, key, cache):
         """Re-create archives"""
-        def interrupt(signal_num, stack_frame):
-            if recreater.interrupt:
-                print("\nReceived signal, again. I'm not deaf.", file=sys.stderr)
-            else:
-                print("\nReceived signal, will exit cleanly.", file=sys.stderr)
-            recreater.interrupt = True
-
         msg = ("recreate is an experimental feature.\n"
                "Type 'YES' if you understand this and want to continue: ")
         if not yes(msg, false_msg="Aborting.", truish=('YES',),
@@ -1084,30 +1077,27 @@ class Archiver:
                                      file_status_printer=self.print_file_status,
                                      dry_run=args.dry_run)
 
-        with signal_handler(signal.SIGTERM, interrupt), \
-             signal_handler(signal.SIGINT, interrupt), \
-             signal_handler(signal.SIGHUP, interrupt):
-            if args.location.archive:
-                name = args.location.archive
+        if args.location.archive:
+            name = args.location.archive
+            if recreater.is_temporary_archive(name):
+                self.print_error('Refusing to work on temporary archive of prior recreate: %s', name)
+                return self.exit_code
+            recreater.recreate(name, args.comment, args.target)
+        else:
+            if args.target is not None:
+                self.print_error('--target: Need to specify single archive')
+                return self.exit_code
+            for archive in manifest.archives.list(sort_by=['ts']):
+                name = archive.name
                 if recreater.is_temporary_archive(name):
-                    self.print_error('Refusing to work on temporary archive of prior recreate: %s', name)
-                    return self.exit_code
-                recreater.recreate(name, args.comment, args.target)
-            else:
-                if args.target is not None:
-                    self.print_error('--target: Need to specify single archive')
-                    return self.exit_code
-                for archive in manifest.archives.list(sort_by=['ts']):
-                    name = archive.name
-                    if recreater.is_temporary_archive(name):
-                        continue
-                    print('Processing', name)
-                    if not recreater.recreate(name, args.comment):
-                        break
-            manifest.write()
-            repository.commit()
-            cache.commit()
-            return self.exit_code
+                    continue
+                print('Processing', name)
+                if not recreater.recreate(name, args.comment):
+                    break
+        manifest.write()
+        repository.commit()
+        cache.commit()
+        return self.exit_code
 
     @with_repository(manifest=False, exclusive=True)
     def do_with_lock(self, args, repository):

+ 0 - 100
src/borg/testsuite/archiver.py

@@ -1717,106 +1717,6 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         archives_after = self.cmd('list', self.repository_location + '::test')
         assert archives_after == archives_before
 
-    def _recreate_interrupt_patch(self, interrupt_after_n_1_files):
-        def interrupt(self, *args):
-            if interrupt_after_n_1_files:
-                self.interrupt = True
-                pi_save(self, *args)
-            else:
-                raise ArchiveRecreater.Interrupted
-
-        def process_item_patch(*args):
-            return pi_call.pop(0)(*args)
-
-        pi_save = ArchiveRecreater.process_item
-        pi_call = [pi_save] * interrupt_after_n_1_files + [interrupt]
-        return process_item_patch
-
-    def _test_recreate_interrupt(self, change_args, interrupt_early):
-        self.create_test_files()
-        self.create_regular_file('dir2/abcdef', size=1024 * 80)
-        self.cmd('init', self.repository_location)
-        self.cmd('create', self.repository_location + '::test', 'input')
-        process_files = 1
-        if interrupt_early:
-            process_files = 0
-        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(process_files)):
-            self.cmd('recreate', self.repository_location, 'input/dir2')
-        assert 'test.recreate' in self.cmd('list', self.repository_location)
-        if change_args:
-            with patch.object(sys, 'argv', sys.argv + ['non-forking tests don\'t use sys.argv']):
-                output = self.cmd('recreate', '-sv', '--list', '-pC', 'lz4', self.repository_location, 'input/dir2')
-        else:
-            output = self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2')
-        assert 'Found test.recreate, will resume' in output
-        assert change_args == ('Command line changed' in output)
-        if not interrupt_early:
-            assert 'Fast-forwarded to input/dir2/abcdef' in output
-            assert 'A input/dir2/abcdef' not in output
-        assert 'A input/dir2/file2' in output
-        archives = self.cmd('list', self.repository_location)
-        assert 'test.recreate' not in archives
-        assert 'test' in archives
-        files = self.cmd('list', self.repository_location + '::test')
-        assert 'dir2/file2' in files
-        assert 'dir2/abcdef' in files
-        assert 'file1' not in files
-
-    # The _test_create_interrupt requires a deterministic (alphabetic) order of the files to easily check if
-    # resumption works correctly. Patch scandir_inorder to work in alphabetic order.
-
-    def test_recreate_interrupt(self):
-        with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic):
-            self._test_recreate_interrupt(False, True)
-
-    def test_recreate_interrupt2(self):
-        with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic):
-            self._test_recreate_interrupt(True, False)
-
-    def _test_recreate_chunker_interrupt_patch(self):
-        real_add_chunk = Cache.add_chunk
-
-        def add_chunk(*args, **kwargs):
-            frame = inspect.stack()[2]
-            try:
-                caller_self = frame[0].f_locals['self']
-                if isinstance(caller_self, ArchiveRecreater):
-                    caller_self.interrupt = True
-            finally:
-                del frame
-            return real_add_chunk(*args, **kwargs)
-        return add_chunk
-
-    def test_recreate_rechunkify_interrupt(self):
-        self.create_regular_file('file1', size=1024 * 80)
-        self.cmd('init', self.repository_location)
-        self.cmd('create', self.repository_location + '::test', 'input')
-        archive_before = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
-        with patch.object(Cache, 'add_chunk', self._test_recreate_chunker_interrupt_patch()):
-            self.cmd('recreate', '-pv', '--chunker-params', '10,13,11,4095', self.repository_location)
-        assert 'test.recreate' in self.cmd('list', self.repository_location)
-        output = self.cmd('recreate', '-svp', '--debug', '--chunker-params', '10,13,11,4095', self.repository_location)
-        assert 'Found test.recreate, will resume' in output
-        assert 'Copied 1 chunks from a partially processed item' in output
-        archive_after = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
-        assert archive_after == archive_before
-
-    def test_recreate_changed_source(self):
-        self.create_test_files()
-        self.cmd('init', self.repository_location)
-        self.cmd('create', self.repository_location + '::test', 'input')
-        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(1)):
-            self.cmd('recreate', self.repository_location, 'input/dir2')
-        assert 'test.recreate' in self.cmd('list', self.repository_location)
-        self.cmd('delete', self.repository_location + '::test')
-        self.cmd('create', self.repository_location + '::test', 'input')
-        output = self.cmd('recreate', self.repository_location, 'input/dir2')
-        assert 'Source archive changed, will discard test.recreate and start over' in output
-
-    def test_recreate_refuses_temporary(self):
-        self.cmd('init', self.repository_location)
-        self.cmd('recreate', self.repository_location + '::cba.recreate', exit_code=2)
-
     def test_recreate_skips_nothing_to_do(self):
         self.create_regular_file('file1', size=1024 * 80)
         self.cmd('init', self.repository_location)