Browse Source

borg recreate: Re-create existing archives.

Use with caution: permanent data loss by specifying incorrect patterns
is easily possible. Make a dry run to make sure you got everything right.

borg recreate has many uses:
- Can selectively remove files/dirs from old archives, e.g. to free
  space or purging picturarum biggus dickus from history
- Recompress data
- Rechunkify data, to have upgraded Attic / Borg 0.xx archives deduplicate
  with Borg 1.x archives. (Or to experiment with chunker-params for
  specific use cases

It is interrupt- and resumable.

Chunks are not freed on-the-fly.
Rationale:
  Makes only sense when rechunkifying, but logic on which new chunks to
  free what input chunks is complicated and *very* delicate.

Future TODOs:
- Refactor tests using py.test fixtures
  -- would require porting ArchiverTestCase to py.test: many changes,
     this changeset is already borderline too large.
- Possibly add a --target option to not replace the source archive
  -- with the target possibly in another Repo
     (better than "cp" due to full integrity checking, and deduplication
      at the target)
- Detect and skip (unless --always-recompress) already recompressed chunks

Fixes #787 #686 #630 #70 (and probably some I overlooked)
Also see #757 and #770
Marian Beermann 9 years ago
parent
commit
a3ee9d2c5f
8 changed files with 918 additions and 31 deletions
  1. 319 6
      borg/archive.py
  2. 141 5
      borg/archiver.py
  3. 9 3
      borg/cache.py
  4. 79 2
      borg/helpers.py
  5. 240 14
      borg/testsuite/archiver.py
  6. 12 1
      borg/testsuite/helpers.py
  7. 20 0
      docs/usage.rst
  8. 98 0
      docs/usage/recreate.rst.inc

+ 319 - 6
borg/archive.py

@@ -16,10 +16,12 @@ import sys
 import time
 from io import BytesIO
 from . import xattr
+from .compress import Compressor, COMPR_BUFFER
 from .helpers import Error, uid2user, user2uid, gid2group, group2gid, \
     parse_timestamp, to_localtime, format_time, format_timedelta, \
     Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
-    ProgressIndicatorPercent
+    ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, DASHES, PatternMatcher, \
+    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume
 from .platform import acl_get, acl_set
 from .chunker import Chunker
 from .hashindex import ChunkIndex
@@ -231,7 +233,7 @@ Number of files: {0.stats.nfiles}'''.format(
         if self.show_progress:
             self.stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)
-        if time.time() - self.last_checkpoint > self.checkpoint_interval:
+        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
             self.write_checkpoint()
             self.last_checkpoint = time.time()
 
@@ -240,7 +242,7 @@ Number of files: {0.stats.nfiles}'''.format(
         del self.manifest.archives[self.checkpoint_name]
         self.cache.chunk_decref(self.id, self.stats)
 
-    def save(self, name=None, comment=None, timestamp=None):
+    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
         name = name or self.name
         if name in self.manifest.archives:
             raise self.AlreadyExists(name)
@@ -253,7 +255,7 @@ Number of files: {0.stats.nfiles}'''.format(
             self.end = timestamp
             start = timestamp
             end = timestamp  # we only have 1 value
-        metadata = StableDict({
+        metadata = {
             'version': 1,
             'name': name,
             'comment': comment,
@@ -264,8 +266,9 @@ Number of files: {0.stats.nfiles}'''.format(
             'time': start.isoformat(),
             'time_end': end.isoformat(),
             'chunker_params': self.chunker_params,
-        })
-        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
+        }
+        metadata.update(additional_metadata or {})
+        data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
         self.id = self.key.id_hash(data)
         self.cache.add_chunk(self.id, data, self.stats)
         self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
@@ -456,6 +459,7 @@ Number of files: {0.stats.nfiles}'''.format(
         self.cache.add_chunk(new_id, data, self.stats)
         self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
         self.cache.chunk_decref(self.id, self.stats)
+        self.id = new_id
 
     def rename(self, name):
         if name in self.manifest.archives:
@@ -923,3 +927,312 @@ class ArchiveChecker:
         if self.repair:
             self.manifest.write()
             self.repository.commit(save_space=save_space)
+
+
+class ArchiveRecreater:
+    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
+    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""
+
+    class FakeTargetArchive:
+        def __init__(self):
+            self.stats = Statistics()
+
+    class Interrupted(Exception):
+        def __init__(self, metadata=None):
+            self.metadata = metadata or {}
+
+    @staticmethod
+    def is_temporary_archive(archive_name):
+        return archive_name.endswith('.recreate')
+
+    def __init__(self, repository, manifest, key, cache, matcher,
+                 exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
+                 chunker_params=None, compression=None,
+                 dry_run=False, stats=False, progress=False, file_status_printer=None):
+        self.repository = repository
+        self.key = key
+        self.manifest = manifest
+        self.cache = cache
+
+        self.matcher = matcher
+        self.exclude_caches = exclude_caches
+        self.exclude_if_present = exclude_if_present or []
+        self.keep_tag_files = keep_tag_files
+
+        self.chunker_params = chunker_params or CHUNKER_PARAMS
+        self.compression = compression or dict(name='none')
+        self.seen_chunks = set()
+        self.recompress = bool(compression)
+        compr_args = dict(buffer=COMPR_BUFFER)
+        compr_args.update(self.compression)
+        key.compressor = Compressor(**compr_args)
+
+        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
+        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
+
+        self.dry_run = dry_run
+        self.stats = stats
+        self.progress = progress
+        self.print_file_status = file_status_printer or (lambda *args: None)
+
+        self.interrupt = False
+        self.errors = False
+
+    def recreate(self, archive_name):
+        assert not self.is_temporary_archive(archive_name)
+        archive = self.open_archive(archive_name)
+        target, resume_from = self.create_target_or_resume(archive)
+        if self.exclude_if_present or self.exclude_caches:
+            self.matcher_add_tagged_dirs(archive)
+        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify:
+            logger.info("Skipping archive %s, nothing to do", archive_name)
+            return True
+        try:
+            self.process_items(archive, target, resume_from)
+        except self.Interrupted as e:
+            return self.save(archive, target, completed=False, metadata=e.metadata)
+        return self.save(archive, target)
+
+    def process_items(self, archive, target, resume_from=None):
+        matcher = self.matcher
+        target_is_subset = not matcher.empty()
+        hardlink_masters = {} if target_is_subset else None
+
+        def item_is_hardlink_master(item):
+            return (target_is_subset and
+                    stat.S_ISREG(item[b'mode']) and
+                    item.get(b'hardlink_master', True) and
+                    b'source' not in item and
+                    not matcher.match(item[b'path']))
+
+        for item in archive.iter_items():
+            if item_is_hardlink_master(item):
+                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
+                hardlink_masters[item[b'path']] = (item.get(b'chunks'), None)
+                continue
+            if resume_from:
+                # Fast forward to after the last processed file
+                if item[b'path'] == resume_from:
+                    logger.info('Fast-forwarded to %s', remove_surrogates(item[b'path']))
+                    resume_from = None
+                continue
+            if not matcher.match(item[b'path']):
+                self.print_file_status('x', item[b'path'])
+                continue
+            if target_is_subset and stat.S_ISREG(item[b'mode']) and item.get(b'source') in hardlink_masters:
+                # master of this hard link is outside the target subset
+                chunks, new_source = hardlink_masters[item[b'source']]
+                if new_source is None:
+                    # First item to use this master, move the chunks
+                    item[b'chunks'] = chunks
+                    hardlink_masters[item[b'source']] = (None, item[b'path'])
+                    del item[b'source']
+                else:
+                    # Master was already moved, only update this item's source
+                    item[b'source'] = new_source
+            if self.dry_run:
+                self.print_file_status('-', item[b'path'])
+            else:
+                try:
+                    self.process_item(archive, target, item)
+                except self.Interrupted:
+                    if self.progress:
+                        target.stats.show_progress(final=True)
+                    raise
+        if self.progress:
+            target.stats.show_progress(final=True)
+
+    def process_item(self, archive, target, item):
+        if b'chunks' in item:
+            item[b'chunks'] = self.process_chunks(archive, target, item)
+            target.stats.nfiles += 1
+        target.add_item(item)
+        self.print_file_status(file_status(item[b'mode']), item[b'path'])
+        if self.interrupt:
+            raise self.Interrupted
+
+    def process_chunks(self, archive, target, item):
+        """Return new chunk ID list for 'item'."""
+        if not self.recompress and not target.recreate_rechunkify:
+            for chunk_id, size, csize in item[b'chunks']:
+                self.cache.chunk_incref(chunk_id, target.stats)
+            return item[b'chunks']
+        new_chunks = self.process_partial_chunks(target)
+        chunk_iterator = self.create_chunk_iterator(archive, target, item)
+        consume(chunk_iterator, len(new_chunks))
+        for chunk in chunk_iterator:
+            chunk_id = self.key.id_hash(chunk)
+            if chunk_id in self.seen_chunks:
+                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
+            else:
+                # TODO: detect / skip / --always-recompress
+                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
+                new_chunks.append((chunk_id, size, csize))
+                self.seen_chunks.add(chunk_id)
+                if self.recompress:
+                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
+                    # existing chunks.
+                    target.recreate_uncomitted_bytes += csize
+                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
+                        # Issue commits to limit additional space usage when recompressing chunks
+                        target.recreate_uncomitted_bytes = 0
+                        self.repository.commit()
+            if self.progress:
+                target.stats.show_progress(item=item, dt=0.2)
+            if self.interrupt:
+                raise self.Interrupted({
+                    'recreate_partial_chunks': new_chunks,
+                })
+        return new_chunks
+
+    def create_chunk_iterator(self, archive, target, item):
+        """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
+        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item[b'chunks']])
+        if target.recreate_rechunkify:
+            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
+            # (does not load the entire file into memory)
+            file = ChunkIteratorFileWrapper(chunk_iterator)
+            chunk_iterator = target.chunker.chunkify(file)
+        return chunk_iterator
+
+    def process_partial_chunks(self, target):
+        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
+        if not target.recreate_partial_chunks:
+            return []
+        # No incref, create_target_or_resume already did that before to deleting the old target archive
+        # So just copy these over
+        partial_chunks = target.recreate_partial_chunks
+        target.recreate_partial_chunks = None
+        for chunk_id, size, csize in partial_chunks:
+            self.seen_chunks.add(chunk_id)
+        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
+        return partial_chunks
+
+    def save(self, archive, target, completed=True, metadata=None):
+        """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
+        if self.dry_run:
+            return completed
+        if completed:
+            timestamp = archive.ts.replace(tzinfo=None)
+            target.save(timestamp=timestamp, additional_metadata={
+                'cmdline': archive.metadata[b'cmdline'],
+                'recreate_cmdline': sys.argv,
+            })
+            archive.delete(Statistics(), progress=self.progress)
+            target.rename(archive.name)
+            if self.stats:
+                target.end = datetime.utcnow()
+                log_multi(DASHES,
+                          str(target),
+                          DASHES,
+                          str(target.stats),
+                          str(self.cache),
+                          DASHES)
+        else:
+            additional_metadata = metadata or {}
+            additional_metadata.update({
+                'recreate_source_id': archive.id,
+                'recreate_args': sys.argv[1:],
+            })
+            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
+            logger.info('Run the same command again to resume.')
+        return completed
+
+    def matcher_add_tagged_dirs(self, archive):
+        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
+        def exclude(dir, tag_item):
+            if self.keep_tag_files:
+                tag_files.append(PathPrefixPattern(tag_item[b'path']))
+                tagged_dirs.append(FnmatchPattern(dir + '/'))
+            else:
+                tagged_dirs.append(PathPrefixPattern(dir))
+
+        matcher = self.matcher
+        tag_files = []
+        tagged_dirs = []
+        # build hardlink masters, but only for paths ending in CACHEDIR.TAG, so we can read hard-linked CACHEDIR.TAGs
+        cachedir_masters = {}
+
+        for item in archive.iter_items(
+                filter=lambda item: item[b'path'].endswith('CACHEDIR.TAG') or matcher.match(item[b'path'])):
+            if item[b'path'].endswith('CACHEDIR.TAG'):
+                cachedir_masters[item[b'path']] = item
+            if stat.S_ISREG(item[b'mode']):
+                dir, tag_file = os.path.split(item[b'path'])
+                if tag_file in self.exclude_if_present:
+                    exclude(dir, item)
+                if self.exclude_caches and tag_file == 'CACHEDIR.TAG':
+                    tag_contents = b'Signature: 8a477f597d28d172789f06886806bc55'
+                    if b'chunks' in item:
+                        file = open_item(archive, item)
+                    else:
+                        file = open_item(archive, cachedir_masters[item[b'source']])
+                    if file.read(len(tag_contents)).startswith(tag_contents):
+                        exclude(dir, item)
+        matcher.add(tag_files, True)
+        matcher.add(tagged_dirs, False)
+
+    def create_target_or_resume(self, archive):
+        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
+        if self.dry_run:
+            return self.FakeTargetArchive(), None
+        target_name = archive.name + '.recreate'
+        resume = target_name in self.manifest.archives
+        target, resume_from = None, None
+        if resume:
+            target, resume_from = self.try_resume(archive, target_name)
+        if not target:
+            target = self.create_target_archive(target_name)
+        # If the archives use the same chunker params, then don't rechunkify
+        target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
+        return target, resume_from
+
+    def try_resume(self, archive, target_name):
+        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
+        logger.info('Found %s, will resume interrupted operation', target_name)
+        old_target = self.open_archive(target_name)
+        resume_id = old_target.metadata[b'recreate_source_id']
+        resume_args = [arg.decode('utf-8', 'surrogateescape') for arg in old_target.metadata[b'recreate_args']]
+        if resume_id != archive.id:
+            logger.warning('Source archive changed, will discard %s and start over', target_name)
+            logger.warning('Saved fingerprint:   %s', hexlify(resume_id).decode('ascii'))
+            logger.warning('Current fingerprint: %s', archive.fpr)
+            old_target.delete(Statistics(), progress=self.progress)
+            return None, None  # can't resume
+        if resume_args != sys.argv[1:]:
+            logger.warning('Command line changed, this might lead to inconsistencies')
+            logger.warning('Saved:   %s', repr(resume_args))
+            logger.warning('Current: %s', repr(sys.argv[1:]))
+        target = self.create_target_archive(target_name + '.temp')
+        logger.info('Replaying items from interrupted operation...')
+        item = None
+        for item in old_target.iter_items():
+            if b'chunks' in item:
+                for chunk in item[b'chunks']:
+                    self.cache.chunk_incref(chunk[0], target.stats)
+                target.stats.nfiles += 1
+            target.add_item(item)
+        if item:
+            resume_from = item[b'path']
+        else:
+            resume_from = None
+        if self.progress:
+            old_target.stats.show_progress(final=True)
+        target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
+        for chunk_id, _, _ in target.recreate_partial_chunks:
+            # incref now, otherwise old_target.delete() might delete these chunks
+            self.cache.chunk_incref(chunk_id, target.stats)
+        old_target.delete(Statistics(), progress=self.progress)
+        logger.info('Done replaying items')
+        return target, resume_from
+
+    def create_target_archive(self, name):
+        target = Archive(self.repository, self.key, self.manifest, name, create=True,
+                          progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
+                          checkpoint_interval=0)
+        target.recreate_partial_chunks = None
+        target.recreate_uncomitted_bytes = 0
+        return target
+
+    def open_archive(self, name, **kwargs):
+        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)

+ 141 - 5
borg/archiver.py

@@ -22,7 +22,7 @@ from .helpers import Error, location_validator, archivename_validator, format_ti
     get_cache_dir, prune_within, prune_split, \
     Manifest, remove_surrogates, update_excludes, format_archive, check_extension_modules, Statistics, \
     dir_is_tagged, ChunkerParams, CompressionSpec, is_slow_msgpack, yes, sysinfo, \
-    EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ItemFormatter
+    EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ItemFormatter, DASHES
 from .logger import create_logger, setup_logging
 logger = create_logger()
 from .compress import Compressor, COMPR_BUFFER
@@ -30,7 +30,7 @@ from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader
 from .repository import Repository
 from .cache import Cache
 from .key import key_creator, RepoKey, PassphraseKey
-from .archive import Archive, ArchiveChecker, CHUNKER_PARAMS
+from .archive import Archive, ArchiveChecker, ArchiveRecreater, CHUNKER_PARAMS
 from .remote import RepositoryServer, RemoteRepository, cache_if_remote
 
 has_lchflags = hasattr(os, 'lchflags')
@@ -38,8 +38,6 @@ has_lchflags = hasattr(os, 'lchflags')
 # default umask, overriden by --umask, defaults to read/write only for owner
 UMASK_DEFAULT = 0o077
 
-DASHES = '-' * 78
-
 
 def argument(args, str_or_bool):
     """If bool is passed, return it. If str is passed, retrieve named attribute from args."""
@@ -402,7 +400,7 @@ class Archiver:
                 filter=lambda item: item_is_hardlink_master(item) or matcher.match(item[b'path'])):
             orig_path = item[b'path']
             if item_is_hardlink_master(item):
-                hardlink_masters[orig_path] = (item.get(b'chunks'), item.get(b'source'))
+                hardlink_masters[orig_path] = (item.get(b'chunks'), None)
             if not matcher.match(item[b'path']):
                 continue
             if strip_components:
@@ -823,6 +821,50 @@ class Archiver:
             print("warning: %s" % e)
         return self.exit_code
 
+    @with_repository(cache=True, exclusive=True)
+    def do_recreate(self, args, repository, manifest, key, cache):
+        """Re-create archives"""
+        def interrupt(signal_num, stack_frame):
+            if recreater.interrupt:
+                print("Received signal, again. I'm not deaf.\n", file=sys.stderr)
+            else:
+                print("Received signal, will exit cleanly.\n", file=sys.stderr)
+            recreater.interrupt = True
+
+        matcher, include_patterns = self.build_matcher(args.excludes, args.paths)
+        self.output_list = args.output_list
+        self.output_filter = args.output_filter
+
+        recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
+                                     exclude_caches=args.exclude_caches, exclude_if_present=args.exclude_if_present,
+                                     keep_tag_files=args.keep_tag_files,
+                                     compression=args.compression, chunker_params=args.chunker_params,
+                                     progress=args.progress, stats=args.stats,
+                                     file_status_printer=self.print_file_status,
+                                     dry_run=args.dry_run)
+
+        signal.signal(signal.SIGTERM, interrupt)
+        signal.signal(signal.SIGINT, interrupt)
+
+        if args.location.archive:
+            name = args.location.archive
+            if recreater.is_temporary_archive(name):
+                self.print_error('Refusing to work on temporary archive of prior recreate: %s', name)
+                return self.exit_code
+            recreater.recreate(name)
+        else:
+            for archive in manifest.list_archive_infos(sort_by='ts'):
+                name = archive.name
+                if recreater.is_temporary_archive(name):
+                    continue
+                print('Processing', name)
+                if not recreater.recreate(name):
+                    break
+        manifest.write()
+        repository.commit()
+        cache.commit()
+        return self.exit_code
+
     @with_repository()
     def do_debug_dump_archive_items(self, args, repository, manifest, key):
         """dump (decrypted, decompressed) archive items metadata (not: data)"""
@@ -1645,6 +1687,100 @@ class Archiver:
                                type=location_validator(archive=False),
                                help='path to the repository to be upgraded')
 
+        recreate_epilog = textwrap.dedent("""
+        Recreate the contents of existing archives.
+
+        --exclude, --exclude-from and PATH have the exact same semantics
+        as in "borg create". If a PATH is specified the resulting archive
+        will only contain files under PATH.
+
+        --compression: all chunks seen will be stored using the given method.
+        Due to how Borg stores compressed size information this might display
+        incorrect information for archives that were not rewritten at the same time.
+        There is no risk of data loss by this.
+
+        --chunker-params will re-chunk all files in the archive, this can be
+        used to have upgraded Borg 0.xx or Attic archives deduplicate with
+        Borg 1.x archives.
+
+        borg recreate is signal safe. Send either SIGINT (Ctrl-C on most terminals) or
+        SIGTERM to request termination.
+
+        Use the *exact same* command line to resume the operation later - changing excludes
+        or paths will lead to inconsistencies (changed excludes will only apply to newly
+        processed files/dirs). Changing compression leads to incorrect size information
+        (which does not cause any data loss, but can be misleading).
+        Changing chunker params between invocations might lead to data loss.
+
+        USE WITH CAUTION.
+        Permanent data loss by specifying incorrect patterns or PATHS is possible.
+        When in doubt, use "--dry-run --verbose --list" to see how patterns/PATHS are
+        interpreted.
+
+        The archive being recreated is only removed after the operation completes. The
+        archive that is built during the operation exists at the same time at
+        "<ARCHIVE>.recreate". The new archive will have a different archive ID.
+
+        When rechunking space usage can be substantial, expect at least the entire
+        deduplicated size of the archives using the older chunker params.
+        When recompressing approximately 1 % of the repository size or 512 MB
+        (whichever is greater) of additional space is used.
+        """)
+        subparser = subparsers.add_parser('recreate', parents=[common_parser], add_help=False,
+                                          description=self.do_recreate.__doc__,
+                                          epilog=recreate_epilog,
+                                          formatter_class=argparse.RawDescriptionHelpFormatter,
+                                          help=self.do_recreate.__doc__)
+        subparser.set_defaults(func=self.do_recreate)
+        subparser.add_argument('--list', dest='output_list',
+                               action='store_true', default=False,
+                               help='output verbose list of items (files, dirs, ...)')
+        subparser.add_argument('--filter', dest='output_filter', metavar='STATUSCHARS',
+                               help='only display items with the given status characters')
+        subparser.add_argument('-p', '--progress', dest='progress',
+                               action='store_true', default=False,
+                               help='show progress display while rewriting archives')
+        subparser.add_argument('-n', '--dry-run', dest='dry_run',
+                               action='store_true', default=False,
+                               help='do not change anything')
+        subparser.add_argument('-s', '--stats', dest='stats',
+                               action='store_true', default=False,
+                               help='print statistics at end')
+        subparser.add_argument('-e', '--exclude', dest='excludes',
+                               type=parse_pattern, action='append',
+                               metavar="PATTERN", help='exclude paths matching PATTERN')
+        subparser.add_argument('--exclude-from', dest='exclude_files',
+                               type=argparse.FileType('r'), action='append',
+                               metavar='EXCLUDEFILE', help='read exclude patterns from EXCLUDEFILE, one per line')
+        subparser.add_argument('--exclude-caches', dest='exclude_caches',
+                               action='store_true', default=False,
+                               help='exclude directories that contain a CACHEDIR.TAG file ('
+                                    'http://www.brynosaurus.com/cachedir/spec.html)')
+        subparser.add_argument('--exclude-if-present', dest='exclude_if_present',
+                               metavar='FILENAME', action='append', type=str,
+                               help='exclude directories that contain the specified file')
+        subparser.add_argument('--keep-tag-files', dest='keep_tag_files',
+                               action='store_true', default=False,
+                               help='keep tag files of excluded caches/directories')
+        archive_group.add_argument('-C', '--compression', dest='compression',
+                                   type=CompressionSpec, default=None, metavar='COMPRESSION',
+                                   help='select compression algorithm (and level):\n'
+                                        'none == no compression (default),\n'
+                                        'lz4 == lz4,\n'
+                                        'zlib == zlib (default level 6),\n'
+                                        'zlib,0 .. zlib,9 == zlib (with level 0..9),\n'
+                                        'lzma == lzma (default level 6),\n'
+                                        'lzma,0 .. lzma,9 == lzma (with level 0..9).')
+        subparser.add_argument('--chunker-params', dest='chunker_params',
+                               type=ChunkerParams, default=None,
+                               metavar='CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE',
+                               help='specify the chunker parameters (or "default").')
+        subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='',
+                               type=location_validator(),
+                               help='repository/archive to recreate')
+        subparser.add_argument('paths', metavar='PATH', nargs='*', type=str,
+                               help='paths to recreate; patterns are supported')
+
         subparser = subparsers.add_parser('help', parents=[common_parser], add_help=False,
                                           description='Extra help')
         subparser.add_argument('--epilog-only', dest='epilog_only',

+ 9 - 3
borg/cache.py

@@ -108,6 +108,11 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             stats[field] = format_file_size(stats[field])
         return Summary(**stats)
 
+    def chunks_stored_size(self):
+        Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks', 'total_chunks'])
+        stats = Summary(*self.chunks.summarize())
+        return stats.unique_csize
+
     def create(self):
         """Create a new empty cache at `self.path`
         """
@@ -358,16 +363,17 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             self.do_cache = os.path.isdir(archive_path)
             self.chunks = create_master_idx(self.chunks)
 
-    def add_chunk(self, id, data, stats):
+    def add_chunk(self, id, data, stats, overwrite=False):
         if not self.txn_active:
             self.begin_txn()
         size = len(data)
-        if self.seen_chunk(id, size):
+        refcount = self.seen_chunk(id, size)
+        if refcount and not overwrite:
             return self.chunk_incref(id, stats)
         data = self.key.encrypt(data)
         csize = len(data)
         self.repository.put(id, data, wait=False)
-        self.chunks[id] = (1, size, csize)
+        self.chunks[id] = (refcount + 1, size, csize)
         stats.update(size, csize, True)
         return id, size, csize
 

+ 79 - 2
borg/helpers.py

@@ -1,9 +1,10 @@
 import argparse
 from binascii import hexlify
-from collections import namedtuple
+from collections import namedtuple, deque
 from functools import wraps, partial
 import grp
 import hashlib
+from itertools import islice
 import os
 import stat
 import textwrap
@@ -15,8 +16,8 @@ from string import Formatter
 import platform
 import time
 import unicodedata
-
 import logging
+
 from .logger import create_logger
 logger = create_logger()
 
@@ -40,6 +41,8 @@ EXIT_SUCCESS = 0  # everything done, no problems
 EXIT_WARNING = 1  # reached normal end of operation, but there were issues
 EXIT_ERROR = 2  # terminated abruptly, did not reach end of operation
 
+DASHES = '-' * 78
+
 
 class Error(Exception):
     """Error base class"""
@@ -491,6 +494,9 @@ def timestamp(s):
 
 
 def ChunkerParams(s):
+    if s.strip().lower() == "default":
+        from .archive import CHUNKER_PARAMS
+        return CHUNKER_PARAMS
     chunk_min, chunk_max, chunk_mask, window_size = s.split(',')
     if int(chunk_max) > 23:
         # do not go beyond 2**23 (8MB) chunk size now,
@@ -1272,3 +1278,74 @@ class ItemFormatter:
 
     def time(self, key, item):
         return safe_timestamp(item.get(key) or item[b'mtime'])
+
+
+class ChunkIteratorFileWrapper:
+    """File-like wrapper for chunk iterators"""
+
+    def __init__(self, chunk_iterator):
+        self.chunk_iterator = chunk_iterator
+        self.chunk_offset = 0
+        self.chunk = b''
+        self.exhausted = False
+
+    def _refill(self):
+        remaining = len(self.chunk) - self.chunk_offset
+        if not remaining:
+            try:
+                self.chunk = memoryview(next(self.chunk_iterator))
+            except StopIteration:
+                self.exhausted = True
+                return 0  # EOF
+            self.chunk_offset = 0
+            remaining = len(self.chunk)
+        return remaining
+
+    def _read(self, nbytes):
+        if not nbytes:
+            return b''
+        remaining = self._refill()
+        will_read = min(remaining, nbytes)
+        self.chunk_offset += will_read
+        return self.chunk[self.chunk_offset - will_read:self.chunk_offset]
+
+    def read(self, nbytes):
+        parts = []
+        while nbytes and not self.exhausted:
+            read_data = self._read(nbytes)
+            nbytes -= len(read_data)
+            parts.append(read_data)
+        return b''.join(parts)
+
+
+def open_item(archive, item):
+    """Return file-like object for archived item (with chunks)."""
+    chunk_iterator = archive.pipeline.fetch_many([c[0] for c in item[b'chunks']])
+    return ChunkIteratorFileWrapper(chunk_iterator)
+
+
+def file_status(mode):
+    if stat.S_ISREG(mode):
+        return 'A'
+    elif stat.S_ISDIR(mode):
+        return 'd'
+    elif stat.S_ISBLK(mode):
+        return 'b'
+    elif stat.S_ISCHR(mode):
+        return 'c'
+    elif stat.S_ISLNK(mode):
+        return 's'
+    elif stat.S_ISFIFO(mode):
+        return 'f'
+    return '?'
+
+
+def consume(iterator, n=None):
+    """Advance the iterator n-steps ahead. If n is none, consume entirely."""
+    # Use functions that consume iterators at C speed.
+    if n is None:
+        # feed the entire iterator into a zero-length deque
+        deque(iterator, maxlen=0)
+    else:
+        # advance to the empty slice starting at position n
+        next(islice(iterator, n, n), None)

+ 240 - 14
borg/testsuite/archiver.py

@@ -2,6 +2,7 @@ from binascii import hexlify
 from configparser import ConfigParser
 import errno
 import os
+import inspect
 from io import StringIO
 import random
 import stat
@@ -17,7 +18,7 @@ from hashlib import sha256
 import pytest
 
 from .. import xattr
-from ..archive import Archive, ChunkBuffer, CHUNK_MAX_EXP
+from ..archive import Archive, ChunkBuffer, ArchiveRecreater, CHUNK_MAX_EXP
 from ..archiver import Archiver
 from ..cache import Cache
 from ..crypto import bytes_to_long, num_aes_blocks
@@ -235,9 +236,6 @@ class ArchiverTestCaseBase(BaseTestCase):
     def create_src_archive(self, name):
         self.cmd('create', self.repository_location + '::' + name, src_dir)
 
-
-class ArchiverTestCase(ArchiverTestCaseBase):
-
     def create_regular_file(self, name, size=0, contents=None):
         filename = os.path.join(self.input_path, name)
         if not os.path.exists(os.path.dirname(filename)):
@@ -295,6 +293,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             have_root = False
         return have_root
 
+
+class ArchiverTestCase(ArchiverTestCaseBase):
     def test_basic_functionality(self):
         have_root = self.create_test_files()
         self.cmd('init', self.repository_location)
@@ -637,29 +637,56 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.cmd("extract", self.repository_location + "::test", "fm:input/file1", "fm:*file33*", "input/file2")
         self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file333"])
 
-    def test_exclude_caches(self):
+    def _create_test_caches(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('file1', size=1024 * 80)
         self.create_regular_file('cache1/CACHEDIR.TAG', contents=b'Signature: 8a477f597d28d172789f06886806bc55 extra stuff')
         self.create_regular_file('cache2/CACHEDIR.TAG', contents=b'invalid signature')
-        self.cmd('create', '--exclude-caches', self.repository_location + '::test', 'input')
+        os.mkdir('input/cache3')
+        os.link('input/cache1/CACHEDIR.TAG', 'input/cache3/CACHEDIR.TAG')
+
+    def _assert_test_caches(self):
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
         self.assert_equal(sorted(os.listdir('output/input')), ['cache2', 'file1'])
         self.assert_equal(sorted(os.listdir('output/input/cache2')), ['CACHEDIR.TAG'])
 
-    def test_exclude_tagged(self):
+    def test_exclude_caches(self):
+        self._create_test_caches()
+        self.cmd('create', '--exclude-caches', self.repository_location + '::test', 'input')
+        self._assert_test_caches()
+
+    def test_recreate_exclude_caches(self):
+        self._create_test_caches()
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.cmd('recreate', '--exclude-caches', self.repository_location + '::test')
+        self._assert_test_caches()
+
+    def _create_test_tagged(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('file1', size=1024 * 80)
         self.create_regular_file('tagged1/.NOBACKUP')
         self.create_regular_file('tagged2/00-NOBACKUP')
         self.create_regular_file('tagged3/.NOBACKUP/file2')
-        self.cmd('create', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP', self.repository_location + '::test', 'input')
+
+    def _assert_test_tagged(self):
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
         self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'tagged3'])
 
-    def test_exclude_keep_tagged(self):
+    def test_exclude_tagged(self):
+        self._create_test_tagged()
+        self.cmd('create', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP', self.repository_location + '::test', 'input')
+        self._assert_test_tagged()
+
+    def test_recreate_exclude_tagged(self):
+        self._create_test_tagged()
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.cmd('recreate', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP',
+                 self.repository_location + '::test')
+        self._assert_test_tagged()
+
+    def _create_test_keep_tagged(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('file0', size=1024)
         self.create_regular_file('tagged1/.NOBACKUP1')
@@ -672,8 +699,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.create_regular_file('taggedall/.NOBACKUP2')
         self.create_regular_file('taggedall/CACHEDIR.TAG', contents=b'Signature: 8a477f597d28d172789f06886806bc55 extra stuff')
         self.create_regular_file('taggedall/file4', size=1024)
-        self.cmd('create', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
-                 '--exclude-caches', '--keep-tag-files', self.repository_location + '::test', 'input')
+
+    def _assert_test_keep_tagged(self):
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
         self.assert_equal(sorted(os.listdir('output/input')), ['file0', 'tagged1', 'tagged2', 'tagged3', 'taggedall'])
@@ -683,6 +710,19 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.assert_equal(sorted(os.listdir('output/input/taggedall')),
                           ['.NOBACKUP1', '.NOBACKUP2', 'CACHEDIR.TAG', ])
 
+    def test_exclude_keep_tagged(self):
+        self._create_test_keep_tagged()
+        self.cmd('create', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
+                 '--exclude-caches', '--keep-tag-files', self.repository_location + '::test', 'input')
+        self._assert_test_keep_tagged()
+
+    def test_recreate_exclude_keep_tagged(self):
+        self._create_test_keep_tagged()
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.cmd('recreate', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
+                 '--exclude-caches', '--keep-tag-files', self.repository_location + '::test')
+        self._assert_test_keep_tagged()
+
     def test_path_normalization(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('dir1/dir2/file', size=1024 * 80)
@@ -880,6 +920,13 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.assert_in("U input/file1", output)
         self.assert_in("x input/file2", output)
 
+    def test_create_delete_inbetween(self):
+        self.create_test_files()
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test1', 'input')
+        self.cmd('delete', self.repository_location + '::test1')
+        self.cmd('create', self.repository_location + '::test2', 'input')
+
     def test_create_topical(self):
         now = time.time()
         self.create_regular_file('file1', size=1024 * 80)
@@ -1149,6 +1196,176 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.cmd('init', self.repository_location, exit_code=1)
         assert not os.path.exists(self.repository_location)
 
+    def test_recreate_basic(self):
+        self.create_test_files()
+        self.create_regular_file('dir2/file3', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        archive = self.repository_location + '::test0'
+        self.cmd('create', archive, 'input')
+        self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3')
+        listing = self.cmd('list', '--short', archive)
+        assert 'file1' not in listing
+        assert 'dir2/file2' in listing
+        assert 'dir2/file3' not in listing
+
+    def test_recreate_subtree_hardlinks(self):
+        # This is essentially the same problem set as in test_extract_hardlinks
+        self._extract_hardlinks_setup()
+        self.cmd('create', self.repository_location + '::test2', 'input')
+        self.cmd('recreate', self.repository_location + '::test', 'input/dir1')
+        with changedir('output'):
+            self.cmd('extract', self.repository_location + '::test')
+            assert os.stat('input/dir1/hardlink').st_nlink == 2
+            assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
+            assert os.stat('input/dir1/aaaa').st_nlink == 2
+            assert os.stat('input/dir1/source2').st_nlink == 2
+        with changedir('output'):
+            self.cmd('extract', self.repository_location + '::test2')
+            assert os.stat('input/dir1/hardlink').st_nlink == 4
+
+    def test_recreate_rechunkify(self):
+        with open(os.path.join(self.input_path, 'large_file'), 'wb') as fd:
+            fd.write(b'a' * 250)
+            fd.write(b'b' * 250)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', '--chunker-params', '7,9,8,128', self.repository_location + '::test1', 'input')
+        self.cmd('create', self.repository_location + '::test2', 'input', '--no-files-cache')
+        list = self.cmd('list', self.repository_location + '::test1', 'input/large_file',
+                        '--format', '{num_chunks} {unique_chunks}')
+        num_chunks, unique_chunks = map(int, list.split(' '))
+        # test1 and test2 do not deduplicate
+        assert num_chunks == unique_chunks
+        self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
+        # test1 and test2 do deduplicate after recreate
+        assert not int(self.cmd('list', self.repository_location + '::test1', 'input/large_file',
+                                '--format', '{unique_chunks}'))
+
+    def test_recreate_recompress(self):
+        self.create_regular_file('compressible', size=10000)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
+                        '--format', '{size} {csize}')
+        size, csize = map(int, list.split(' '))
+        assert csize >= size
+        self.cmd('recreate', self.repository_location, '-C', 'lz4')
+        list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
+                        '--format', '{size} {csize}')
+        size, csize = map(int, list.split(' '))
+        assert csize < size
+
+    def test_recreate_dry_run(self):
+        self.create_regular_file('compressible', size=10000)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        archives_before = self.cmd('list', self.repository_location + '::test')
+        self.cmd('recreate', self.repository_location, '-n', '-e', 'input/compressible')
+        archives_after = self.cmd('list', self.repository_location + '::test')
+        assert archives_after == archives_before
+
+    def _recreate_interrupt_patch(self, interrupt_after_n_1_files):
+        def interrupt(self, *args):
+            if interrupt_after_n_1_files:
+                self.interrupt = True
+                pi_save(self, *args)
+            else:
+                raise ArchiveRecreater.Interrupted
+
+        def process_item_patch(*args):
+            return pi_call.pop(0)(*args)
+
+        pi_save = ArchiveRecreater.process_item
+        pi_call = [pi_save] * interrupt_after_n_1_files + [interrupt]
+        return process_item_patch
+
+    def _test_recreate_interrupt(self, change_args, interrupt_early):
+        self.create_test_files()
+        self.create_regular_file('dir2/abcdef', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        process_files = 1
+        if interrupt_early:
+            process_files = 0
+        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(process_files)):
+            self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2')
+        assert 'test.recreate' in self.cmd('list', self.repository_location)
+        if change_args:
+            with patch.object(sys, 'argv', sys.argv + ['non-forking tests don\'t use sys.argv']):
+                output = self.cmd('recreate', '-sv', '--list', '-pC', 'lz4', self.repository_location, 'input/dir2')
+        else:
+            output = self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2')
+        assert 'Found test.recreate, will resume' in output
+        assert change_args == ('Command line changed' in output)
+        if not interrupt_early:
+            assert 'Fast-forwarded to input/dir2/abcdef' in output
+            assert 'A input/dir2/abcdef' not in output
+        assert 'A input/dir2/file2' in output
+        archives = self.cmd('list', self.repository_location)
+        assert 'test.recreate' not in archives
+        assert 'test' in archives
+        files = self.cmd('list', self.repository_location + '::test')
+        assert 'dir2/file2' in files
+        assert 'dir2/abcdef' in files
+        assert 'file1' not in files
+
+    def test_recreate_interrupt(self):
+        self._test_recreate_interrupt(False, True)
+
+    def test_recreate_interrupt2(self):
+        self._test_recreate_interrupt(True, False)
+
+    def _test_recreate_chunker_interrupt_patch(self):
+        real_add_chunk = Cache.add_chunk
+
+        def add_chunk(*args, **kwargs):
+            frame = inspect.stack()[2]
+            try:
+                caller_self = frame[0].f_locals['self']
+                caller_self.interrupt = True
+            finally:
+                del frame
+            return real_add_chunk(*args, **kwargs)
+        return add_chunk
+
+    def test_recreate_rechunkify_interrupt(self):
+        self.create_regular_file('file1', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        archive_before = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
+        with patch.object(Cache, 'add_chunk', self._test_recreate_chunker_interrupt_patch()):
+            self.cmd('recreate', '-p', '--chunker-params', '16,18,17,4095', self.repository_location)
+        assert 'test.recreate' in self.cmd('list', self.repository_location)
+        output = self.cmd('recreate', '-svp', '--debug', '--chunker-params', '16,18,17,4095', self.repository_location)
+        assert 'Found test.recreate, will resume' in output
+        assert 'Copied 1 chunks from a partially processed item' in output
+        archive_after = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
+        assert archive_after == archive_before
+
+    def test_recreate_changed_source(self):
+        self.create_test_files()
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(1)):
+            self.cmd('recreate', self.repository_location, 'input/dir2')
+        assert 'test.recreate' in self.cmd('list', self.repository_location)
+        self.cmd('delete', self.repository_location + '::test')
+        self.cmd('create', self.repository_location + '::test', 'input')
+        output = self.cmd('recreate', self.repository_location, 'input/dir2')
+        assert 'Source archive changed, will discard test.recreate and start over' in output
+
+    def test_recreate_refuses_temporary(self):
+        self.cmd('init', self.repository_location)
+        self.cmd('recreate', self.repository_location + '::cba.recreate', exit_code=2)
+
+    def test_recreate_skips_nothing_to_do(self):
+        self.create_regular_file('file1', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        info_before = self.cmd('info', self.repository_location + '::test')
+        self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
+        info_after = self.cmd('info', self.repository_location + '::test')
+        assert info_before == info_after  # includes archive ID
+
 
 @unittest.skipUnless('binary' in BORG_EXES, 'no borg.exe available')
 class ArchiverTestCaseBinary(ArchiverTestCase):
@@ -1159,6 +1376,18 @@ class ArchiverTestCaseBinary(ArchiverTestCase):
     def test_init_interrupt(self):
         pass
 
+    @unittest.skip('patches objects')
+    def test_recreate_rechunkify_interrupt(self):
+        pass
+
+    @unittest.skip('patches objects')
+    def test_recreate_interrupt(self):
+        pass
+
+    @unittest.skip('patches objects')
+    def test_recreate_changed_source(self):
+        pass
+
 
 class ArchiverCheckTestCase(ArchiverTestCaseBase):
 
@@ -1274,9 +1503,6 @@ class RemoteArchiverTestCase(ArchiverTestCase):
 
 
 class DiffArchiverTestCase(ArchiverTestCaseBase):
-    create_test_files = ArchiverTestCase.create_test_files
-    create_regular_file = ArchiverTestCase.create_regular_file
-
     def test_basic_functionality(self):
         # Initialize test folder
         self.create_test_files()

+ 12 - 1
borg/testsuite/helpers.py

@@ -15,7 +15,7 @@ from ..helpers import Location, format_file_size, format_timedelta, make_path_sa
     yes, TRUISH, FALSISH, DEFAULTISH, \
     StableDict, int_to_bigint, bigint_to_int, parse_timestamp, CompressionSpec, ChunkerParams, \
     ProgressIndicatorPercent, ProgressIndicatorEndless, load_excludes, parse_pattern, \
-    PatternMatcher, RegexPattern, PathPrefixPattern, FnmatchPattern, ShellPattern, partial_format
+    PatternMatcher, RegexPattern, PathPrefixPattern, FnmatchPattern, ShellPattern, partial_format, ChunkIteratorFileWrapper
 from . import BaseTestCase, environment_variable, FakeInputs
 
 
@@ -899,3 +899,14 @@ def test_partial_format():
     assert partial_format('{unknown_key}', {}) == '{unknown_key}'
     assert partial_format('{key}{{escaped_key}}', {}) == '{key}{{escaped_key}}'
     assert partial_format('{{escaped_key}}', {'escaped_key': 1234}) == '{{escaped_key}}'
+
+
+def test_chunk_file_wrapper():
+    cfw = ChunkIteratorFileWrapper(iter([b'abc', b'def']))
+    assert cfw.read(2) == b'ab'
+    assert cfw.read(50) == b'cdef'
+    assert cfw.exhausted
+
+    cfw = ChunkIteratorFileWrapper(iter([]))
+    assert cfw.read(2) == b''
+    assert cfw.exhausted

+ 20 - 0
docs/usage.rst

@@ -628,6 +628,26 @@ Examples
     no key file found for repository
 
 
+.. include:: usage/recreate.rst.inc
+
+Examples
+~~~~~~~~
+::
+
+    # Make old (Attic / Borg 0.xx) archives deduplicate with Borg 1.x archives
+    # Archives created with Borg 1.1+ and the default chunker params are skipped (archive ID stays the same)
+    $ borg recreate /mnt/backup --chunker-params default --progress
+
+    # Create a backup with little but fast compression
+    $ borg create /mnt/backup::archive /some/files --compression lz4
+    # Then compress it - this might take longer, but the backup has already completed, so no inconsistencies
+    # from a long-running backup job.
+    $ borg recreate /mnt/backup::archive --compression zlib,9
+
+    # Remove unwanted files from all archives in a repository
+    $ borg recreate /mnt/backup -e /home/icke/Pictures/drunk_photos
+
+
 Miscellaneous Help
 ------------------
 

+ 98 - 0
docs/usage/recreate.rst.inc

@@ -0,0 +1,98 @@
+.. _borg_recreate:
+
+borg recreate
+-------------
+::
+
+    usage: borg recreate [-h] [-v] [--debug] [--lock-wait N] [--show-version]
+                         [--show-rc] [--no-files-cache] [--umask M]
+                         [--remote-path PATH] [--list] [--filter STATUSCHARS] [-p]
+                         [-n] [-s] [-e PATTERN] [--exclude-from EXCLUDEFILE]
+                         [--exclude-caches] [--exclude-if-present FILENAME]
+                         [--keep-tag-files] [-C COMPRESSION]
+                         [--chunker-params CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE]
+                         [REPOSITORY_OR_ARCHIVE] [PATH [PATH ...]]
+    
+    Re-create archives
+    
+    positional arguments:
+      REPOSITORY_OR_ARCHIVE
+                            repository/archive to recreate
+      PATH                  paths to recreate; patterns are supported
+    
+    optional arguments:
+      -h, --help            show this help message and exit
+      -v, --verbose, --info
+                            enable informative (verbose) output, work on log level
+                            INFO
+      --debug               enable debug output, work on log level DEBUG
+      --lock-wait N         wait for the lock, but max. N seconds (default: 1).
+      --show-version        show/log the borg version
+      --show-rc             show/log the return code (rc)
+      --no-files-cache      do not load/update the file metadata cache used to
+                            detect unchanged files
+      --umask M             set umask to M (local and remote, default: 0077)
+      --remote-path PATH    set remote path to executable (default: "borg")
+      --list                output verbose list of items (files, dirs, ...)
+      --filter STATUSCHARS  only display items with the given status characters
+      -p, --progress        show progress display while rewriting archives
+      -n, --dry-run         do not change anything
+      -s, --stats           print statistics at end
+      -e PATTERN, --exclude PATTERN
+                            exclude paths matching PATTERN
+      --exclude-from EXCLUDEFILE
+                            read exclude patterns from EXCLUDEFILE, one per line
+      --exclude-caches      exclude directories that contain a CACHEDIR.TAG file
+                            (http://www.brynosaurus.com/cachedir/spec.html)
+      --exclude-if-present FILENAME
+                            exclude directories that contain the specified file
+      --keep-tag-files      keep tag files of excluded caches/directories
+      -C COMPRESSION, --compression COMPRESSION
+                            select compression algorithm (and level): none == no
+                            compression (default), lz4 == lz4, zlib == zlib
+                            (default level 6), zlib,0 .. zlib,9 == zlib (with
+                            level 0..9), lzma == lzma (default level 6), lzma,0 ..
+                            lzma,9 == lzma (with level 0..9).
+      --chunker-params CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE
+                            specify the chunker parameters (or "default").
+    
+Description
+~~~~~~~~~~~
+
+Recreate the contents of existing archives.
+
+--exclude, --exclude-from and PATH have the exact same semantics
+as in "borg create". If a PATH is specified the resulting archive
+will only contain files under PATH.
+
+--compression: all chunks seen will be stored using the given method.
+Due to how Borg stores compressed size information this might display
+incorrect information for archives that were not rewritten at the same time.
+There is no risk of data loss by this.
+
+--chunker-params will re-chunk all files in the archive, this can be
+used to have upgraded Borg 0.xx or Attic archives deduplicate with
+Borg 1.x archives.
+
+borg recreate is signal safe. Send either SIGINT (Ctrl-C on most terminals) or
+SIGTERM to request termination.
+
+Use the *exact same* command line to resume the operation later - changing excludes
+or paths will lead to inconsistencies (changed excludes will only apply to newly
+processed files/dirs). Changing compression leads to incorrect size information
+(which does not cause any data loss, but can be misleading).
+Changing chunker params between invocations might lead to data loss.
+
+USE WITH CAUTION.
+Permanent data loss by specifying incorrect patterns or PATHS is possible.
+When in doubt, use "--dry-run --verbose --list" to see how patterns/PATHS are
+interpreted.
+
+The archive being recreated is only removed after the operation completes. The
+archive that is built during the operation exists at the same time at
+"<ARCHIVE>.recreate". The new archive will have a different archive ID.
+
+When rechunking space usage can be substantial, expect at least the entire
+deduplicated size of the archives using the older chunker params.
+When recompressing approximately 1 % of the repository size or 512 MB
+(whichever is greater) of additional space is used.