Browse files

Merge pull request #812 from enkore/feature-rewrite

feature recreate
TW 9 years ago
parent
commit
c3e6bc2b7e
9 changed files with 973 additions and 103 deletions
  1. borg/archive.py (+334, -6)
  2. borg/archiver.py (+160, -32)
  3. borg/cache.py (+9, -3)
  4. borg/helpers.py (+79, -2)
  5. borg/testsuite/archiver.py (+244, -16)
  6. borg/testsuite/helpers.py (+12, -1)
  7. docs/usage.rst (+37, -20)
  8. docs/usage/comment.rst.inc (+0, -23)
  9. docs/usage/recreate.rst.inc (+98, -0)

+ 334 - 6
borg/archive.py

@@ -16,10 +16,13 @@ import sys
 import time
 from io import BytesIO
 from . import xattr
+from .compress import Compressor, COMPR_BUFFER
 from .helpers import Error, uid2user, user2uid, gid2group, group2gid, \
     parse_timestamp, to_localtime, format_time, format_timedelta, \
     Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
-    ProgressIndicatorPercent
+    ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, DASHES, \
+    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume
+from .repository import Repository
 from .platform import acl_get, acl_set
 from .chunker import Chunker
 from .hashindex import ChunkIndex
@@ -231,7 +234,7 @@ Number of files: {0.stats.nfiles}'''.format(
         if self.show_progress:
             self.stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)
-        if time.time() - self.last_checkpoint > self.checkpoint_interval:
+        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
             self.write_checkpoint()
             self.last_checkpoint = time.time()
 
@@ -240,7 +243,7 @@ Number of files: {0.stats.nfiles}'''.format(
         del self.manifest.archives[self.checkpoint_name]
         self.cache.chunk_decref(self.id, self.stats)
 
-    def save(self, name=None, comment=None, timestamp=None):
+    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
         name = name or self.name
         if name in self.manifest.archives:
             raise self.AlreadyExists(name)
@@ -253,7 +256,7 @@ Number of files: {0.stats.nfiles}'''.format(
             self.end = timestamp
             start = timestamp
             end = timestamp  # we only have 1 value
-        metadata = StableDict({
+        metadata = {
             'version': 1,
             'name': name,
             'comment': comment,
@@ -264,8 +267,9 @@ Number of files: {0.stats.nfiles}'''.format(
             'time': start.isoformat(),
             'time_end': end.isoformat(),
             'chunker_params': self.chunker_params,
-        })
-        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
+        }
+        metadata.update(additional_metadata or {})
+        data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
         self.id = self.key.id_hash(data)
         self.cache.add_chunk(self.id, data, self.stats)
         self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
@@ -456,6 +460,7 @@ Number of files: {0.stats.nfiles}'''.format(
         self.cache.add_chunk(new_id, data, self.stats)
         self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
         self.cache.chunk_decref(self.id, self.stats)
+        self.id = new_id
 
     def rename(self, name):
         if name in self.manifest.archives:
@@ -923,3 +928,326 @@ class ArchiveChecker:
         if self.repair:
             self.manifest.write()
             self.repository.commit(save_space=save_space)
+
+
+class ArchiveRecreater:
+    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
+    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""
+
+    class FakeTargetArchive:
+        def __init__(self):
+            self.stats = Statistics()
+
+    class Interrupted(Exception):
+        def __init__(self, metadata=None):
+            self.metadata = metadata or {}
+
+    @staticmethod
+    def is_temporary_archive(archive_name):
+        return archive_name.endswith('.recreate')
+
+    def __init__(self, repository, manifest, key, cache, matcher,
+                 exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
+                 chunker_params=None, compression=None,
+                 dry_run=False, stats=False, progress=False, file_status_printer=None):
+        self.repository = repository
+        self.key = key
+        self.manifest = manifest
+        self.cache = cache
+
+        self.matcher = matcher
+        self.exclude_caches = exclude_caches
+        self.exclude_if_present = exclude_if_present or []
+        self.keep_tag_files = keep_tag_files
+
+        self.chunker_params = chunker_params or CHUNKER_PARAMS
+        self.compression = compression or dict(name='none')
+        self.seen_chunks = set()
+        self.recompress = bool(compression)
+        compr_args = dict(buffer=COMPR_BUFFER)
+        compr_args.update(self.compression)
+        key.compressor = Compressor(**compr_args)
+
+        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
+        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
+
+        self.dry_run = dry_run
+        self.stats = stats
+        self.progress = progress
+        self.print_file_status = file_status_printer or (lambda *args: None)
+
+        self.interrupt = False
+        self.errors = False
+
+    def recreate(self, archive_name, comment=None):
+        assert not self.is_temporary_archive(archive_name)
+        archive = self.open_archive(archive_name)
+        target, resume_from = self.create_target_or_resume(archive)
+        if self.exclude_if_present or self.exclude_caches:
+            self.matcher_add_tagged_dirs(archive)
+        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
+            logger.info("Skipping archive %s, nothing to do", archive_name)
+            return True
+        try:
+            self.process_items(archive, target, resume_from)
+        except self.Interrupted as e:
+            return self.save(archive, target, completed=False, metadata=e.metadata)
+        return self.save(archive, target, comment)
+
+    def process_items(self, archive, target, resume_from=None):
+        matcher = self.matcher
+        target_is_subset = not matcher.empty()
+        hardlink_masters = {} if target_is_subset else None
+
+        def item_is_hardlink_master(item):
+            return (target_is_subset and
+                    stat.S_ISREG(item[b'mode']) and
+                    item.get(b'hardlink_master', True) and
+                    b'source' not in item and
+                    not matcher.match(item[b'path']))
+
+        for item in archive.iter_items():
+            if item_is_hardlink_master(item):
+                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
+                hardlink_masters[item[b'path']] = (item.get(b'chunks'), None)
+                continue
+            if resume_from:
+                # Fast forward to after the last processed file
+                if item[b'path'] == resume_from:
+                    logger.info('Fast-forwarded to %s', remove_surrogates(item[b'path']))
+                    resume_from = None
+                continue
+            if not matcher.match(item[b'path']):
+                self.print_file_status('x', item[b'path'])
+                continue
+            if target_is_subset and stat.S_ISREG(item[b'mode']) and item.get(b'source') in hardlink_masters:
+                # master of this hard link is outside the target subset
+                chunks, new_source = hardlink_masters[item[b'source']]
+                if new_source is None:
+                    # First item to use this master, move the chunks
+                    item[b'chunks'] = chunks
+                    hardlink_masters[item[b'source']] = (None, item[b'path'])
+                    del item[b'source']
+                else:
+                    # Master was already moved, only update this item's source
+                    item[b'source'] = new_source
+            if self.dry_run:
+                self.print_file_status('-', item[b'path'])
+            else:
+                try:
+                    self.process_item(archive, target, item)
+                except self.Interrupted:
+                    if self.progress:
+                        target.stats.show_progress(final=True)
+                    raise
+        if self.progress:
+            target.stats.show_progress(final=True)
+
+    def process_item(self, archive, target, item):
+        if b'chunks' in item:
+            item[b'chunks'] = self.process_chunks(archive, target, item)
+            target.stats.nfiles += 1
+        target.add_item(item)
+        self.print_file_status(file_status(item[b'mode']), item[b'path'])
+        if self.interrupt:
+            raise self.Interrupted
+
+    def process_chunks(self, archive, target, item):
+        """Return new chunk ID list for 'item'."""
+        if not self.recompress and not target.recreate_rechunkify:
+            for chunk_id, size, csize in item[b'chunks']:
+                self.cache.chunk_incref(chunk_id, target.stats)
+            return item[b'chunks']
+        new_chunks = self.process_partial_chunks(target)
+        chunk_iterator = self.create_chunk_iterator(archive, target, item)
+        consume(chunk_iterator, len(new_chunks))
+        for chunk in chunk_iterator:
+            chunk_id = self.key.id_hash(chunk)
+            if chunk_id in self.seen_chunks:
+                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
+            else:
+                # TODO: detect / skip / --always-recompress
+                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
+                new_chunks.append((chunk_id, size, csize))
+                self.seen_chunks.add(chunk_id)
+                if self.recompress:
+                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
+                    # existing chunks.
+                    target.recreate_uncomitted_bytes += csize
+                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
+                        # Issue commits to limit additional space usage when recompressing chunks
+                        target.recreate_uncomitted_bytes = 0
+                        self.repository.commit()
+            if self.progress:
+                target.stats.show_progress(item=item, dt=0.2)
+            if self.interrupt:
+                raise self.Interrupted({
+                    'recreate_partial_chunks': new_chunks,
+                })
+        return new_chunks
+
+    def create_chunk_iterator(self, archive, target, item):
+        """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
+        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item[b'chunks']])
+        if target.recreate_rechunkify:
+            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
+            # (does not load the entire file into memory)
+            file = ChunkIteratorFileWrapper(chunk_iterator)
+            chunk_iterator = target.chunker.chunkify(file)
+        return chunk_iterator
+
+    def process_partial_chunks(self, target):
+        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
+        if not target.recreate_partial_chunks:
+            return []
+        # No incref, create_target_or_resume already did that before deleting the old target archive
+        # So just copy these over
+        partial_chunks = target.recreate_partial_chunks
+        target.recreate_partial_chunks = None
+        for chunk_id, size, csize in partial_chunks:
+            self.seen_chunks.add(chunk_id)
+        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
+        return partial_chunks
+
+    def save(self, archive, target, comment=None, completed=True, metadata=None):
+        """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
+        if self.dry_run:
+            return completed
+        if completed:
+            timestamp = archive.ts.replace(tzinfo=None)
+            if comment is None:
+                comment = archive.metadata.get(b'comment', '')
+            target.save(timestamp=timestamp, comment=comment, additional_metadata={
+                'cmdline': archive.metadata[b'cmdline'],
+                'recreate_cmdline': sys.argv,
+            })
+            archive.delete(Statistics(), progress=self.progress)
+            target.rename(archive.name)
+            if self.stats:
+                target.end = datetime.utcnow()
+                log_multi(DASHES,
+                          str(target),
+                          DASHES,
+                          str(target.stats),
+                          str(self.cache),
+                          DASHES)
+        else:
+            additional_metadata = metadata or {}
+            additional_metadata.update({
+                'recreate_source_id': archive.id,
+                'recreate_args': sys.argv[1:],
+            })
+            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
+            logger.info('Run the same command again to resume.')
+        return completed
+
+    def matcher_add_tagged_dirs(self, archive):
+        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
+        def exclude(dir, tag_item):
+            if self.keep_tag_files:
+                tag_files.append(PathPrefixPattern(tag_item[b'path']))
+                tagged_dirs.append(FnmatchPattern(dir + '/'))
+            else:
+                tagged_dirs.append(PathPrefixPattern(dir))
+
+        matcher = self.matcher
+        tag_files = []
+        tagged_dirs = []
+        # build hardlink masters, but only for paths ending in CACHEDIR.TAG, so we can read hard-linked CACHEDIR.TAGs
+        cachedir_masters = {}
+
+        for item in archive.iter_items(
+                filter=lambda item: item[b'path'].endswith('CACHEDIR.TAG') or matcher.match(item[b'path'])):
+            if item[b'path'].endswith('CACHEDIR.TAG'):
+                cachedir_masters[item[b'path']] = item
+            if stat.S_ISREG(item[b'mode']):
+                dir, tag_file = os.path.split(item[b'path'])
+                if tag_file in self.exclude_if_present:
+                    exclude(dir, item)
+                if self.exclude_caches and tag_file == 'CACHEDIR.TAG':
+                    tag_contents = b'Signature: 8a477f597d28d172789f06886806bc55'
+                    if b'chunks' in item:
+                        file = open_item(archive, item)
+                    else:
+                        file = open_item(archive, cachedir_masters[item[b'source']])
+                    if file.read(len(tag_contents)).startswith(tag_contents):
+                        exclude(dir, item)
+        matcher.add(tag_files, True)
+        matcher.add(tagged_dirs, False)
+
+    def create_target_or_resume(self, archive):
+        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
+        if self.dry_run:
+            return self.FakeTargetArchive(), None
+        target_name = archive.name + '.recreate'
+        resume = target_name in self.manifest.archives
+        target, resume_from = None, None
+        if resume:
+            target, resume_from = self.try_resume(archive, target_name)
+        if not target:
+            target = self.create_target_archive(target_name)
+        # If the archives use the same chunker params, then don't rechunkify
+        target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
+        return target, resume_from
+
+    def try_resume(self, archive, target_name):
+        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
+        logger.info('Found %s, will resume interrupted operation', target_name)
+        old_target = self.open_archive(target_name)
+        resume_id = old_target.metadata[b'recreate_source_id']
+        resume_args = [arg.decode('utf-8', 'surrogateescape') for arg in old_target.metadata[b'recreate_args']]
+        if resume_id != archive.id:
+            logger.warning('Source archive changed, will discard %s and start over', target_name)
+            logger.warning('Saved fingerprint:   %s', hexlify(resume_id).decode('ascii'))
+            logger.warning('Current fingerprint: %s', archive.fpr)
+            old_target.delete(Statistics(), progress=self.progress)
+            return None, None  # can't resume
+        if resume_args != sys.argv[1:]:
+            logger.warning('Command line changed, this might lead to inconsistencies')
+            logger.warning('Saved:   %s', repr(resume_args))
+            logger.warning('Current: %s', repr(sys.argv[1:]))
+        target = self.create_target_archive(target_name + '.temp')
+        logger.info('Replaying items from interrupted operation...')
+        item = None
+        for item in old_target.iter_items():
+            if b'chunks' in item:
+                for chunk in item[b'chunks']:
+                    self.cache.chunk_incref(chunk[0], target.stats)
+                target.stats.nfiles += 1
+            target.add_item(item)
+        if item:
+            resume_from = item[b'path']
+        else:
+            resume_from = None
+        if self.progress:
+            old_target.stats.show_progress(final=True)
+        target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
+        for chunk_id, size, csize in target.recreate_partial_chunks:
+            if not self.cache.seen_chunk(chunk_id):
+                try:
+                    # Repository has __contains__, RemoteRepository doesn't
+                    self.repository.get(chunk_id)
+                except Repository.ObjectNotFound:
+                    # delete/prune/check between invocations: these chunks are gone.
+                    target.recreate_partial_chunks = None
+                    break
+                # fast-lane insert into chunks cache
+                self.cache.chunks[chunk_id] = (1, size, csize)
+                target.stats.update(size, csize, True)
+                continue
+            # incref now, otherwise old_target.delete() might delete these chunks
+            self.cache.chunk_incref(chunk_id, target.stats)
+        old_target.delete(Statistics(), progress=self.progress)
+        logger.info('Done replaying items')
+        return target, resume_from
+
+    def create_target_archive(self, name):
+        target = Archive(self.repository, self.key, self.manifest, name, create=True,
+                          progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
+                          checkpoint_interval=0)
+        target.recreate_partial_chunks = None
+        target.recreate_uncomitted_bytes = 0
+        return target
+
+    def open_archive(self, name, **kwargs):
+        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
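For orientation, a minimal, hedged sketch of how the new ArchiveRecreater is meant to be driven. It is not part of the changeset; it simply mirrors do_recreate() in borg/archiver.py further down and assumes repository, manifest, key, cache and matcher are set up the way other subcommands do it ('my-archive' is a placeholder name):

    # Hedged sketch only; the real wiring is do_recreate() in borg/archiver.py below.
    recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
                                 compression=dict(name='lz4'),  # recompress every chunk seen
                                 chunker_params=None,           # None falls back to CHUNKER_PARAMS
                                 progress=True, stats=True)
    if not recreater.is_temporary_archive('my-archive'):
        recreater.recreate('my-archive', comment=None)
    manifest.write()
    repository.commit()
    cache.commit()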

+ 160 - 32
borg/archiver.py

@@ -22,7 +22,7 @@ from .helpers import Error, location_validator, archivename_validator, format_ti
     get_cache_dir, prune_within, prune_split, \
     Manifest, remove_surrogates, update_excludes, format_archive, check_extension_modules, Statistics, \
     dir_is_tagged, ChunkerParams, CompressionSpec, is_slow_msgpack, yes, sysinfo, \
-    EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ItemFormatter
+    EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ItemFormatter, DASHES
 from .logger import create_logger, setup_logging
 logger = create_logger()
 from .compress import Compressor, COMPR_BUFFER
@@ -30,7 +30,7 @@ from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader
 from .repository import Repository
 from .cache import Cache
 from .key import key_creator, RepoKey, PassphraseKey
-from .archive import Archive, ArchiveChecker, CHUNKER_PARAMS
+from .archive import Archive, ArchiveChecker, ArchiveRecreater, CHUNKER_PARAMS
 from .remote import RepositoryServer, RemoteRepository, cache_if_remote
 
 has_lchflags = hasattr(os, 'lchflags')
@@ -38,8 +38,6 @@ has_lchflags = hasattr(os, 'lchflags')
 # default umask, overriden by --umask, defaults to read/write only for owner
 UMASK_DEFAULT = 0o077
 
-DASHES = '-' * 78
-
 
 def argument(args, str_or_bool):
     """If bool is passed, return it. If str is passed, retrieve named attribute from args."""
@@ -402,7 +400,7 @@ class Archiver:
                 filter=lambda item: item_is_hardlink_master(item) or matcher.match(item[b'path'])):
             orig_path = item[b'path']
             if item_is_hardlink_master(item):
-                hardlink_masters[orig_path] = (item.get(b'chunks'), item.get(b'source'))
+                hardlink_masters[orig_path] = (item.get(b'chunks'), None)
             if not matcher.match(item[b'path']):
                 continue
             if strip_components:
@@ -628,16 +626,6 @@ class Archiver:
         cache.commit()
         return self.exit_code
 
-    @with_repository(exclusive=True, cache=True)
-    @with_archive
-    def do_comment(self, args, repository, manifest, key, cache, archive):
-        """Set the archive comment"""
-        archive.set_meta(b'comment', args.comment)
-        manifest.write()
-        repository.commit()
-        cache.commit()
-        return self.exit_code
-
     @with_repository(exclusive=True, cache=True)
     def do_delete(self, args, repository, manifest, key, cache):
         """Delete an existing repository or archive"""
@@ -823,6 +811,56 @@ class Archiver:
             print("warning: %s" % e)
             print("warning: %s" % e)
         return self.exit_code
         return self.exit_code
 
 
+    @with_repository(cache=True, exclusive=True)
+    def do_recreate(self, args, repository, manifest, key, cache):
+        """Re-create archives"""
+        def interrupt(signal_num, stack_frame):
+            if recreater.interrupt:
+                print("\nReceived signal, again. I'm not deaf.", file=sys.stderr)
+            else:
+                print("\nReceived signal, will exit cleanly.", file=sys.stderr)
+            recreater.interrupt = True
+
+        msg = ("recreate is an experimental feature.\n"
+               "Type 'YES' if you understand this and want to continue: ")
+        if not yes(msg, false_msg="Aborting.", truish=('YES',),
+                   env_var_override='BORG_RECREATE_I_KNOW_WHAT_I_AM_DOING'):
+            return EXIT_ERROR
+
+        matcher, include_patterns = self.build_matcher(args.excludes, args.paths)
+        self.output_list = args.output_list
+        self.output_filter = args.output_filter
+
+        recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
+                                     exclude_caches=args.exclude_caches, exclude_if_present=args.exclude_if_present,
+                                     keep_tag_files=args.keep_tag_files,
+                                     compression=args.compression, chunker_params=args.chunker_params,
+                                     progress=args.progress, stats=args.stats,
+                                     file_status_printer=self.print_file_status,
+                                     dry_run=args.dry_run)
+
+        signal.signal(signal.SIGTERM, interrupt)
+        signal.signal(signal.SIGINT, interrupt)
+
+        if args.location.archive:
+            name = args.location.archive
+            if recreater.is_temporary_archive(name):
+                self.print_error('Refusing to work on temporary archive of prior recreate: %s', name)
+                return self.exit_code
+            recreater.recreate(name, args.comment)
+        else:
+            for archive in manifest.list_archive_infos(sort_by='ts'):
+                name = archive.name
+                if recreater.is_temporary_archive(name):
+                    continue
+                print('Processing', name)
+                if not recreater.recreate(name, args.comment):
+                    break
+        manifest.write()
+        repository.commit()
+        cache.commit()
+        return self.exit_code
+
     @with_repository()
     def do_debug_dump_archive_items(self, args, repository, manifest, key):
         """dump (decrypted, decompressed) archive items metadata (not: data)"""
@@ -1392,23 +1430,6 @@ class Archiver:
                                type=archivename_validator(),
                                help='the new archive name to use')
 
-        comment_epilog = textwrap.dedent("""
-        This command sets the archive comment.
-
-        This results in a different archive ID.
-        """)
-        subparser = subparsers.add_parser('comment', parents=[common_parser], add_help=False,
-                                          description=self.do_comment.__doc__,
-                                          epilog=comment_epilog,
-                                          formatter_class=argparse.RawDescriptionHelpFormatter,
-                                          help='set the archive comment')
-        subparser.set_defaults(func=self.do_comment)
-        subparser.add_argument('location', metavar='ARCHIVE',
-                               type=location_validator(archive=True),
-                               help='archive to modify')
-        subparser.add_argument('comment', metavar='COMMENT',
-                               help='the new archive comment')
-
         delete_epilog = textwrap.dedent("""
         This command deletes an archive from the repository or the complete repository.
         Disk space is reclaimed accordingly. If you delete the complete repository, the
@@ -1645,6 +1666,113 @@ class Archiver:
                                type=location_validator(archive=False),
                                help='path to the repository to be upgraded')
 
+        recreate_epilog = textwrap.dedent("""
+        Recreate the contents of existing archives.
+
+        --exclude, --exclude-from and PATH have the exact same semantics
+        as in "borg create". If PATHs are specified the resulting archive
+        will only contain files from these PATHs.
+
+        --compression: all chunks seen will be stored using the given method.
+        Due to how Borg stores compressed size information this might display
+        incorrect information for archives that were not recreated at the same time.
+        There is no risk of data loss by this.
+
+        --chunker-params will re-chunk all files in the archive; this can be
+        used to have upgraded Borg 0.xx or Attic archives deduplicate with
+        Borg 1.x archives.
+
+        borg recreate is signal safe. Send either SIGINT (Ctrl-C on most terminals) or
+        SIGTERM to request termination.
+
+        Use the *exact same* command line to resume the operation later - changing excludes
+        or paths will lead to inconsistencies (changed excludes will only apply to newly
+        processed files/dirs). Changing compression leads to incorrect size information
+        (which does not cause any data loss, but can be misleading).
+        Changing chunker params between invocations might lead to data loss.
+
+        USE WITH CAUTION.
+        Depending on the PATHs and patterns given, recreate can be used to permanently
+        delete files from archives.
+        When in doubt, use "--dry-run --verbose --list" to see how patterns/PATHS are
+        interpreted.
+
+        The archive being recreated is only removed after the operation completes. The
+        archive that is built during the operation exists at the same time at
+        "<ARCHIVE>.recreate". The new archive will have a different archive ID.
+
+        When rechunking, space usage can be substantial: expect at least the entire
+        deduplicated size of the archives using the previous chunker params.
+        When recompressing, approximately 1 % of the repository size or 512 MB
+        (whichever is greater) of additional space is used.
+        """)
+        subparser = subparsers.add_parser('recreate', parents=[common_parser], add_help=False,
+                                          description=self.do_recreate.__doc__,
+                                          epilog=recreate_epilog,
+                                          formatter_class=argparse.RawDescriptionHelpFormatter,
+                                          help=self.do_recreate.__doc__)
+        subparser.set_defaults(func=self.do_recreate)
+        subparser.add_argument('--list', dest='output_list',
+                               action='store_true', default=False,
+                               help='output verbose list of items (files, dirs, ...)')
+        subparser.add_argument('--filter', dest='output_filter', metavar='STATUSCHARS',
+                               help='only display items with the given status characters')
+        subparser.add_argument('-p', '--progress', dest='progress',
+                               action='store_true', default=False,
+                               help='show progress display while recreating archives')
+        subparser.add_argument('-n', '--dry-run', dest='dry_run',
+                               action='store_true', default=False,
+                               help='do not change anything')
+        subparser.add_argument('-s', '--stats', dest='stats',
+                               action='store_true', default=False,
+                               help='print statistics at end')
+
+        exclude_group = subparser.add_argument_group('Exclusion options')
+        exclude_group.add_argument('-e', '--exclude', dest='excludes',
+                                   type=parse_pattern, action='append',
+                                   metavar="PATTERN", help='exclude paths matching PATTERN')
+        exclude_group.add_argument('--exclude-from', dest='exclude_files',
+                                   type=argparse.FileType('r'), action='append',
+                                   metavar='EXCLUDEFILE', help='read exclude patterns from EXCLUDEFILE, one per line')
+        exclude_group.add_argument('--exclude-caches', dest='exclude_caches',
+                                   action='store_true', default=False,
+                                   help='exclude directories that contain a CACHEDIR.TAG file ('
+                                        'http://www.brynosaurus.com/cachedir/spec.html)')
+        exclude_group.add_argument('--exclude-if-present', dest='exclude_if_present',
+                                   metavar='FILENAME', action='append', type=str,
+                                   help='exclude directories that contain the specified file')
+        exclude_group.add_argument('--keep-tag-files', dest='keep_tag_files',
+                                   action='store_true', default=False,
+                                   help='keep tag files of excluded caches/directories')
+
+        archive_group = subparser.add_argument_group('Archive options')
+        archive_group.add_argument('--comment', dest='comment', metavar='COMMENT', default=None,
+                                   help='add a comment text to the archive')
+        archive_group.add_argument('--timestamp', dest='timestamp',
+                                   type=timestamp, default=None,
+                                   metavar='yyyy-mm-ddThh:mm:ss',
+                                   help='manually specify the archive creation date/time (UTC). '
+                                        'alternatively, give a reference file/directory.')
+        archive_group.add_argument('-C', '--compression', dest='compression',
+                                   type=CompressionSpec, default=None, metavar='COMPRESSION',
+                                   help='select compression algorithm (and level):\n'
+                                        'none == no compression (default),\n'
+                                        'lz4 == lz4,\n'
+                                        'zlib == zlib (default level 6),\n'
+                                        'zlib,0 .. zlib,9 == zlib (with level 0..9),\n'
+                                        'lzma == lzma (default level 6),\n'
+                                        'lzma,0 .. lzma,9 == lzma (with level 0..9).')
+        archive_group.add_argument('--chunker-params', dest='chunker_params',
+                                   type=ChunkerParams, default=None,
+                                   metavar='CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE',
+                                   help='specify the chunker parameters (or "default").')
+
+        subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='',
+                               type=location_validator(),
+                               help='repository/archive to recreate')
+        subparser.add_argument('paths', metavar='PATH', nargs='*', type=str,
+                               help='paths to recreate; patterns are supported')
+
         subparser = subparsers.add_parser('help', parents=[common_parser], add_help=False,
                                           description='Extra help')
         subparser.add_argument('--epilog-only', dest='epilog_only',

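A hedged usage sketch of the new subcommand, written in the self.cmd() style of the test suite further down; the repository location, archive name 'test' and the 'input/keep' path are placeholders:

    self.cmd('init', self.repository_location)
    self.cmd('create', self.repository_location + '::test', 'input')
    # preview how PATHs and excludes would be interpreted, as the epilog above suggests
    self.cmd('recreate', '-n', '-v', '--list', self.repository_location + '::test', 'input/keep')
    # then actually rewrite the archive, re-chunking it with the default chunker params
    self.cmd('recreate', '--chunker-params', 'default', self.repository_location + '::test', 'input/keep')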
+ 9 - 3
borg/cache.py

@@ -108,6 +108,11 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             stats[field] = format_file_size(stats[field])
         return Summary(**stats)
 
+    def chunks_stored_size(self):
+        Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks', 'total_chunks'])
+        stats = Summary(*self.chunks.summarize())
+        return stats.unique_csize
+
     def create(self):
         """Create a new empty cache at `self.path`
         """
@@ -358,16 +363,17 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             self.do_cache = os.path.isdir(archive_path)
             self.chunks = create_master_idx(self.chunks)
 
-    def add_chunk(self, id, data, stats):
+    def add_chunk(self, id, data, stats, overwrite=False):
         if not self.txn_active:
             self.begin_txn()
         size = len(data)
-        if self.seen_chunk(id, size):
+        refcount = self.seen_chunk(id, size)
+        if refcount and not overwrite:
             return self.chunk_incref(id, stats)
         data = self.key.encrypt(data)
         csize = len(data)
         self.repository.put(id, data, wait=False)
-        self.chunks[id] = (1, size, csize)
+        self.chunks[id] = (refcount + 1, size, csize)
         stats.update(size, csize, True)
         return id, size, csize
 

+ 79 - 2
borg/helpers.py

@@ -1,9 +1,10 @@
 import argparse
 from binascii import hexlify
-from collections import namedtuple
+from collections import namedtuple, deque
 from functools import wraps, partial
 import grp
 import hashlib
+from itertools import islice
 import os
 import stat
 import textwrap
@@ -15,8 +16,8 @@ from string import Formatter
 import platform
 import time
 import unicodedata
-
 import logging
+
 from .logger import create_logger
 logger = create_logger()
 
@@ -40,6 +41,8 @@ EXIT_SUCCESS = 0  # everything done, no problems
 EXIT_WARNING = 1  # reached normal end of operation, but there were issues
 EXIT_ERROR = 2  # terminated abruptly, did not reach end of operation
 
+DASHES = '-' * 78
+
 
 class Error(Exception):
     """Error base class"""
@@ -491,6 +494,9 @@ def timestamp(s):
 
 
 
 
 def ChunkerParams(s):
+    if s.strip().lower() == "default":
+        from .archive import CHUNKER_PARAMS
+        return CHUNKER_PARAMS
     chunk_min, chunk_max, chunk_mask, window_size = s.split(',')
     if int(chunk_max) > 23:
         # do not go beyond 2**23 (8MB) chunk size now,
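A short, hedged illustration of this argument type: ChunkerParams backs --chunker-params in borg/archiver.py above, and an explicit spec parses into a tuple of ints (assuming the usual int-tuple return just below the lines shown):

    from borg.archive import CHUNKER_PARAMS
    assert ChunkerParams('default') == CHUNKER_PARAMS
    assert ChunkerParams('10,23,16,4095') == (10, 23, 16, 4095)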
@@ -1272,3 +1278,74 @@ class ItemFormatter:
 
 
     def time(self, key, item):
         return safe_timestamp(item.get(key) or item[b'mtime'])
+
+
+class ChunkIteratorFileWrapper:
+    """File-like wrapper for chunk iterators"""
+
+    def __init__(self, chunk_iterator):
+        self.chunk_iterator = chunk_iterator
+        self.chunk_offset = 0
+        self.chunk = b''
+        self.exhausted = False
+
+    def _refill(self):
+        remaining = len(self.chunk) - self.chunk_offset
+        if not remaining:
+            try:
+                self.chunk = memoryview(next(self.chunk_iterator))
+            except StopIteration:
+                self.exhausted = True
+                return 0  # EOF
+            self.chunk_offset = 0
+            remaining = len(self.chunk)
+        return remaining
+
+    def _read(self, nbytes):
+        if not nbytes:
+            return b''
+        remaining = self._refill()
+        will_read = min(remaining, nbytes)
+        self.chunk_offset += will_read
+        return self.chunk[self.chunk_offset - will_read:self.chunk_offset]
+
+    def read(self, nbytes):
+        parts = []
+        while nbytes and not self.exhausted:
+            read_data = self._read(nbytes)
+            nbytes -= len(read_data)
+            parts.append(read_data)
+        return b''.join(parts)
+
+
+def open_item(archive, item):
+    """Return file-like object for archived item (with chunks)."""
+    chunk_iterator = archive.pipeline.fetch_many([c[0] for c in item[b'chunks']])
+    return ChunkIteratorFileWrapper(chunk_iterator)
+
+
+def file_status(mode):
+    if stat.S_ISREG(mode):
+        return 'A'
+    elif stat.S_ISDIR(mode):
+        return 'd'
+    elif stat.S_ISBLK(mode):
+        return 'b'
+    elif stat.S_ISCHR(mode):
+        return 'c'
+    elif stat.S_ISLNK(mode):
+        return 's'
+    elif stat.S_ISFIFO(mode):
+        return 'f'
+    return '?'
+
+
+def consume(iterator, n=None):
+    """Advance the iterator n-steps ahead. If n is none, consume entirely."""
+    # Use functions that consume iterators at C speed.
+    if n is None:
+        # feed the entire iterator into a zero-length deque
+        deque(iterator, maxlen=0)
+    else:
+        # advance to the empty slice starting at position n
+        next(islice(iterator, n, n), None)
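A small, hedged demonstration of the two helpers just added, self-contained and using in-memory chunks rather than repository data:

    chunks = iter([b'hello ', b'wor', b'ld'])
    wrapper = ChunkIteratorFileWrapper(chunks)
    assert wrapper.read(8) == b'hello wo'   # reads span chunk boundaries
    assert wrapper.read(100) == b'rld'      # short read once the iterator is drained
    assert wrapper.exhausted

    it = iter(range(10))
    consume(it, 3)    # skip the first three values
    assert next(it) == 3
    consume(it)       # drain the remainder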

+ 244 - 16
borg/testsuite/archiver.py

@@ -2,6 +2,7 @@ from binascii import hexlify
 from configparser import ConfigParser
 import errno
 import os
+import inspect
 from io import StringIO
 import random
 import stat
@@ -17,7 +18,7 @@ from hashlib import sha256
 import pytest
 
 from .. import xattr
-from ..archive import Archive, ChunkBuffer, CHUNK_MAX_EXP
+from ..archive import Archive, ChunkBuffer, ArchiveRecreater, CHUNK_MAX_EXP
 from ..archiver import Archiver
 from ..cache import Cache
 from ..crypto import bytes_to_long, num_aes_blocks
@@ -196,6 +197,7 @@ class ArchiverTestCaseBase(BaseTestCase):
     def setUp(self):
         os.environ['BORG_CHECK_I_KNOW_WHAT_I_AM_DOING'] = 'YES'
         os.environ['BORG_DELETE_I_KNOW_WHAT_I_AM_DOING'] = 'YES'
+        os.environ['BORG_RECREATE_I_KNOW_WHAT_I_AM_DOING'] = 'YES'
         os.environ['BORG_PASSPHRASE'] = 'waytooeasyonlyfortests'
         self.archiver = not self.FORK_DEFAULT and Archiver() or None
         self.tmpdir = tempfile.mkdtemp()
@@ -235,9 +237,6 @@ class ArchiverTestCaseBase(BaseTestCase):
     def create_src_archive(self, name):
         self.cmd('create', self.repository_location + '::' + name, src_dir)
 
-
-class ArchiverTestCase(ArchiverTestCaseBase):
-
     def create_regular_file(self, name, size=0, contents=None):
         filename = os.path.join(self.input_path, name)
         if not os.path.exists(os.path.dirname(filename)):
@@ -295,6 +294,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             have_root = False
         return have_root
 
+
+class ArchiverTestCase(ArchiverTestCaseBase):
     def test_basic_functionality(self):
         have_root = self.create_test_files()
         self.cmd('init', self.repository_location)
@@ -637,29 +638,56 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.cmd("extract", self.repository_location + "::test", "fm:input/file1", "fm:*file33*", "input/file2")
             self.cmd("extract", self.repository_location + "::test", "fm:input/file1", "fm:*file33*", "input/file2")
         self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file333"])
         self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file333"])
 
 
-    def test_exclude_caches(self):
+    def _create_test_caches(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('file1', size=1024 * 80)
         self.create_regular_file('cache1/CACHEDIR.TAG', contents=b'Signature: 8a477f597d28d172789f06886806bc55 extra stuff')
         self.create_regular_file('cache2/CACHEDIR.TAG', contents=b'invalid signature')
-        self.cmd('create', '--exclude-caches', self.repository_location + '::test', 'input')
+        os.mkdir('input/cache3')
+        os.link('input/cache1/CACHEDIR.TAG', 'input/cache3/CACHEDIR.TAG')
+
+    def _assert_test_caches(self):
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
         self.assert_equal(sorted(os.listdir('output/input')), ['cache2', 'file1'])
         self.assert_equal(sorted(os.listdir('output/input/cache2')), ['CACHEDIR.TAG'])
 
-    def test_exclude_tagged(self):
+    def test_exclude_caches(self):
+        self._create_test_caches()
+        self.cmd('create', '--exclude-caches', self.repository_location + '::test', 'input')
+        self._assert_test_caches()
+
+    def test_recreate_exclude_caches(self):
+        self._create_test_caches()
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.cmd('recreate', '--exclude-caches', self.repository_location + '::test')
+        self._assert_test_caches()
+
+    def _create_test_tagged(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('file1', size=1024 * 80)
         self.create_regular_file('tagged1/.NOBACKUP')
         self.create_regular_file('tagged2/00-NOBACKUP')
         self.create_regular_file('tagged3/.NOBACKUP/file2')
-        self.cmd('create', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP', self.repository_location + '::test', 'input')
+
+    def _assert_test_tagged(self):
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
         self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'tagged3'])
 
-    def test_exclude_keep_tagged(self):
+    def test_exclude_tagged(self):
+        self._create_test_tagged()
+        self.cmd('create', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP', self.repository_location + '::test', 'input')
+        self._assert_test_tagged()
+
+    def test_recreate_exclude_tagged(self):
+        self._create_test_tagged()
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.cmd('recreate', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP',
+                 self.repository_location + '::test')
+        self._assert_test_tagged()
+
+    def _create_test_keep_tagged(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('file0', size=1024)
         self.create_regular_file('tagged1/.NOBACKUP1')
@@ -672,8 +700,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.create_regular_file('taggedall/.NOBACKUP2')
         self.create_regular_file('taggedall/CACHEDIR.TAG', contents=b'Signature: 8a477f597d28d172789f06886806bc55 extra stuff')
         self.create_regular_file('taggedall/file4', size=1024)
-        self.cmd('create', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
-                 '--exclude-caches', '--keep-tag-files', self.repository_location + '::test', 'input')
+
+    def _assert_test_keep_tagged(self):
         with changedir('output'):
             self.cmd('extract', self.repository_location + '::test')
         self.assert_equal(sorted(os.listdir('output/input')), ['file0', 'tagged1', 'tagged2', 'tagged3', 'taggedall'])
@@ -683,6 +711,19 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.assert_equal(sorted(os.listdir('output/input/taggedall')),
                           ['.NOBACKUP1', '.NOBACKUP2', 'CACHEDIR.TAG', ])
 
+    def test_exclude_keep_tagged(self):
+        self._create_test_keep_tagged()
+        self.cmd('create', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
+                 '--exclude-caches', '--keep-tag-files', self.repository_location + '::test', 'input')
+        self._assert_test_keep_tagged()
+
+    def test_recreate_exclude_keep_tagged(self):
+        self._create_test_keep_tagged()
+        self.cmd('create', self.repository_location + '::test', 'input')
+        self.cmd('recreate', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
+                 '--exclude-caches', '--keep-tag-files', self.repository_location + '::test')
+        self._assert_test_keep_tagged()
+
     def test_path_normalization(self):
         self.cmd('init', self.repository_location)
         self.create_regular_file('dir1/dir2/file', size=1024 * 80)
@@ -760,13 +801,19 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         self.cmd('init', self.repository_location)
         self.cmd('create', self.repository_location + '::test1', 'input')
         self.cmd('create', '--comment', 'this is the comment', self.repository_location + '::test2', 'input')
+        self.cmd('create', '--comment', '"deleted" comment', self.repository_location + '::test3', 'input')
+        self.cmd('create', '--comment', 'preserved comment', self.repository_location + '::test4', 'input')
         assert 'Comment: \n' in self.cmd('info', self.repository_location + '::test1')
         assert 'Comment: this is the comment' in self.cmd('info', self.repository_location + '::test2')
 
-        self.cmd('comment', self.repository_location + '::test1', 'added comment')
-        self.cmd('comment', self.repository_location + '::test2', 'modified comment')
+        self.cmd('recreate', self.repository_location + '::test1', '--comment', 'added comment')
+        self.cmd('recreate', self.repository_location + '::test2', '--comment', 'modified comment')
+        self.cmd('recreate', self.repository_location + '::test3', '--comment', '')
+        self.cmd('recreate', self.repository_location + '::test4', '12345')
         assert 'Comment: added comment' in self.cmd('info', self.repository_location + '::test1')
         assert 'Comment: added comment' in self.cmd('info', self.repository_location + '::test1')
         assert 'Comment: modified comment' in self.cmd('info', self.repository_location + '::test2')
         assert 'Comment: modified comment' in self.cmd('info', self.repository_location + '::test2')
+        assert 'Comment: \n' in self.cmd('info', self.repository_location + '::test3')
+        assert 'Comment: preserved comment' in self.cmd('info', self.repository_location + '::test4')
 
 
     def test_delete(self):
         self.create_regular_file('file1', size=1024 * 80)
@@ -1149,6 +1196,178 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.cmd('init', self.repository_location, exit_code=1)
         assert not os.path.exists(self.repository_location)
 
 
+    def test_recreate_basic(self):
+        self.create_test_files()
+        self.create_regular_file('dir2/file3', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        archive = self.repository_location + '::test0'
+        self.cmd('create', archive, 'input')
+        self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3')
+        listing = self.cmd('list', '--short', archive)
+        assert 'file1' not in listing
+        assert 'dir2/file2' in listing
+        assert 'dir2/file3' not in listing
+
+    def test_recreate_subtree_hardlinks(self):
+        # This is essentially the same problem set as in test_extract_hardlinks
+        self._extract_hardlinks_setup()
+        self.cmd('create', self.repository_location + '::test2', 'input')
+        self.cmd('recreate', self.repository_location + '::test', 'input/dir1')
+        with changedir('output'):
+            self.cmd('extract', self.repository_location + '::test')
+            assert os.stat('input/dir1/hardlink').st_nlink == 2
+            assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
+            assert os.stat('input/dir1/aaaa').st_nlink == 2
+            assert os.stat('input/dir1/source2').st_nlink == 2
+        with changedir('output'):
+            self.cmd('extract', self.repository_location + '::test2')
+            assert os.stat('input/dir1/hardlink').st_nlink == 4
+
+    def test_recreate_rechunkify(self):
+        with open(os.path.join(self.input_path, 'large_file'), 'wb') as fd:
+            fd.write(b'a' * 280)
+            fd.write(b'b' * 280)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', '--chunker-params', '7,9,8,128', self.repository_location + '::test1', 'input')
+        self.cmd('create', self.repository_location + '::test2', 'input', '--no-files-cache')
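+        # test2 uses the default chunker params, so its chunks cannot match test1's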
+        file_list = self.cmd('list', self.repository_location + '::test1', 'input/large_file',
+                             '--format', '{num_chunks} {unique_chunks}')
+        num_chunks, unique_chunks = map(int, file_list.split(' '))
+        # test1 and test2 do not deduplicate
+        assert num_chunks == unique_chunks
+        self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
+        # test1 and test2 do deduplicate after recreate
+        assert not int(self.cmd('list', self.repository_location + '::test1', 'input/large_file',
+                                '--format', '{unique_chunks}'))
+
+    def test_recreate_recompress(self):
+        self.create_regular_file('compressible', size=10000)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input', '-C', 'none')
+        file_list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
+                             '--format', '{size} {csize} {sha256}')
+        size, csize, sha256_before = file_list.split(' ')
+        assert int(csize) >= int(size)  # >= due to metadata overhead
+        self.cmd('recreate', self.repository_location, '-C', 'lz4')
+        file_list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
+                             '--format', '{size} {csize} {sha256}')
+        size, csize, sha256_after = file_list.split(' ')
+        assert int(csize) < int(size)
+        assert sha256_before == sha256_after
+
+    def test_recreate_dry_run(self):
+        self.create_regular_file('compressible', size=10000)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        archives_before = self.cmd('list', self.repository_location + '::test')
+        self.cmd('recreate', self.repository_location, '-n', '-e', 'input/compressible')
+        archives_after = self.cmd('list', self.repository_location + '::test')
+        assert archives_after == archives_before
+
+    def _recreate_interrupt_patch(self, interrupt_after_n_1_files):
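+        # Replacement for ArchiveRecreater.process_item: the first
+        # `interrupt_after_n_1_files` calls go to the real implementation; the next
+        # call either sets the graceful interrupt flag (still processing that item) or,
+        # when no files are to be processed first, raises Interrupted right away.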
+        def interrupt(self, *args):
+            if interrupt_after_n_1_files:
+                self.interrupt = True
+                pi_save(self, *args)
+            else:
+                raise ArchiveRecreater.Interrupted
+
+        def process_item_patch(*args):
+            return pi_call.pop(0)(*args)
+
+        pi_save = ArchiveRecreater.process_item
+        pi_call = [pi_save] * interrupt_after_n_1_files + [interrupt]
+        return process_item_patch
+
+    def _test_recreate_interrupt(self, change_args, interrupt_early):
+        self.create_test_files()
+        self.create_regular_file('dir2/abcdef', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        process_files = 1
+        if interrupt_early:
+            process_files = 0
+        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(process_files)):
+            self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2')
+        assert 'test.recreate' in self.cmd('list', self.repository_location)
+        if change_args:
+            with patch.object(sys, 'argv', sys.argv + ['non-forking tests don\'t use sys.argv']):
+                output = self.cmd('recreate', '-sv', '--list', '-pC', 'lz4', self.repository_location, 'input/dir2')
+        else:
+            output = self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2')
+        assert 'Found test.recreate, will resume' in output
+        assert change_args == ('Command line changed' in output)
+        if not interrupt_early:
+            assert 'Fast-forwarded to input/dir2/abcdef' in output
+            assert 'A input/dir2/abcdef' not in output
+        assert 'A input/dir2/file2' in output
+        archives = self.cmd('list', self.repository_location)
+        assert 'test.recreate' not in archives
+        assert 'test' in archives
+        files = self.cmd('list', self.repository_location + '::test')
+        assert 'dir2/file2' in files
+        assert 'dir2/abcdef' in files
+        assert 'file1' not in files
+
+    def test_recreate_interrupt(self):
+        self._test_recreate_interrupt(False, True)
+
+    def test_recreate_interrupt2(self):
+        self._test_recreate_interrupt(True, False)
+
+    def _test_recreate_chunker_interrupt_patch(self):
+        real_add_chunk = Cache.add_chunk
+
+        def add_chunk(*args, **kwargs):
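+            # Look two frames up the call stack: if this chunk is stored on behalf of
+            # an ArchiveRecreater, flag it for interruption so the recreate run stops
+            # mid-item, right after this chunk has been added.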
+            frame = inspect.stack()[2]
+            try:
+                caller_self = frame[0].f_locals['self']
+                if isinstance(caller_self, ArchiveRecreater):
+                    caller_self.interrupt = True
+            finally:
+                del frame
+            return real_add_chunk(*args, **kwargs)
+        return add_chunk
+
+    def test_recreate_rechunkify_interrupt(self):
+        self.create_regular_file('file1', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        archive_before = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
+        with patch.object(Cache, 'add_chunk', self._test_recreate_chunker_interrupt_patch()):
+            self.cmd('recreate', '-pv', '--chunker-params', '10,12,11,4095', self.repository_location)
+        assert 'test.recreate' in self.cmd('list', self.repository_location)
+        output = self.cmd('recreate', '-svp', '--debug', '--chunker-params', '10,12,11,4095', self.repository_location)
+        assert 'Found test.recreate, will resume' in output
+        assert 'Copied 1 chunks from a partially processed item' in output
+        archive_after = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}')
+        assert archive_after == archive_before
+
+    def test_recreate_changed_source(self):
+        self.create_test_files()
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(1)):
+            self.cmd('recreate', self.repository_location, 'input/dir2')
+        assert 'test.recreate' in self.cmd('list', self.repository_location)
+        self.cmd('delete', self.repository_location + '::test')
+        self.cmd('create', self.repository_location + '::test', 'input')
+        output = self.cmd('recreate', self.repository_location, 'input/dir2')
+        assert 'Source archive changed, will discard test.recreate and start over' in output
+
+    def test_recreate_refuses_temporary(self):
+        self.cmd('init', self.repository_location)
+        self.cmd('recreate', self.repository_location + '::cba.recreate', exit_code=2)
+
+    def test_recreate_skips_nothing_to_do(self):
+        self.create_regular_file('file1', size=1024 * 80)
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        info_before = self.cmd('info', self.repository_location + '::test')
+        self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
+        info_after = self.cmd('info', self.repository_location + '::test')
+        assert info_before == info_after  # includes archive ID
+
 
 
 @unittest.skipUnless('binary' in BORG_EXES, 'no borg.exe available')
 class ArchiverTestCaseBinary(ArchiverTestCase):
@@ -1159,6 +1378,18 @@ class ArchiverTestCaseBinary(ArchiverTestCase):
     def test_init_interrupt(self):
         pass
 
 
+    @unittest.skip('patches objects')
+    def test_recreate_rechunkify_interrupt(self):
+        pass
+
+    @unittest.skip('patches objects')
+    def test_recreate_interrupt(self):
+        pass
+
+    @unittest.skip('patches objects')
+    def test_recreate_changed_source(self):
+        pass
+
 
 
 class ArchiverCheckTestCase(ArchiverTestCaseBase):
 
 
@@ -1274,9 +1505,6 @@ class RemoteArchiverTestCase(ArchiverTestCase):
 
 
 
 
 class DiffArchiverTestCase(ArchiverTestCaseBase):
-    create_test_files = ArchiverTestCase.create_test_files
-    create_regular_file = ArchiverTestCase.create_regular_file
-
     def test_basic_functionality(self):
         # Initialize test folder
         self.create_test_files()

+ 12 - 1
borg/testsuite/helpers.py

@@ -15,7 +15,7 @@ from ..helpers import Location, format_file_size, format_timedelta, make_path_sa
     yes, TRUISH, FALSISH, DEFAULTISH, \
     StableDict, int_to_bigint, bigint_to_int, parse_timestamp, CompressionSpec, ChunkerParams, \
     ProgressIndicatorPercent, ProgressIndicatorEndless, load_excludes, parse_pattern, \
-    PatternMatcher, RegexPattern, PathPrefixPattern, FnmatchPattern, ShellPattern, partial_format
+    PatternMatcher, RegexPattern, PathPrefixPattern, FnmatchPattern, ShellPattern, partial_format, ChunkIteratorFileWrapper
 from . import BaseTestCase, environment_variable, FakeInputs
 
 
 
 
@@ -899,3 +899,14 @@ def test_partial_format():
     assert partial_format('{unknown_key}', {}) == '{unknown_key}'
     assert partial_format('{key}{{escaped_key}}', {}) == '{key}{{escaped_key}}'
     assert partial_format('{{escaped_key}}', {'escaped_key': 1234}) == '{{escaped_key}}'
+
+
+def test_chunk_file_wrapper():
+    cfw = ChunkIteratorFileWrapper(iter([b'abc', b'def']))
+    assert cfw.read(2) == b'ab'
+    assert cfw.read(50) == b'cdef'
+    assert cfw.exhausted
+
+    cfw = ChunkIteratorFileWrapper(iter([]))
+    assert cfw.read(2) == b''
+    assert cfw.exhausted
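
The wrapper exercised here turns an iterator of byte chunks into a file-like object with a read() method and an exhausted flag. A minimal sketch of those semantics (an illustration based on the test above, not necessarily the exact helpers.py implementation) could look like this::

    class ChunkIteratorFileWrapper:
        """File-like view over an iterator of byte chunks."""
        def __init__(self, chunk_iterator):
            self.chunk_iterator = chunk_iterator
            self.chunk = b''          # current chunk being consumed
            self.chunk_offset = 0     # read position inside the current chunk
            self.exhausted = False    # set once the iterator is drained

        def _refill(self):
            # return the number of unread bytes, fetching the next chunk if needed
            remaining = len(self.chunk) - self.chunk_offset
            if not remaining:
                try:
                    self.chunk = next(self.chunk_iterator)
                except StopIteration:
                    self.exhausted = True
                    return 0  # EOF
                self.chunk_offset = 0
                remaining = len(self.chunk)
            return remaining

        def read(self, nbytes):
            # read up to nbytes, spanning chunk boundaries as the test expects
            parts = []
            while nbytes and not self.exhausted:
                remaining = self._refill()
                if not remaining:
                    break
                n = min(nbytes, remaining)
                parts.append(self.chunk[self.chunk_offset:self.chunk_offset + n])
                self.chunk_offset += n
                nbytes -= n
            return b''.join(parts)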

+ 37 - 20
docs/usage.rst

@@ -94,6 +94,8 @@ Some automatic "answerers" (if set, they automatically answer confirmation quest
         For "Warning: 'check --repair' is an experimental feature that might result in data loss."
     BORG_DELETE_I_KNOW_WHAT_I_AM_DOING=NO (or =YES)
         For "You requested to completely DELETE the repository *including* all archives it contains:"
+    BORG_RECREATE_I_KNOW_WHAT_I_AM_DOING=NO (or =YES)
+        For "recreate is an experimental feature."
 
 
     Note: answers are case sensitive. setting an invalid answer value might either give the default
     answer or ask you interactively, depending on whether retries are allowed (they by default are
@@ -356,26 +358,6 @@ Examples
     newname                              Mon, 2016-02-15 19:50:19
 
 
 
 
-.. include:: usage/comment.rst.inc
-
-Examples
-~~~~~~~~
-::
-
-    $ borg create --comment "This is a comment" /mnt/backup::archivename ~
-    $ borg info /mnt/backup::archivename
-    Name: archivename
-    Fingerprint: ...
-    Comment: This is a comment
-    ...
-    $ borg comment /mnt/backup::archivename "This is a better comment"
-    $ borg info /mnt/backup::archivename
-    Name: archivename
-    Fingerprint: ...
-    Comment: This is a better comment
-    ...
-
-
 .. include:: usage/list.rst.inc
 
 
 Examples
@@ -628,6 +610,41 @@ Examples
     no key file found for repository
 
 
 
 
+.. include:: usage/recreate.rst.inc
+
+Examples
+~~~~~~~~
+::
+
+    # Make old (Attic / Borg 0.xx) archives deduplicate with Borg 1.x archives
+    # Archives created with Borg 1.1+ and the default chunker params are skipped (archive ID stays the same)
+    $ borg recreate /mnt/backup --chunker-params default --progress
+
+    # Create a backup with light but fast compression
+    $ borg create /mnt/backup::archive /some/files --compression lz4
+    # Then recompress it - this may take longer, but the backup has already completed,
+    # so there is no risk of inconsistencies from a long-running backup job.
+    $ borg recreate /mnt/backup::archive --compression zlib,9
+
+    # Remove unwanted files from all archives in a repository
+    $ borg recreate /mnt/backup -e /home/icke/Pictures/drunk_photos
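+
+    # Preview what recreate would change without touching the repository
+    # (illustrative; combines the --dry-run/--verbose/--list options documented above)
+    $ borg recreate --dry-run --verbose --list /mnt/backup -e /home/icke/Pictures/drunk_photos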
+
+
+    # Change archive comment
+    $ borg create --comment "This is a comment" /mnt/backup::archivename ~
+    $ borg info /mnt/backup::archivename
+    Name: archivename
+    Fingerprint: ...
+    Comment: This is a comment
+    ...
+    $ borg recreate --comment "This is a better comment" /mnt/backup::archivename
+    $ borg info /mnt/backup::archivename
+    Name: archivename
+    Fingerprint: ...
+    Comment: This is a better comment
+    ...
+
+
 Miscellaneous Help
 ------------------
 
 

+ 0 - 23
docs/usage/comment.rst.inc

@@ -1,23 +0,0 @@
-.. _borg_comment:
-
-borg comment
-------------
-::
-
-    borg comment <options> ARCHIVE COMMENT
-
-positional arguments
-    ARCHIVE
-        archive to modify
-    COMMENT
-        the new archive comment
-
-`Common options`_
-    |
-
-Description
-~~~~~~~~~~~
-
-This command sets the archive comment.
-
-This results in a different archive ID.

+ 98 - 0
docs/usage/recreate.rst.inc

@@ -0,0 +1,98 @@
+.. _borg_recreate:
+
+borg recreate
+-------------
+::
+
+    borg recreate <options> REPOSITORY_OR_ARCHIVE PATH
+
+positional arguments
+    REPOSITORY_OR_ARCHIVE
+        repository/archive to recreate
+    PATH
+        paths to recreate; patterns are supported
+
+optional arguments
+    ``--list``
+        | output verbose list of items (files, dirs, ...)
+    ``--filter STATUSCHARS``
+        | only display items with the given status characters
+    ``-p``, ``--progress``
+        | show progress display while recreating archives
+    ``-n``, ``--dry-run``
+        | do not change anything
+    ``-s``, ``--stats``
+        | print statistics at end
+
+`Common options`_
+    |
+
+Exclusion options
+    ``-e PATTERN``, ``--exclude PATTERN``
+        | exclude paths matching PATTERN
+    ``--exclude-from EXCLUDEFILE``
+        | read exclude patterns from EXCLUDEFILE, one per line
+    ``--exclude-caches``
+        | exclude directories that contain a CACHEDIR.TAG file (http://www.brynosaurus.com/cachedir/spec.html)
+    ``--exclude-if-present FILENAME``
+        | exclude directories that contain the specified file
+    ``--keep-tag-files``
+        | keep tag files of excluded caches/directories
+
+Archive options
+    ``--comment COMMENT``
+        | add a comment text to the archive
+    ``--timestamp yyyy-mm-ddThh:mm:ss``
+        | manually specify the archive creation date/time (UTC). Alternatively, give a reference file/directory.
+    ``-C COMPRESSION``, ``--compression COMPRESSION``
+        | select compression algorithm (and level):
+        | none == no compression (default),
+        | lz4 == lz4,
+        | zlib == zlib (default level 6),
+        | zlib,0 .. zlib,9 == zlib (with level 0..9),
+        | lzma == lzma (default level 6),
+        | lzma,0 .. lzma,9 == lzma (with level 0..9).
+    ``--chunker-params CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE``
+        | specify the chunker parameters (or "default").
+
+Description
+~~~~~~~~~~~
+
+Recreate the contents of existing archives.
+
+--exclude, --exclude-from and PATH have the exact same semantics
+as in "borg create". If PATHs are specified, the resulting archive
+will only contain files from these PATHs.
+
+--compression: all chunks seen will be stored using the given method.
+Due to how Borg stores compressed size information, this might display
+incorrect information for archives that were not recreated at the same time.
+There is no risk of data loss from this.
+
+--chunker-params will re-chunk all files in the archive; this can be
+used to have upgraded Borg 0.xx or Attic archives deduplicate with
+Borg 1.x archives.
+
+borg recreate is signal safe. Send either SIGINT (Ctrl-C on most terminals) or
+SIGTERM to request termination.
+
+Use the *exact same* command line to resume the operation later - changing excludes
+or paths will lead to inconsistencies (changed excludes will only apply to newly
+processed files/dirs). Changing compression leads to incorrect size information
+(which does not cause any data loss, but can be misleading).
+Changing chunker params between invocations might lead to data loss.
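+
+A hypothetical interrupted run and its resumption (paths made up for illustration)::
+
+    $ borg recreate /mnt/backup -e /home/user/junk    # interrupted, e.g. via Ctrl-C
+    $ borg recreate /mnt/backup -e /home/user/junk    # identical command line resumes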
+
+USE WITH CAUTION.
+Depending on the PATHs and patterns given, recreate can be used to permanently
+delete files from archives.
+When in doubt, use "--dry-run --verbose --list" to see how patterns/PATHs are
+interpreted.
+
+The archive being recreated is only removed after the operation completes. The
+archive that is built during the operation exists at the same time at
+"<ARCHIVE>.recreate". The new archive will have a different archive ID.
+
+When rechunking, space usage can be substantial: expect at least the entire
+deduplicated size of the archives using the previous chunker params.
+When recompressing, approximately 1 % of the repository size or 512 MB
+(whichever is greater) of additional space is used.