@@ -44,7 +44,6 @@ from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
 from .helpers import os_open, flags_normal, flags_dir
 from .helpers import os_stat
 from .helpers import msgpack
-from .helpers import sig_int
 from .helpers.lrucache import LRUCache
 from .manifest import Manifest
 from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
@@ -357,18 +356,6 @@ class ChunkBuffer:
     def is_full(self):
         return self.buffer.tell() > self.BUFFER_SIZE

-    def save_chunks_state(self):
-        # as we only append to self.chunks, remembering the current length is good enough
-        self.saved_chunks_len = len(self.chunks)
-
-    def restore_chunks_state(self):
-        scl = self.saved_chunks_len
-        assert scl is not None, "forgot to call save_chunks_state?"
-        tail_chunks = self.chunks[scl:]
-        del self.chunks[scl:]
-        self.saved_chunks_len = None
-        return tail_chunks
-

 class CacheChunkBuffer(ChunkBuffer):
     def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
@@ -509,12 +496,6 @@ class Archive:
             self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
             if name in manifest.archives:
                 raise self.AlreadyExists(name)
-            i = 0
-            while True:
-                self.checkpoint_name = "{}.checkpoint{}".format(name, i and (".%d" % i) or "")
-                if self.checkpoint_name not in manifest.archives:
-                    break
-                i += 1
         else:
             info = self.manifest.archives.get(name)
             if info is None:
@@ -629,32 +610,6 @@ Duration: {0.duration}
             stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)

-    def prepare_checkpoint(self):
-        # we need to flush the archive metadata stream to repo chunks, so that
-        # we have the metadata stream chunks WITHOUT the part file item we add later.
-        # The part file item will then get into its own metadata stream chunk, which we
-        # can easily NOT include into the next checkpoint or the final archive.
-        self.items_buffer.flush(flush=True)
-        # remember the current state of self.chunks, which corresponds to the flushed chunks
-        self.items_buffer.save_chunks_state()
-
-    def write_checkpoint(self):
-        metadata = self.save(self.checkpoint_name)
-        # that .save() has committed the repo.
-        # at next commit, we won't need this checkpoint archive any more because we will then
-        # have either a newer checkpoint archive or the final archive.
-        # so we can already remove it here, the next .save() will then commit this cleanup.
-        # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
-        del self.manifest.archives[self.checkpoint_name]
-        self.cache.chunk_decref(self.id, 1, self.stats)
-        for id in metadata.item_ptrs:
-            self.cache.chunk_decref(id, 1, self.stats)
-        # also get rid of that part item, we do not want to have it in next checkpoint or final archive
-        tail_chunks = self.items_buffer.restore_chunks_state()
-        # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
-        for id in tail_chunks:
-            self.cache.chunk_decref(id, 1, self.stats)  # TODO can we have real size here?
-
     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
         if name in self.manifest.archives:
@@ -1163,60 +1118,11 @@ def cached_hash(chunk, id_hash):
 class ChunksProcessor:
     # Processes an iterator of chunks for an Item

-    def __init__(
-        self,
-        *,
-        key,
-        cache,
-        add_item,
-        prepare_checkpoint,
-        write_checkpoint,
-        checkpoint_interval,
-        checkpoint_volume,
-        rechunkify,
-    ):
+    def __init__(self, *, key, cache, add_item, rechunkify):
         self.key = key
         self.cache = cache
         self.add_item = add_item
-        self.prepare_checkpoint = prepare_checkpoint
-        self.write_checkpoint = write_checkpoint
         self.rechunkify = rechunkify
-        # time interval based checkpointing
-        self.checkpoint_interval = checkpoint_interval
-        self.last_checkpoint = time.monotonic()
-        # file content volume based checkpointing
-        self.checkpoint_volume = checkpoint_volume
-        self.current_volume = 0
-        self.last_volume_checkpoint = 0
-
-    def write_part_file(self, item):
-        self.prepare_checkpoint()
-        item = Item(internal_dict=item.as_dict())
-        # for borg recreate, we already have a size member in the source item (giving the total file size),
-        # but we consider only a part of the file here, thus we must recompute the size from the chunks:
-        item.get_size(memorize=True, from_chunks=True)
-        item.path += ".borg_part"
-        self.add_item(item, show_progress=False)
-        self.write_checkpoint()
-
-    def maybe_checkpoint(self, item):
-        checkpoint_done = False
-        sig_int_triggered = sig_int and sig_int.action_triggered()
-        if (
-            sig_int_triggered
-            or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval)
-            or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume)
-        ):
-            if sig_int_triggered:
-                logger.info("checkpoint requested: starting checkpoint creation...")
-            self.write_part_file(item)
-            checkpoint_done = True
-            self.last_checkpoint = time.monotonic()
-            self.last_volume_checkpoint = self.current_volume
-            if sig_int_triggered:
-                sig_int.action_completed()
-                logger.info("checkpoint requested: finished checkpoint creation!")
-        return checkpoint_done  # whether a checkpoint archive was created

     def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
         if not chunk_processor:
@@ -1237,16 +1143,13 @@ class ChunksProcessor:
         for chunk in chunk_iter:
             chunk_entry = chunk_processor(chunk)
             item.chunks.append(chunk_entry)
-            self.current_volume += chunk_entry[1]
             if show_progress:
                 stats.show_progress(item=item, dt=0.2)
-            self.maybe_checkpoint(item)


 class FilesystemObjectProcessors:
     # When ported to threading, then this doesn't need chunker, cache, key any more.
-    # write_checkpoint should then be in the item buffer,
-    # and process_file becomes a callback passed to __init__.
+    # process_file becomes a callback passed to __init__.

     def __init__(
         self,
@@ -2195,7 +2098,7 @@ class ArchiveChecker:
         if last and len(archive_infos) < last:
             logger.warning("--last %d archives: only found %d archives", last, len(archive_infos))
         else:
-            archive_infos = self.manifest.archives.list(sort_by=sort_by, consider_checkpoints=True)
+            archive_infos = self.manifest.archives.list(sort_by=sort_by)
         num_archives = len(archive_infos)

         pi = ProgressIndicatorPercent(
@@ -2279,8 +2182,6 @@ class ArchiveRecreater:
         progress=False,
         file_status_printer=None,
         timestamp=None,
-        checkpoint_interval=1800,
-        checkpoint_volume=0,
     ):
         self.manifest = manifest
         self.repository = manifest.repository
@@ -2305,8 +2206,6 @@ class ArchiveRecreater:
         self.stats = stats
         self.progress = progress
         self.print_file_status = file_status_printer or (lambda *args: None)
-        self.checkpoint_interval = None if dry_run else checkpoint_interval
-        self.checkpoint_volume = None if dry_run else checkpoint_volume

     def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
@@ -2452,14 +2351,7 @@ class ArchiveRecreater:
             "Rechunking archive from %s to %s", source_chunker_params or "(unknown)", target.chunker_params
         )
         target.process_file_chunks = ChunksProcessor(
-            cache=self.cache,
-            key=self.key,
-            add_item=target.add_item,
-            prepare_checkpoint=target.prepare_checkpoint,
-            write_checkpoint=target.write_checkpoint,
-            checkpoint_interval=self.checkpoint_interval,
-            checkpoint_volume=self.checkpoint_volume,
-            rechunkify=target.recreate_rechunkify,
+            cache=self.cache, key=self.key, add_item=target.add_item, rechunkify=target.recreate_rechunkify
         ).process_file_chunks
         target.chunker = get_chunker(*target.chunker_params, seed=self.key.chunk_seed, sparse=False)
         return target