
Merge pull request #1217 from ThomasWaldmann/in-file-checkpoints

in-file checkpoints
TW, 8 years ago
parent
commit
3bdfe2a564
5 changed files with 106 additions and 52 deletions
  1. docs/faq.rst          +15 -20
  2. src/borg/archive.py   +74 -26
  3. src/borg/archiver.py  +13 -5
  4. src/borg/constants.py +2 -1
  5. src/borg/item.py      +2 -0

+ 15 - 20
docs/faq.rst

@@ -225,10 +225,7 @@ During a backup a special checkpoint archive named ``<archive-name>.checkpoint``
 is saved every checkpoint interval (the default value for this is 30
 minutes) containing all the data backed-up until that point.
 
-Checkpoints only happen between files (so they don't help for interruptions
-happening while a very large file is being processed).
-
-This checkpoint archive is a valid archive (all files in it are valid and complete),
+This checkpoint archive is a valid archive,
 but it is only a partial backup (not all files that you wanted to backup are
 contained in it). Having it in the repo until a successful, full backup is
 completed is useful because it references all the transmitted chunks up
@@ -249,27 +246,25 @@ Once your backup has finished successfully, you can delete all
 ``<archive-name>.checkpoint`` archives. If you run ``borg prune``, it will
 also care for deleting unneeded checkpoints.
 
+Note: the checkpointing mechanism creates hidden, partial files in an archive,
+so that checkpoints work even while a big file is being processed.
+They are named ``<filename>.borg_part_<N>`` and all operations usually ignore
+these files, but you can include them by giving the option
+``--consider-part-files``. You usually only need that option if you are
+really desperate (e.g. if you have no completed backup of that file and you'd
+rather get a partial file extracted than nothing). You do **not** want to give
+that option under any normal circumstances.
+
 How can I backup huge file(s) over a unstable connection?
 ---------------------------------------------------------
 
-You can use this "split trick" as a workaround for the in-between-files-only
-checkpoints (see above), huge files and a instable connection to the repository:
-
-Split the huge file(s) into parts of manageable size (e.g. 100MB) and create
-a temporary archive of them. Borg will create checkpoints now more frequently
-than if you try to backup the files in their original form (e.g. 100GB).
-
-After that, you can remove the parts again and backup the huge file(s) in
-their original form. This will now work a lot faster as a lot of content chunks
-are already in the repository.
-
-After you have successfully backed up the huge original file(s), you can remove
-the temporary archive you made from the parts.
+This is not a problem any more, see previous FAQ item.
 
-We realize that this is just a better-than-nothing workaround, see :issue:`1198`
-for a potential solution.
+How can I restore huge file(s) over an unstable connection?
+------------------------------------------------------------
 
-Please note that this workaround only helps you for backup, not for restore.
+If you cannot extract the whole big file in one go, you can extract
+all the part files (see above) and manually concatenate them together.
 
 If it crashes with a UnicodeError, what can I do?
 -------------------------------------------------
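
For illustration only (not part of this commit or of borg's code base): a minimal
Python sketch of the manual concatenation described in the restore FAQ item above.
It assumes the ``<filename>.borg_part_<N>`` files have already been extracted into
the current directory; the helper name and the example filename are made up.

# Hypothetical helper, not borg API: reassemble extracted part files named
# <filename>.borg_part_<N> into the original file by concatenating them in
# part-number order.
import glob
import re
import shutil


def reassemble_parts(filename):
    part_re = re.compile(re.escape(filename) + r'\.borg_part_(\d+)$')
    parts = []
    for path in glob.glob(glob.escape(filename) + '.borg_part_*'):
        match = part_re.match(path)
        if match:
            parts.append((int(match.group(1)), path))
    parts.sort()  # numeric order, so part 10 sorts after part 9, not after part 1
    with open(filename, 'wb') as out:
        for _, path in parts:
            with open(path, 'rb') as part:
                shutil.copyfileobj(part, out)


reassemble_parts('bigfile.raw')  # example filename; assumes parts are in the cwd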

+ 74 - 26
src/borg/archive.py

@@ -231,7 +231,8 @@ class Archive:
 
     def __init__(self, repository, key, manifest, name, cache=None, create=False,
                  checkpoint_interval=300, numeric_owner=False, progress=False,
-                 chunker_params=CHUNKER_PARAMS, start=None, end=None, compression=None, compression_files=None):
+                 chunker_params=CHUNKER_PARAMS, start=None, end=None, compression=None, compression_files=None,
+                 consider_part_files=False):
         self.cwd = os.getcwd()
         self.key = key
         self.repository = repository
@@ -250,6 +251,7 @@ class Archive:
         if end is None:
             end = datetime.utcnow()
         self.end = end
+        self.consider_part_files = consider_part_files
         self.pipeline = DownloadPipeline(self.repository, self.key)
         if create:
             self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
@@ -327,17 +329,21 @@ Number of files: {0.stats.nfiles}'''.format(
     def __repr__(self):
         return 'Archive(%r)' % self.name
 
+    def item_filter(self, item, filter=None):
+        if not self.consider_part_files and 'part' in item:
+            # this is a part(ial) file, we usually don't want to consider it.
+            return False
+        return filter(item) if filter else True
+
     def iter_items(self, filter=None, preload=False):
-        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
+        for item in self.pipeline.unpack_many(self.metadata[b'items'], preload=preload,
+                                              filter=lambda item: self.item_filter(item, filter)):
             yield item
 
-    def add_item(self, item):
-        if self.show_progress:
+    def add_item(self, item, show_progress=True):
+        if show_progress and self.show_progress:
             self.stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)
-        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
-            self.write_checkpoint()
-            self.last_checkpoint = time.time()
 
     def write_checkpoint(self):
         self.save(self.checkpoint_name)
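
To make the new filtering above concrete, here is a small self-contained sketch
(plain dicts instead of borg's Item objects; all names below are illustrative
only): items carrying a 'part' key are hidden unless consider_part_files is set,
and a user-supplied filter is applied on top.

# Illustrative sketch, not borg's API: part items are skipped unless
# consider_part_files is set; the user filter runs afterwards.
def item_filter(item, consider_part_files=False, user_filter=None):
    if not consider_part_files and 'part' in item:
        return False  # hide the hidden, partial files from normal operations
    return user_filter(item) if user_filter else True


items = [
    {'path': 'big.raw'},
    {'path': 'big.raw.borg_part_1', 'part': 1},
]
print([i['path'] for i in items if item_filter(i)])
# ['big.raw']
print([i['path'] for i in items if item_filter(i, consider_part_files=True)])
# ['big.raw', 'big.raw.borg_part_1']
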
@@ -651,17 +657,24 @@ Number of files: {0.stats.nfiles}'''.format(
             logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
             logger.warning('borg check --repair is required to free all space.')
 
-    def stat_attrs(self, st, path):
+    def stat_simple_attrs(self, st):
         attrs = dict(
             mode=st.st_mode,
-            uid=st.st_uid, user=uid2user(st.st_uid),
-            gid=st.st_gid, group=gid2group(st.st_gid),
+            uid=st.st_uid,
+            gid=st.st_gid,
             atime=st.st_atime_ns,
             ctime=st.st_ctime_ns,
             mtime=st.st_mtime_ns,
         )
         if self.numeric_owner:
             attrs['user'] = attrs['group'] = None
+        else:
+            attrs['user'] = uid2user(st.st_uid)
+            attrs['group'] = gid2group(st.st_gid)
+        return attrs
+
+    def stat_ext_attrs(self, st, path):
+        attrs = {}
         with backup_io():
             xattrs = xattr.get_all(path, follow_symlinks=False)
             bsdflags = get_flags(path, st)
@@ -672,6 +685,11 @@ Number of files: {0.stats.nfiles}'''.format(
             attrs['bsdflags'] = bsdflags
         return attrs
 
+    def stat_attrs(self, st, path):
+        attrs = self.stat_simple_attrs(st)
+        attrs.update(self.stat_ext_attrs(st, path))
+        return attrs
+
     def process_dir(self, path, st):
         item = Item(path=make_path_safe(path))
         item.update(self.stat_attrs(st, path))
@@ -700,22 +718,56 @@ Number of files: {0.stats.nfiles}'''.format(
         self.add_item(item)
         return 's'  # symlink
 
+    def chunk_file(self, item, cache, stats, fd, fh=-1, **chunk_kw):
+        def write_part(item, from_chunk, number):
+            item = Item(internal_dict=item.as_dict())
+            length = len(item.chunks)
+            # the item should only have the *additional* chunks we processed after the last partial item:
+            item.chunks = item.chunks[from_chunk:]
+            item.path += '.borg_part_%d' % number
+            item.part = number
+            number += 1
+            self.add_item(item, show_progress=False)
+            self.write_checkpoint()
+            return length, number
+
+        item.chunks = []
+        from_chunk = 0
+        part_number = 1
+        for data in backup_io_iter(self.chunker.chunkify(fd, fh)):
+            item.chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats))
+            if self.show_progress:
+                self.stats.show_progress(item=item, dt=0.2)
+            if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
+                from_chunk, part_number = write_part(item, from_chunk, part_number)
+                self.last_checkpoint = time.time()
+        else:
+            if part_number > 1:
+                if item.chunks[from_chunk:]:
+                    # if we already have created a part item inside this file, we want to put the final
+                    # chunks (if any) into a part item also (so all parts can be concatenated to get
+                    # the complete file):
+                    from_chunk, part_number = write_part(item, from_chunk, part_number)
+                    self.last_checkpoint = time.time()
+
+                # if we created part files, we have referenced all chunks from the part files,
+                # but we also will reference the same chunks also from the final, complete file:
+                for chunk in item.chunks:
+                    cache.chunk_incref(chunk.id, stats)
+
     def process_stdin(self, path, cache):
         uid, gid = 0, 0
-        fd = sys.stdin.buffer  # binary
-        chunks = []
-        for data in backup_io_iter(self.chunker.chunkify(fd)):
-            chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
-        self.stats.nfiles += 1
         t = int(time.time()) * 1000000000
         item = Item(
             path=path,
-            chunks=chunks,
             mode=0o100660,  # regular file, ug=rw
             uid=uid, user=uid2user(uid),
             gid=gid, group=gid2group(gid),
             mtime=t, atime=t, ctime=t,
         )
+        fd = sys.stdin.buffer  # binary
+        self.chunk_file(item, cache, self.stats, fd)
+        self.stats.nfiles += 1
         self.add_item(item)
         return 'i'  # stdin
 
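The checkpoint/part logic inside chunk_file() above is easier to follow in
isolation. The following standalone sketch is simplified (plain lists, a
count-based trigger instead of the time-based checkpoint_interval, and no
cache or chunk refcounting); it shows only the splitting: each part carries
just the chunks added since the previous part, and a final tail part is
emitted so that concatenating all parts reproduces the whole file.

# Simplified sketch of the splitting done in chunk_file(): a count-based
# trigger stands in for the time-based checkpoint check, and chunks are
# plain strings instead of cache references.
def split_into_parts(chunks, checkpoint_every):
    parts = []
    all_chunks = []
    from_chunk = 0
    part_number = 1
    for i, chunk in enumerate(chunks, start=1):
        all_chunks.append(chunk)
        if i % checkpoint_every == 0:
            # "checkpoint": emit only the chunks added since the last part
            parts.append((part_number, all_chunks[from_chunk:]))
            from_chunk = len(all_chunks)
            part_number += 1
    if part_number > 1 and all_chunks[from_chunk:]:
        # we already made parts, so wrap the remaining tail in a final part
        parts.append((part_number, all_chunks[from_chunk:]))
    return parts


print(split_into_parts(['c1', 'c2', 'c3', 'c4', 'c5'], checkpoint_every=2))
# [(1, ['c1', 'c2']), (2, ['c3', 'c4']), (3, ['c5'])]
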
@@ -760,26 +812,22 @@ Number of files: {0.stats.nfiles}'''.format(
             path=safe_path,
             hardlink_master=st.st_nlink > 1,  # item is a hard link and has the chunks
         )
+        item.update(self.stat_simple_attrs(st))
         # Only chunkify the file if needed
-        if chunks is None:
+        if chunks is not None:
+            item.chunks = chunks
+        else:
             compress = self.compression_decider1.decide(path)
             logger.debug('%s -> compression %s', path, compress['name'])
             with backup_io():
                 fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
-                chunks = []
-                for data in backup_io_iter(self.chunker.chunkify(fd, fh)):
-                    chunks.append(cache.add_chunk(self.key.id_hash(data),
-                                                  Chunk(data, compress=compress),
-                                                  self.stats))
-                    if self.show_progress:
-                        self.stats.show_progress(item=item, dt=0.2)
+                self.chunk_file(item, cache, self.stats, fd, fh, compress=compress)
             if not is_special_file:
                 # we must not memorize special files, because the contents of e.g. a
                 # block or char device will change without its mtime/size/inode changing.
-                cache.memorize_file(path_hash, st, [c.id for c in chunks])
+                cache.memorize_file(path_hash, st, [c.id for c in item.chunks])
             status = status or 'M'  # regular file, modified (if not 'A' already)
-        item.chunks = chunks
         item.update(self.stat_attrs(st, path))
         if is_special_file:
             # we processed a special file like a regular file. reflect that in mode,

+ 13 - 5
src/borg/archiver.py

@@ -100,7 +100,8 @@ def with_archive(method):
     @functools.wraps(method)
     def wrapper(self, args, repository, key, manifest, **kwargs):
         archive = Archive(repository, key, manifest, args.location.archive,
-                          numeric_owner=getattr(args, 'numeric_owner', False), cache=kwargs.get('cache'))
+                          numeric_owner=getattr(args, 'numeric_owner', False), cache=kwargs.get('cache'),
+                          consider_part_files=args.consider_part_files)
         return method(self, args, repository=repository, manifest=manifest, key=key, archive=archive, **kwargs)
     return wrapper
 
@@ -668,7 +669,8 @@ class Archiver:
                 print_output(line)
 
         archive1 = archive
-        archive2 = Archive(repository, key, manifest, args.archive2)
+        archive2 = Archive(repository, key, manifest, args.archive2,
+                           consider_part_files=args.consider_part_files)
 
         can_compare_chunk_ids = archive1.metadata.get(b'chunker_params', False) == archive2.metadata.get(
         can_compare_chunk_ids = archive1.metadata.get(b'chunker_params', False) == archive2.metadata.get(
             b'chunker_params', True) or args.same_chunker_params
             b'chunker_params', True) or args.same_chunker_params
@@ -753,7 +755,8 @@ class Archiver:
 
         with cache_if_remote(repository) as cached_repo:
             if args.location.archive:
-                archive = Archive(repository, key, manifest, args.location.archive)
+                archive = Archive(repository, key, manifest, args.location.archive,
+                                  consider_part_files=args.consider_part_files)
             else:
                 archive = None
             operations = FuseOperations(key, repository, manifest, archive, cached_repo)
@@ -779,7 +782,8 @@ class Archiver:
         if args.location.archive:
             matcher, _ = self.build_matcher(args.excludes, args.paths)
             with Cache(repository, key, manifest, lock_wait=self.lock_wait) as cache:
-                archive = Archive(repository, key, manifest, args.location.archive, cache=cache)
+                archive = Archive(repository, key, manifest, args.location.archive, cache=cache,
+                                  consider_part_files=args.consider_part_files)
 
                 if args.format:
                     format = args.format
@@ -981,7 +985,8 @@ class Archiver:
     @with_repository()
     def do_debug_dump_archive_items(self, args, repository, manifest, key):
         """dump (decrypted, decompressed) archive items metadata (not: data)"""
-        archive = Archive(repository, key, manifest, args.location.archive)
+        archive = Archive(repository, key, manifest, args.location.archive,
+                          consider_part_files=args.consider_part_files)
         for i, item_id in enumerate(archive.metadata[b'items']):
             _, data = key.decrypt(item_id, repository.get(item_id))
             filename = '%06d_%s.items' % (i, bin_to_hex(item_id))
@@ -1232,6 +1237,9 @@ class Archiver:
                                   help='set umask to M (local and remote, default: %(default)04o)')
         common_group.add_argument('--remote-path', dest='remote_path', metavar='PATH',
                                   help='set remote path to executable (default: "borg")')
+        common_group.add_argument('--consider-part-files', dest='consider_part_files',
+                                  action='store_true', default=False,
+                                  help='treat part files like normal files (e.g. to list/extract them)')
 
         parser = argparse.ArgumentParser(prog=prog, description='Borg - Deduplicated Backups')
         parser.add_argument('-V', '--version', action='version', version='%(prog)s ' + __version__,

+ 2 - 1
src/borg/constants.py

@@ -1,7 +1,8 @@
 # this set must be kept complete, otherwise the RobustUnpacker might malfunction:
 ITEM_KEYS = frozenset(['path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hardlink_master',
                        'mode', 'user', 'group', 'uid', 'gid', 'mtime', 'atime', 'ctime',
-                       'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended', ])
+                       'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended',
+                       'part'])
 
 # this is the set of keys that are always present in items:
 REQUIRED_ITEM_KEYS = frozenset(['path', 'mtime', ])

+ 2 - 0
src/borg/item.py

@@ -155,6 +155,8 @@ class Item(PropDict):
     deleted = PropDict._make_property('deleted', bool)
     nlink = PropDict._make_property('nlink', int)
 
+    part = PropDict._make_property('part', int)
+
 
 class EncryptedKey(PropDict):
     """