Bläddra i källkod

recreate: re-use existing checkpoint functionality

Marian Beermann 8 år sedan
förälder
incheckning
93b03ea231
2 ändrade filer med 80 tillägg och 89 borttagningar
  1. 75 88
      src/borg/archive.py
  2. 5 1
      src/borg/archiver.py

+ 75 - 88
src/borg/archive.py

@@ -6,6 +6,7 @@ import sys
 import time
 from contextlib import contextmanager
 from datetime import datetime, timezone
+from functools import partial
 from getpass import getuser
 from io import BytesIO
 from itertools import groupby
@@ -741,28 +742,32 @@ Number of files: {0.stats.nfiles}'''.format(
         self.add_item(item)
         return 's'  # symlink
 
-    def chunk_file(self, item, cache, stats, fd, fh=-1, **chunk_kw):
-        def write_part(item, from_chunk, number):
-            item = Item(internal_dict=item.as_dict())
-            length = len(item.chunks)
-            # the item should only have the *additional* chunks we processed after the last partial item:
-            item.chunks = item.chunks[from_chunk:]
-            item.path += '.borg_part_%d' % number
-            item.part = number
-            number += 1
-            self.add_item(item, show_progress=False)
-            self.write_checkpoint()
-            return length, number
+    def write_part_file(self, item, from_chunk, number):
+        item = Item(internal_dict=item.as_dict())
+        length = len(item.chunks)
+        # the item should only have the *additional* chunks we processed after the last partial item:
+        item.chunks = item.chunks[from_chunk:]
+        item.path += '.borg_part_%d' % number
+        item.part = number
+        number += 1
+        self.add_item(item, show_progress=False)
+        self.write_checkpoint()
+        return length, number
+
+    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
+        if not chunk_processor:
+            def chunk_processor(data):
+                return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)
 
         item.chunks = []
         from_chunk = 0
         part_number = 1
-        for data in backup_io_iter(self.chunker.chunkify(fd, fh)):
-            item.chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats))
+        for data in chunk_iter:
+            item.chunks.append(chunk_processor(data))
             if self.show_progress:
                 self.stats.show_progress(item=item, dt=0.2)
             if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
-                from_chunk, part_number = write_part(item, from_chunk, part_number)
+                from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                 self.last_checkpoint = time.time()
         else:
             if part_number > 1:
@@ -770,7 +775,7 @@ Number of files: {0.stats.nfiles}'''.format(
                     # if we already have created a part item inside this file, we want to put the final
                     # chunks (if any) into a part item also (so all parts can be concatenated to get
                     # the complete file):
-                    from_chunk, part_number = write_part(item, from_chunk, part_number)
+                    from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                     self.last_checkpoint = time.time()
 
                 # if we created part files, we have referenced all chunks from the part files,
@@ -789,7 +794,7 @@ Number of files: {0.stats.nfiles}'''.format(
             mtime=t, atime=t, ctime=t,
         )
         fd = sys.stdin.buffer  # binary
-        self.chunk_file(item, cache, self.stats, fd)
+        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
         self.stats.nfiles += 1
         self.add_item(item)
         return 'i'  # stdin
@@ -845,7 +850,7 @@ Number of files: {0.stats.nfiles}'''.format(
             with backup_io():
                 fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
-                self.chunk_file(item, cache, self.stats, fd, fh, compress=compress)
+                self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)), compress=compress)
             if not is_special_file:
                 # we must not memorize special files, because the contents of e.g. a
                 # block or char device will change without its mtime/size/inode changing.
@@ -1386,7 +1391,8 @@ class ArchiveRecreater:
     def __init__(self, repository, manifest, key, cache, matcher,
                  exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
                  chunker_params=None, compression=None, compression_files=None, always_recompress=False,
-                 dry_run=False, stats=False, progress=False, file_status_printer=None):
+                 dry_run=False, stats=False, progress=False, file_status_printer=None,
+                 checkpoint_interval=1800):
         self.repository = repository
         self.key = key
         self.manifest = manifest
@@ -1410,9 +1416,7 @@ class ArchiveRecreater:
         self.stats = stats
         self.progress = progress
         self.print_file_status = file_status_printer or (lambda *args: None)
-
-        self.interrupt = False
-        self.errors = False
+        self.checkpoint_interval = checkpoint_interval
 
     def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
@@ -1466,7 +1470,7 @@ class ArchiveRecreater:
 
     def process_item(self, archive, target, item):
         if 'chunks' in item:
-            item.chunks = self.process_chunks(archive, target, item)
+            self.process_chunks(archive, target, item)
             target.stats.nfiles += 1
         target.add_item(item)
         self.print_file_status(file_status(item.mode), item.path)
@@ -1477,77 +1481,62 @@ class ArchiveRecreater:
             for chunk_id, size, csize in item.chunks:
                 self.cache.chunk_incref(chunk_id, target.stats)
             return item.chunks
-        new_chunks = []
-        chunk_iterator = self.create_chunk_iterator(archive, target, item)
+        chunk_iterator = self.create_chunk_iterator(archive, target, list(item.chunks))
         compress = self.compression_decider1.decide(item.path)
-        for chunk in chunk_iterator:
-            chunk.meta['compress'] = compress
-            chunk_id = self.key.id_hash(chunk.data)
-            if chunk_id in self.seen_chunks:
-                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
-            else:
-                compression_spec, chunk = self.key.compression_decider2.decide(chunk)
-                overwrite = self.recompress
-                if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
-                    # Check if this chunk is already compressed the way we want it
-                    old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
-                    if Compressor.detect(old_chunk.data).name == compression_spec['name']:
-                        # Stored chunk has the same compression we wanted
-                        overwrite = False
-                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
-                new_chunks.append((chunk_id, size, csize))
-                self.seen_chunks.add(chunk_id)
-            if self.progress:
-                target.stats.show_progress(item=item, dt=0.2)
-        return new_chunks
-
-    def create_chunk_iterator(self, archive, target, item):
+        chunk_processor = partial(self.chunk_processor, target, compress)
+        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
+
+    def chunk_processor(self, target, compress, data):
+        chunk_id = self.key.id_hash(data)
+        if chunk_id in self.seen_chunks:
+            return self.cache.chunk_incref(chunk_id, target.stats)
+        chunk = Chunk(data, compress=compress)
+        compression_spec, chunk = self.key.compression_decider2.decide(chunk)
+        overwrite = self.recompress
+        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
+            # Check if this chunk is already compressed the way we want it
+            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
+            if Compressor.detect(old_chunk.data).name == compression_spec['name']:
+                # Stored chunk has the same compression we wanted
+                overwrite = False
+        chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
+        self.seen_chunks.add(chunk_id)
+        return chunk_id, size, csize
+
+    def create_chunk_iterator(self, archive, target, chunks):
         """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
-        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item.chunks])
+        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
         if target.recreate_rechunkify:
             # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
             # (does not load the entire file into memory)
             file = ChunkIteratorFileWrapper(chunk_iterator)
+            return target.chunker.chunkify(file)
+        else:
+            for chunk in chunk_iterator:
+                yield chunk.data
 
-            def _chunk_iterator():
-                for data in target.chunker.chunkify(file):
-                    yield Chunk(data)
-
-            chunk_iterator = _chunk_iterator()
-        return chunk_iterator
-
-    def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True):
+    def save(self, archive, target, comment=None, replace_original=True):
         """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
         if self.dry_run:
-            return completed
-        if completed:
-            timestamp = archive.ts.replace(tzinfo=None)
-            if comment is None:
-                comment = archive.metadata.get('comment', '')
-            target.save(timestamp=timestamp, comment=comment, additional_metadata={
-                'cmdline': archive.metadata.cmdline,
-                'recreate_cmdline': sys.argv,
-            })
-            if replace_original:
-                archive.delete(Statistics(), progress=self.progress)
-                target.rename(archive.name)
-            if self.stats:
-                target.end = datetime.utcnow()
-                log_multi(DASHES,
-                          str(target),
-                          DASHES,
-                          str(target.stats),
-                          str(self.cache),
-                          DASHES)
-        else:
-            additional_metadata = metadata or {}
-            additional_metadata.update({
-                'recreate_source_id': archive.id,
-                'recreate_args': sys.argv[1:],
-            })
-            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
-            logger.info('Run the same command again to resume.')
-        return completed
+            return
+        timestamp = archive.ts.replace(tzinfo=None)
+        if comment is None:
+            comment = archive.metadata.get('comment', '')
+        target.save(timestamp=timestamp, comment=comment, additional_metadata={
+            'cmdline': archive.metadata.cmdline,
+            'recreate_cmdline': sys.argv,
+        })
+        if replace_original:
+            archive.delete(Statistics(), progress=self.progress)
+            target.rename(archive.name)
+        if self.stats:
+            target.end = datetime.utcnow()
+            log_multi(DASHES,
+                      str(target),
+                      DASHES,
+                      str(target.stats),
+                      str(self.cache),
+                      DASHES)
 
     def matcher_add_tagged_dirs(self, archive):
         """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
@@ -1595,9 +1584,7 @@ class ArchiveRecreater:
     def create_target_archive(self, name):
         target = Archive(self.repository, self.key, self.manifest, name, create=True,
                           progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
-                          checkpoint_interval=0, compression=self.compression)
-        target.recreate_partial_chunks = None
-        target.recreate_uncomitted_bytes = 0
+                          checkpoint_interval=self.checkpoint_interval, compression=self.compression)
         return target
 
     def open_archive(self, name, **kwargs):

+ 5 - 1
src/borg/archiver.py

@@ -1075,6 +1075,7 @@ class Archiver:
                                      always_recompress=args.always_recompress,
                                      progress=args.progress, stats=args.stats,
                                      file_status_printer=self.print_file_status,
+                                     checkpoint_interval=args.checkpoint_interval,
                                      dry_run=args.dry_run)
 
         if args.location.archive:
@@ -2412,6 +2413,9 @@ class Archiver:
                                    type=archivename_validator(),
                                    help='create a new archive with the name ARCHIVE, do not replace existing archive '
                                         '(only applies for a single archive)')
+        archive_group.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval',
+                                   type=int, default=1800, metavar='SECONDS',
+                                   help='write checkpoint every SECONDS seconds (Default: 1800)')
         archive_group.add_argument('--comment', dest='comment', metavar='COMMENT', default=None,
                                    help='add a comment text to the archive')
         archive_group.add_argument('--timestamp', dest='timestamp',
@@ -2424,7 +2428,7 @@ class Archiver:
                                    help='select compression algorithm, see the output of the '
                                         '"borg help compression" command for details.')
         archive_group.add_argument('--always-recompress', dest='always_recompress', action='store_true',
-                                   help='always recompress chunks, don\'t skip chunks already compressed with the same'
+                                   help='always recompress chunks, don\'t skip chunks already compressed with the same '
                                         'algorithm.')
         archive_group.add_argument('--compression-from', dest='compression_files',
                                    type=argparse.FileType('r'), action='append',