Merge pull request #1862 from enkore/f/simple-recreate

The Defenestration of recreate's bloat
enkore, 8 years ago
commit abe5923866
3 changed files with 162 additions and 346 deletions
  1. src/borg/archive.py (+83 -214)
  2. src/borg/archiver.py (+25 -31)
  3. src/borg/testsuite/archiver.py (+54 -101)

src/borg/archive.py (+83 -214)

@@ -6,6 +6,7 @@ import sys
 import time
 from contextlib import contextmanager
 from datetime import datetime, timezone
+from functools import partial
 from getpass import getuser
 from io import BytesIO
 from itertools import groupby
@@ -741,28 +742,32 @@ Number of files: {0.stats.nfiles}'''.format(
         self.add_item(item)
         return 's'  # symlink
 
-    def chunk_file(self, item, cache, stats, fd, fh=-1, **chunk_kw):
-        def write_part(item, from_chunk, number):
-            item = Item(internal_dict=item.as_dict())
-            length = len(item.chunks)
-            # the item should only have the *additional* chunks we processed after the last partial item:
-            item.chunks = item.chunks[from_chunk:]
-            item.path += '.borg_part_%d' % number
-            item.part = number
-            number += 1
-            self.add_item(item, show_progress=False)
-            self.write_checkpoint()
-            return length, number
+    def write_part_file(self, item, from_chunk, number):
+        item = Item(internal_dict=item.as_dict())
+        length = len(item.chunks)
+        # the item should only have the *additional* chunks we processed after the last partial item:
+        item.chunks = item.chunks[from_chunk:]
+        item.path += '.borg_part_%d' % number
+        item.part = number
+        number += 1
+        self.add_item(item, show_progress=False)
+        self.write_checkpoint()
+        return length, number
+
+    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
+        if not chunk_processor:
+            def chunk_processor(data):
+                return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)
 
         item.chunks = []
         from_chunk = 0
         part_number = 1
-        for data in backup_io_iter(self.chunker.chunkify(fd, fh)):
-            item.chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats))
+        for data in chunk_iter:
+            item.chunks.append(chunk_processor(data))
             if self.show_progress:
                 self.stats.show_progress(item=item, dt=0.2)
             if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
-                from_chunk, part_number = write_part(item, from_chunk, part_number)
+                from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                 self.last_checkpoint = time.time()
         else:
             if part_number > 1:
@@ -770,7 +775,7 @@ Number of files: {0.stats.nfiles}'''.format(
                     # if we already have created a part item inside this file, we want to put the final
                     # chunks (if any) into a part item also (so all parts can be concatenated to get
                     # the complete file):
-                    from_chunk, part_number = write_part(item, from_chunk, part_number)
+                    from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                     self.last_checkpoint = time.time()
 
                 # if we created part files, we have referenced all chunks from the part files,
@@ -789,7 +794,7 @@ Number of files: {0.stats.nfiles}'''.format(
             mtime=t, atime=t, ctime=t,
         )
         fd = sys.stdin.buffer  # binary
-        self.chunk_file(item, cache, self.stats, fd)
+        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
         self.stats.nfiles += 1
         self.add_item(item)
         return 'i'  # stdin
@@ -845,7 +850,7 @@ Number of files: {0.stats.nfiles}'''.format(
             with backup_io():
                 fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
-                self.chunk_file(item, cache, self.stats, fd, fh, compress=compress)
+                self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)), compress=compress)
             if not is_special_file:
                 # we must not memorize special files, because the contents of e.g. a
                 # block or char device will change without its mtime/size/inode changing.
@@ -1371,9 +1376,6 @@ class ArchiveChecker:
 
 
 class ArchiveRecreater:
-    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
-    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""
-
     class FakeTargetArchive:
         def __init__(self):
             self.stats = Statistics()
@@ -1389,7 +1391,8 @@ class ArchiveRecreater:
     def __init__(self, repository, manifest, key, cache, matcher,
                  exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
                  chunker_params=None, compression=None, compression_files=None, always_recompress=False,
-                 dry_run=False, stats=False, progress=False, file_status_printer=None):
+                 dry_run=False, stats=False, progress=False, file_status_printer=None,
+                 checkpoint_interval=1800):
         self.repository = repository
         self.key = key
         self.manifest = manifest
@@ -1409,34 +1412,26 @@ class ArchiveRecreater:
                                                         compression_files or [])
         key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
 
-        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
-        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
-
         self.dry_run = dry_run
         self.stats = stats
         self.progress = progress
         self.print_file_status = file_status_printer or (lambda *args: None)
-
-        self.interrupt = False
-        self.errors = False
+        self.checkpoint_interval = checkpoint_interval
 
     def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
         archive = self.open_archive(archive_name)
-        target, resume_from = self.create_target_or_resume(archive, target_name)
+        target = self.create_target(archive, target_name)
         if self.exclude_if_present or self.exclude_caches:
             self.matcher_add_tagged_dirs(archive)
         if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
             logger.info("Skipping archive %s, nothing to do", archive_name)
             return True
-        try:
-            self.process_items(archive, target, resume_from)
-        except self.Interrupted as e:
-            return self.save(archive, target, completed=False, metadata=e.metadata)
+        self.process_items(archive, target)
         replace_original = target_name is None
         return self.save(archive, target, comment, replace_original=replace_original)
 
-    def process_items(self, archive, target, resume_from=None):
+    def process_items(self, archive, target):
         matcher = self.matcher
         target_is_subset = not matcher.empty()
         hardlink_masters = {} if target_is_subset else None
@@ -1450,15 +1445,8 @@ class ArchiveRecreater:
 
         for item in archive.iter_items():
             if item_is_hardlink_master(item):
-                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
                 hardlink_masters[item.path] = (item.get('chunks'), None)
                 continue
-            if resume_from:
-                # Fast forward to after the last processed file
-                if item.path == resume_from:
-                    logger.info('Fast-forwarded to %s', remove_surrogates(item.path))
-                    resume_from = None
-                continue
             if not matcher.match(item.path):
                 self.print_file_status('x', item.path)
                 continue
@@ -1476,23 +1464,16 @@ class ArchiveRecreater:
             if self.dry_run:
                 self.print_file_status('-', item.path)
             else:
-                try:
-                    self.process_item(archive, target, item)
-                except self.Interrupted:
-                    if self.progress:
-                        target.stats.show_progress(final=True)
-                    raise
+                self.process_item(archive, target, item)
         if self.progress:
             target.stats.show_progress(final=True)
 
     def process_item(self, archive, target, item):
         if 'chunks' in item:
-            item.chunks = self.process_chunks(archive, target, item)
+            self.process_chunks(archive, target, item)
             target.stats.nfiles += 1
         target.add_item(item)
         self.print_file_status(file_status(item.mode), item.path)
-        if self.interrupt:
-            raise self.Interrupted
 
     def process_chunks(self, archive, target, item):
         """Return new chunk ID list for 'item'."""
@@ -1500,103 +1481,62 @@ class ArchiveRecreater:
             for chunk_id, size, csize in item.chunks:
                 self.cache.chunk_incref(chunk_id, target.stats)
             return item.chunks
-        new_chunks = self.process_partial_chunks(target)
-        chunk_iterator = self.create_chunk_iterator(archive, target, item)
-        consume(chunk_iterator, len(new_chunks))
+        chunk_iterator = self.create_chunk_iterator(archive, target, list(item.chunks))
         compress = self.compression_decider1.decide(item.path)
-        for chunk in chunk_iterator:
-            chunk.meta['compress'] = compress
-            chunk_id = self.key.id_hash(chunk.data)
-            if chunk_id in self.seen_chunks:
-                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
-            else:
-                compression_spec, chunk = self.key.compression_decider2.decide(chunk)
-                overwrite = self.recompress
-                if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
-                    # Check if this chunk is already compressed the way we want it
-                    old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
-                    if Compressor.detect(old_chunk.data).name == compression_spec['name']:
-                        # Stored chunk has the same compression we wanted
-                        overwrite = False
-                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
-                new_chunks.append((chunk_id, size, csize))
-                self.seen_chunks.add(chunk_id)
-                if self.recompress and self.cache.seen_chunk(chunk_id) == 1:
-                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
-                    # existing chunks.
-                    target.recreate_uncomitted_bytes += csize
-                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
-                        # Issue commits to limit additional space usage when recompressing chunks
-                        target.recreate_uncomitted_bytes = 0
-                        self.repository.commit()
-            if self.progress:
-                target.stats.show_progress(item=item, dt=0.2)
-            if self.interrupt:
-                raise self.Interrupted({
-                    'recreate_partial_chunks': new_chunks,
-                })
-        return new_chunks
-
-    def create_chunk_iterator(self, archive, target, item):
+        chunk_processor = partial(self.chunk_processor, target, compress)
+        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
+
+    def chunk_processor(self, target, compress, data):
+        chunk_id = self.key.id_hash(data)
+        if chunk_id in self.seen_chunks:
+            return self.cache.chunk_incref(chunk_id, target.stats)
+        chunk = Chunk(data, compress=compress)
+        compression_spec, chunk = self.key.compression_decider2.decide(chunk)
+        overwrite = self.recompress
+        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
+            # Check if this chunk is already compressed the way we want it
+            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
+            if Compressor.detect(old_chunk.data).name == compression_spec['name']:
+                # Stored chunk has the same compression we wanted
+                overwrite = False
+        chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
+        self.seen_chunks.add(chunk_id)
+        return chunk_id, size, csize
+
+    def create_chunk_iterator(self, archive, target, chunks):
         """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
-        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item.chunks])
+        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
         if target.recreate_rechunkify:
             # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
             # (does not load the entire file into memory)
             file = ChunkIteratorFileWrapper(chunk_iterator)
+            return target.chunker.chunkify(file)
+        else:
+            for chunk in chunk_iterator:
+                yield chunk.data
 
-            def _chunk_iterator():
-                for data in target.chunker.chunkify(file):
-                    yield Chunk(data)
-
-            chunk_iterator = _chunk_iterator()
-        return chunk_iterator
-
-    def process_partial_chunks(self, target):
-        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
-        if not target.recreate_partial_chunks:
-            return []
-        # No incref, create_target_or_resume already did that before to deleting the old target archive
-        # So just copy these over
-        partial_chunks = target.recreate_partial_chunks
-        target.recreate_partial_chunks = None
-        for chunk_id, size, csize in partial_chunks:
-            self.seen_chunks.add(chunk_id)
-        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
-        return partial_chunks
-
-    def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True):
+    def save(self, archive, target, comment=None, replace_original=True):
         """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
         """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
         if self.dry_run:
         if self.dry_run:
-            return completed
-        if completed:
-            timestamp = archive.ts.replace(tzinfo=None)
-            if comment is None:
-                comment = archive.metadata.get('comment', '')
-            target.save(timestamp=timestamp, comment=comment, additional_metadata={
-                'cmdline': archive.metadata.cmdline,
-                'recreate_cmdline': sys.argv,
-            })
-            if replace_original:
-                archive.delete(Statistics(), progress=self.progress)
-                target.rename(archive.name)
-            if self.stats:
-                target.end = datetime.utcnow()
-                log_multi(DASHES,
-                          str(target),
-                          DASHES,
-                          str(target.stats),
-                          str(self.cache),
-                          DASHES)
-        else:
-            additional_metadata = metadata or {}
-            additional_metadata.update({
-                'recreate_source_id': archive.id,
-                'recreate_args': sys.argv[1:],
-            })
-            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
-            logger.info('Run the same command again to resume.')
-        return completed
+            return
+        timestamp = archive.ts.replace(tzinfo=None)
+        if comment is None:
+            comment = archive.metadata.get('comment', '')
+        target.save(timestamp=timestamp, comment=comment, additional_metadata={
+            'cmdline': archive.metadata.cmdline,
+            'recreate_cmdline': sys.argv,
+        })
+        if replace_original:
+            archive.delete(Statistics(), progress=self.progress)
+            target.rename(archive.name)
+        if self.stats:
+            target.end = datetime.utcnow()
+            log_multi(DASHES,
+                      str(target),
+                      DASHES,
+                      str(target.stats),
+                      str(self.cache),
+                      DASHES)
 
     def matcher_add_tagged_dirs(self, archive):
         """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
@@ -1631,91 +1571,20 @@ class ArchiveRecreater:
         matcher.add(tag_files, True)
         matcher.add(tagged_dirs, False)
 
-    def create_target_or_resume(self, archive, target_name=None):
-        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
+    def create_target(self, archive, target_name=None):
+        """Create target archive."""
         if self.dry_run:
             return self.FakeTargetArchive(), None
         target_name = target_name or archive.name + '.recreate'
-        resume = target_name in self.manifest.archives
-        target, resume_from = None, None
-        if resume:
-            target, resume_from = self.try_resume(archive, target_name)
-        if not target:
-            target = self.create_target_archive(target_name)
+        target = self.create_target_archive(target_name)
         # If the archives use the same chunker params, then don't rechunkify
         target.recreate_rechunkify = tuple(archive.metadata.get('chunker_params', [])) != self.chunker_params
-        return target, resume_from
-
-    def try_resume(self, archive, target_name):
-        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
-        logger.info('Found %s, will resume interrupted operation', target_name)
-        old_target = self.open_archive(target_name)
-        if not self.can_resume(archive, old_target, target_name):
-            return None, None
-        target = self.create_target_archive(target_name + '.temp')
-        logger.info('Replaying items from interrupted operation...')
-        last_old_item = self.copy_items(old_target, target)
-        resume_from = getattr(last_old_item, 'path', None)
-        self.incref_partial_chunks(old_target, target)
-        old_target.delete(Statistics(), progress=self.progress)
-        logger.info('Done replaying items')
-        return target, resume_from
-
-    def incref_partial_chunks(self, source_archive, target_archive):
-        target_archive.recreate_partial_chunks = source_archive.metadata.get('recreate_partial_chunks', [])
-        for chunk_id, size, csize in target_archive.recreate_partial_chunks:
-            if not self.cache.seen_chunk(chunk_id):
-                try:
-                    # Repository has __contains__, RemoteRepository doesn't
-                    # `chunk_id in repo` doesn't read the data though, so we try to use that if possible.
-                    get_or_in = getattr(self.repository, '__contains__', self.repository.get)
-                    if get_or_in(chunk_id) is False:
-                        raise Repository.ObjectNotFound(chunk_id, self.repository)
-                except Repository.ObjectNotFound:
-                    # delete/prune/check between invocations: these chunks are gone.
-                    target_archive.recreate_partial_chunks = None
-                    break
-                # fast-lane insert into chunks cache
-                self.cache.chunks[chunk_id] = (1, size, csize)
-                target_archive.stats.update(size, csize, True)
-                continue
-            # incref now, otherwise a source_archive.delete() might delete these chunks
-            self.cache.chunk_incref(chunk_id, target_archive.stats)
-
-    def copy_items(self, source_archive, target_archive):
-        item = None
-        for item in source_archive.iter_items():
-            if 'chunks' in item:
-                for chunk in item.chunks:
-                    self.cache.chunk_incref(chunk.id, target_archive.stats)
-                    target_archive.stats.nfiles += 1
-            target_archive.add_item(item)
-        if self.progress:
-            target_archive.stats.show_progress(final=True)
-        return item
-
-    def can_resume(self, archive, old_target, target_name):
-        resume_id = old_target.metadata.recreate_source_id
-        resume_args = [safe_decode(arg) for arg in old_target.metadata.recreate_args]
-        if resume_id != archive.id:
-            logger.warning('Source archive changed, will discard %s and start over', target_name)
-            logger.warning('Saved fingerprint:   %s', bin_to_hex(resume_id))
-            logger.warning('Current fingerprint: %s', archive.fpr)
-            old_target.delete(Statistics(), progress=self.progress)
-            return False
-        if resume_args != sys.argv[1:]:
-            logger.warning('Command line changed, this might lead to inconsistencies')
-            logger.warning('Saved:   %s', repr(resume_args))
-            logger.warning('Current: %s', repr(sys.argv[1:]))
-            # Just warn in this case, don't start over
-        return True
+        return target
 
     def create_target_archive(self, name):
         target = Archive(self.repository, self.key, self.manifest, name, create=True,
                           progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
-                          checkpoint_interval=0, compression=self.compression)
-        target.recreate_partial_chunks = None
-        target.recreate_uncomitted_bytes = 0
+                          checkpoint_interval=self.checkpoint_interval, compression=self.compression)
         return target
 
     def open_archive(self, name, **kwargs):

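A note on the archive.py changes above: chunk_file() no longer opens and chunks a file descriptor itself; it consumes any iterator of chunk data and pushes each chunk through an injectable chunk_processor callable (defaulting to cache.add_chunk), so the progress, checkpoint and part-file handling stays in one place and ArchiveRecreater can plug in its own processor via functools.partial. Below is a minimal sketch of that injection pattern; chunk_file, dedup_processor and the item dict here are simplified stand-ins for illustration, not Borg's real API:

    from functools import partial

    def chunk_file(item, chunk_iter, chunk_processor=None):
        # default processor: pretend to store every chunk (stand-in for cache.add_chunk)
        if chunk_processor is None:
            def chunk_processor(data):
                return ('stored', len(data))
        item['chunks'] = [chunk_processor(data) for data in chunk_iter]
        return item

    def dedup_processor(seen, data):
        # stand-in for ArchiveRecreater.chunk_processor: reuse chunks seen before
        if data in seen:
            return ('reused', len(data))
        seen.add(data)
        return ('stored', len(data))

    # create-style call: default processor
    print(chunk_file({'path': 'stdin'}, [b'abc', b'def']))
    # recreate-style call: bind extra arguments with functools.partial, as the diff does
    seen = set()
    print(chunk_file({'path': 'file1'}, [b'abc', b'abc'], partial(dedup_processor, seen)))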
src/borg/archiver.py (+25 -31)

@@ -1062,13 +1062,6 @@ class Archiver:
     @with_repository(cache=True, exclusive=True)
     def do_recreate(self, args, repository, manifest, key, cache):
         """Re-create archives"""
-        def interrupt(signal_num, stack_frame):
-            if recreater.interrupt:
-                print("\nReceived signal, again. I'm not deaf.", file=sys.stderr)
-            else:
-                print("\nReceived signal, will exit cleanly.", file=sys.stderr)
-            recreater.interrupt = True
-
         msg = ("recreate is an experimental feature.\n"
                "Type 'YES' if you understand this and want to continue: ")
         if not yes(msg, false_msg="Aborting.", truish=('YES',),
@@ -1086,32 +1079,30 @@ class Archiver:
                                      always_recompress=args.always_recompress,
                                      progress=args.progress, stats=args.stats,
                                      file_status_printer=self.print_file_status,
+                                     checkpoint_interval=args.checkpoint_interval,
                                      dry_run=args.dry_run)
 
-        with signal_handler(signal.SIGTERM, interrupt), \
-             signal_handler(signal.SIGINT, interrupt), \
-             signal_handler(signal.SIGHUP, interrupt):
-            if args.location.archive:
-                name = args.location.archive
+        if args.location.archive:
+            name = args.location.archive
+            if recreater.is_temporary_archive(name):
+                self.print_error('Refusing to work on temporary archive of prior recreate: %s', name)
+                return self.exit_code
+            recreater.recreate(name, args.comment, args.target)
+        else:
+            if args.target is not None:
+                self.print_error('--target: Need to specify single archive')
+                return self.exit_code
+            for archive in manifest.archives.list(sort_by=['ts']):
+                name = archive.name
                 if recreater.is_temporary_archive(name):
-                    self.print_error('Refusing to work on temporary archive of prior recreate: %s', name)
-                    return self.exit_code
-                recreater.recreate(name, args.comment, args.target)
-            else:
-                if args.target is not None:
-                    self.print_error('--target: Need to specify single archive')
-                    return self.exit_code
-                for archive in manifest.archives.list(sort_by=['ts']):
-                    name = archive.name
-                    if recreater.is_temporary_archive(name):
-                        continue
-                    print('Processing', name)
-                    if not recreater.recreate(name, args.comment):
-                        break
-            manifest.write()
-            repository.commit()
-            cache.commit()
-            return self.exit_code
+                    continue
+                print('Processing', name)
+                if not recreater.recreate(name, args.comment):
+                    break
+        manifest.write()
+        repository.commit()
+        cache.commit()
+        return self.exit_code
 
     @with_repository(manifest=False, exclusive=True)
     def do_with_lock(self, args, repository):
@@ -2441,6 +2432,9 @@ class Archiver:
                                    type=archivename_validator(),
                                    help='create a new archive with the name ARCHIVE, do not replace existing archive '
                                         '(only applies for a single archive)')
+        archive_group.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval',
+                                   type=int, default=1800, metavar='SECONDS',
+                                   help='write checkpoint every SECONDS seconds (Default: 1800)')
         archive_group.add_argument('--comment', dest='comment', metavar='COMMENT', default=None,
                                    help='add a comment text to the archive')
         archive_group.add_argument('--timestamp', dest='timestamp',
@@ -2453,7 +2447,7 @@ class Archiver:
                                    help='select compression algorithm, see the output of the '
                                         '"borg help compression" command for details.')
         archive_group.add_argument('--always-recompress', dest='always_recompress', action='store_true',
-                                   help='always recompress chunks, don\'t skip chunks already compressed with the same'
+                                   help='always recompress chunks, don\'t skip chunks already compressed with the same '
                                         'algorithm.')
         archive_group.add_argument('--compression-from', dest='compression_files',
                                    type=argparse.FileType('r'), action='append',

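A note on the archiver.py changes above: the signal-based interrupt/resume machinery is dropped, and borg recreate instead gains a regular -c / --checkpoint-interval option (default 1800 seconds) that is handed to ArchiveRecreater and on to the target Archive, so long-running recreates write periodic checkpoints the same way borg create does. A minimal sketch of the time-based checkpoint test used in chunk_file() follows; the loop, chunk list and write_checkpoint() are simplified stand-ins, not Borg's actual code:

    import time

    checkpoint_interval = 1800          # seconds, matching the new argparse default
    last_checkpoint = time.time()

    def write_checkpoint():
        print('checkpoint written')

    for data in [b'chunk1', b'chunk2', b'chunk3']:   # stand-in chunk iterator
        # ... process one chunk here ...
        if checkpoint_interval and time.time() - last_checkpoint > checkpoint_interval:
            write_checkpoint()
            last_checkpoint = time.time()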
src/borg/testsuite/archiver.py (+54 -101)

@@ -29,7 +29,7 @@ from ..archiver import Archiver
 from ..cache import Cache
 from ..constants import *  # NOQA
 from ..crypto import bytes_to_long, num_aes_blocks
-from ..helpers import PatternMatcher, parse_pattern
+from ..helpers import PatternMatcher, parse_pattern, Location
 from ..helpers import Chunk, Manifest
 from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
 from ..helpers import bin_to_hex
@@ -260,6 +260,9 @@ class ArchiverTestCaseBase(BaseTestCase):
             archive = Archive(repository, key, manifest, name)
         return archive, repository
 
+    def open_repository(self):
+        return Repository(self.repository_path, exclusive=True)
+
     def create_regular_file(self, name, size=0, contents=None):
         filename = os.path.join(self.input_path, name)
         if not os.path.exists(os.path.dirname(filename)):
@@ -1626,6 +1629,40 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.cmd('init', self.repository_location, exit_code=1)
         assert not os.path.exists(self.repository_location)
 
+    def check_cache(self):
+        # First run a regular borg check
+        self.cmd('check', self.repository_location)
+        # Then check that the cache on disk matches exactly what's in the repo.
+        with self.open_repository() as repository:
+            manifest, key = Manifest.load(repository)
+            with Cache(repository, key, manifest, sync=False) as cache:
+                original_chunks = cache.chunks
+            cache.destroy(repository)
+            with Cache(repository, key, manifest) as cache:
+                correct_chunks = cache.chunks
+        assert original_chunks is not correct_chunks
+        seen = set()
+        for id, (refcount, size, csize) in correct_chunks.iteritems():
+            o_refcount, o_size, o_csize = original_chunks[id]
+            assert refcount == o_refcount
+            assert size == o_size
+            assert csize == o_csize
+            seen.add(id)
+        for id, (refcount, size, csize) in original_chunks.iteritems():
+            assert id in seen
+
+    def test_check_cache(self):
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        with self.open_repository() as repository:
+            manifest, key = Manifest.load(repository)
+            with Cache(repository, key, manifest, sync=False) as cache:
+                cache.begin_txn()
+                cache.chunks.incref(list(cache.chunks.iteritems())[0][0])
+                cache.commit()
+        with pytest.raises(AssertionError):
+            self.check_cache()
+
     def test_recreate_target_rc(self):
         self.cmd('init', self.repository_location)
         output = self.cmd('recreate', self.repository_location, '--target=asdf', exit_code=2)
@@ -1634,10 +1671,13 @@ class ArchiverTestCase(ArchiverTestCaseBase):
     def test_recreate_target(self):
         self.create_test_files()
         self.cmd('init', self.repository_location)
+        self.check_cache()
         archive = self.repository_location + '::test0'
         self.cmd('create', archive, 'input')
+        self.check_cache()
         original_archive = self.cmd('list', self.repository_location)
         self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3', '--target=new-archive')
+        self.check_cache()
         archives = self.cmd('list', self.repository_location)
         assert original_archive in archives
         assert 'new-archive' in archives
@@ -1655,6 +1695,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         archive = self.repository_location + '::test0'
         self.cmd('create', archive, 'input')
         self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3')
+        self.check_cache()
         listing = self.cmd('list', '--short', archive)
         assert 'file1' not in listing
         assert 'dir2/file2' in listing
@@ -1666,6 +1707,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self._extract_hardlinks_setup()
         self.cmd('create', self.repository_location + '::test2', 'input')
         self.cmd('recreate', self.repository_location + '::test', 'input/dir1')
+        self.check_cache()
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
             assert os.stat('input/dir1/hardlink').st_nlink == 2
@@ -1689,6 +1731,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         # test1 and test2 do not deduplicate
         assert num_chunks == unique_chunks
         self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
+        self.check_cache()
         # test1 and test2 do deduplicate after recreate
         assert not int(self.cmd('list', self.repository_location + '::test1', 'input/large_file',
                                 '--format', '{unique_chunks}'))
@@ -1702,6 +1745,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         size, csize, sha256_before = file_list.split(' ')
         assert int(csize) >= int(size)  # >= due to metadata overhead
         self.cmd('recreate', self.repository_location, '-C', 'lz4')
+        self.check_cache()
         file_list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
                              '--format', '{size} {csize} {sha256}')
         size, csize, sha256_after = file_list.split(' ')
@@ -1714,115 +1758,17 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('create', self.repository_location + '::test', 'input')
         archives_before = self.cmd('list', self.repository_location + '::test')
         self.cmd('recreate', self.repository_location, '-n', '-e', 'input/compressible')
+        self.check_cache()
         archives_after = self.cmd('list', self.repository_location + '::test')
         assert archives_after == archives_before
 
-    def _recreate_interrupt_patch(self, interrupt_after_n_1_files):
-        def interrupt(self, *args):
-            if interrupt_after_n_1_files:
-                self.interrupt = True
-                pi_save(self, *args)
-            else:
-                raise ArchiveRecreater.Interrupted
-
-        def process_item_patch(*args):
-            return pi_call.pop(0)(*args)
-
-        pi_save = ArchiveRecreater.process_item
-        pi_call = [pi_save] * interrupt_after_n_1_files + [interrupt]
-        return process_item_patch
-
-    def _test_recreate_interrupt(self, change_args, interrupt_early):
-        self.create_test_files()
-        self.create_regular_file('dir2/abcdef', size=1024 * 80)
-        self.cmd('init', self.repository_location)
-        self.cmd('create', self.repository_location + '::test', 'input')
-        process_files = 1
-        if interrupt_early:
-            process_files = 0
-        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(process_files)):
-            self.cmd('recreate', self.repository_location, 'input/dir2')
-        assert 'test.recreate' in self.cmd('list', self.repository_location)
-        if change_args:
-            with patch.object(sys, 'argv', sys.argv + ['non-forking tests don\'t use sys.argv']):
-                output = self.cmd('recreate', '-sv', '--list', '-pC', 'lz4', self.repository_location, 'input/dir2')
-        else:
-            output = self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2')
-        assert 'Found test.recreate, will resume' in output
-        assert change_args == ('Command line changed' in output)
-        if not interrupt_early:
-            assert 'Fast-forwarded to input/dir2/abcdef' in output
-            assert 'A input/dir2/abcdef' not in output
-        assert 'A input/dir2/file2' in output
-        archives = self.cmd('list', self.repository_location)
-        assert 'test.recreate' not in archives
-        assert 'test' in archives
-        files = self.cmd('list', self.repository_location + '::test')
-        assert 'dir2/file2' in files
-        assert 'dir2/abcdef' in files
-        assert 'file1' not in files
-
-    # The _test_create_interrupt requires a deterministic (alphabetic) order of the files to easily check if
-    # resumption works correctly. Patch scandir_inorder to work in alphabetic order.
-
-    def test_recreate_interrupt(self):
-        with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic):
-            self._test_recreate_interrupt(False, True)
-
-    def test_recreate_interrupt2(self):
-        with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic):
-            self._test_recreate_interrupt(True, False)
-
-    def _test_recreate_chunker_interrupt_patch(self):
-        real_add_chunk = Cache.add_chunk
-
-        def add_chunk(*args, **kwargs):
-            frame = inspect.stack()[2]
-            try:
-                caller_self = frame[0].f_locals['self']
-                if isinstance(caller_self, ArchiveRecreater):
-                    caller_self.interrupt = True
-            finally:
-                del frame
-            return real_add_chunk(*args, **kwargs)
-        return add_chunk
-
-    def test_recreate_rechunkify_interrupt(self):
-        self.create_regular_file('file1', size=1024 * 80)
-        self.cmd('init', self.repository_location)
-        self.cmd('create', self.repository_location + '::test', 'input')
-        archive_before = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
-        with patch.object(Cache, 'add_chunk', self._test_recreate_chunker_interrupt_patch()):
-            self.cmd('recreate', '-pv', '--chunker-params', '10,13,11,4095', self.repository_location)
-        assert 'test.recreate' in self.cmd('list', self.repository_location)
-        output = self.cmd('recreate', '-svp', '--debug', '--chunker-params', '10,13,11,4095', self.repository_location)
-        assert 'Found test.recreate, will resume' in output
-        assert 'Copied 1 chunks from a partially processed item' in output
-        archive_after = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
-        assert archive_after == archive_before
-
-    def test_recreate_changed_source(self):
-        self.create_test_files()
-        self.cmd('init', self.repository_location)
-        self.cmd('create', self.repository_location + '::test', 'input')
-        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(1)):
-            self.cmd('recreate', self.repository_location, 'input/dir2')
-        assert 'test.recreate' in self.cmd('list', self.repository_location)
-        self.cmd('delete', self.repository_location + '::test')
-        self.cmd('create', self.repository_location + '::test', 'input')
-        output = self.cmd('recreate', self.repository_location, 'input/dir2')
-        assert 'Source archive changed, will discard test.recreate and start over' in output
-
-    def test_recreate_refuses_temporary(self):
-        self.cmd('init', self.repository_location)
-        self.cmd('recreate', self.repository_location + '::cba.recreate', exit_code=2)
-
     def test_recreate_skips_nothing_to_do(self):
         self.create_regular_file('file1', size=1024 * 80)
         self.cmd('init', self.repository_location)
         self.cmd('create', self.repository_location + '::test', 'input')
         info_before = self.cmd('info', self.repository_location + '::test')
         self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
+        self.check_cache()
         info_after = self.cmd('info', self.repository_location + '::test')
         assert info_before == info_after  # includes archive ID
 
@@ -1843,18 +1789,22 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('create', self.repository_location + '::test', 'input')
 
         output = self.cmd('recreate', '--list', '--info', self.repository_location + '::test', '-e', 'input/file2')
+        self.check_cache()
         self.assert_in("input/file1", output)
         self.assert_in("x input/file2", output)
 
         output = self.cmd('recreate', '--list', self.repository_location + '::test', '-e', 'input/file3')
+        self.check_cache()
         self.assert_in("input/file1", output)
         self.assert_in("x input/file3", output)
 
         output = self.cmd('recreate', self.repository_location + '::test', '-e', 'input/file4')
+        self.check_cache()
         self.assert_not_in("input/file1", output)
         self.assert_not_in("x input/file4", output)
 
         output = self.cmd('recreate', '--info', self.repository_location + '::test', '-e', 'input/file5')
+        self.check_cache()
         self.assert_not_in("input/file1", output)
         self.assert_not_in("x input/file5", output)
 
@@ -2195,6 +2145,9 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
 class RemoteArchiverTestCase(ArchiverTestCase):
     prefix = '__testsuite__:'
 
+    def open_repository(self):
+        return RemoteRepository(Location(self.repository_location))
+
     def test_remote_repo_restrict_to_path(self):
         # restricted to repo directory itself:
         with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', self.repository_path]):