|
@@ -1226,14 +1226,19 @@ def cached_hash(chunk, id_hash):
|
|
|
class ChunksProcessor:
|
|
|
# Processes an iterator of chunks for an Item
|
|
|
|
|
|
- def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, rechunkify):
|
|
|
+ def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, checkpoint_volume, rechunkify):
|
|
|
self.key = key
|
|
|
self.cache = cache
|
|
|
self.add_item = add_item
|
|
|
self.write_checkpoint = write_checkpoint
|
|
|
+ self.rechunkify = rechunkify
|
|
|
+ # time interval based checkpointing
|
|
|
self.checkpoint_interval = checkpoint_interval
|
|
|
self.last_checkpoint = time.monotonic()
|
|
|
- self.rechunkify = rechunkify
|
|
|
+ # file content volume based checkpointing
|
|
|
+ self.checkpoint_volume = checkpoint_volume
|
|
|
+ self.current_volume = 0
|
|
|
+ self.last_volume_checkpoint = 0
|
|
|
|
|
|
def write_part_file(self, item, from_chunk, number):
|
|
|
item = Item(internal_dict=item.as_dict())
|
|
@@ -1255,13 +1260,14 @@ class ChunksProcessor:
|
|
|
if (
|
|
|
forced
|
|
|
or sig_int_triggered
|
|
|
- or self.checkpoint_interval
|
|
|
- and time.monotonic() - self.last_checkpoint > self.checkpoint_interval
|
|
|
+ or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval)
|
|
|
+ or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume)
|
|
|
):
|
|
|
if sig_int_triggered:
|
|
|
logger.info("checkpoint requested: starting checkpoint creation...")
|
|
|
from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
|
|
|
self.last_checkpoint = time.monotonic()
|
|
|
+ self.last_volume_checkpoint = self.current_volume
|
|
|
if sig_int_triggered:
|
|
|
sig_int.action_completed()
|
|
|
logger.info("checkpoint requested: finished checkpoint creation!")
|
|
@@ -1286,7 +1292,9 @@ class ChunksProcessor:
|
|
|
from_chunk = 0
|
|
|
part_number = 1
|
|
|
for chunk in chunk_iter:
|
|
|
- item.chunks.append(chunk_processor(chunk))
|
|
|
+ cle = chunk_processor(chunk)
|
|
|
+ item.chunks.append(cle)
|
|
|
+ self.current_volume += cle[1]
|
|
|
if show_progress:
|
|
|
stats.show_progress(item=item, dt=0.2)
|
|
|
from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
|
|
@@ -2262,6 +2270,7 @@ class ArchiveRecreater:
|
|
|
file_status_printer=None,
|
|
|
timestamp=None,
|
|
|
checkpoint_interval=1800,
|
|
|
+ checkpoint_volume=0,
|
|
|
):
|
|
|
self.manifest = manifest
|
|
|
self.repository = manifest.repository
|
|
@@ -2289,6 +2298,7 @@ class ArchiveRecreater:
|
|
|
self.progress = progress
|
|
|
self.print_file_status = file_status_printer or (lambda *args: None)
|
|
|
self.checkpoint_interval = None if dry_run else checkpoint_interval
|
|
|
+ self.checkpoint_volume = None if dry_run else checkpoint_volume
|
|
|
|
|
|
def recreate(self, archive_name, comment=None, target_name=None):
|
|
|
assert not self.is_temporary_archive(archive_name)
|
|
@@ -2456,6 +2466,7 @@ class ArchiveRecreater:
|
|
|
add_item=target.add_item,
|
|
|
write_checkpoint=target.write_checkpoint,
|
|
|
checkpoint_interval=self.checkpoint_interval,
|
|
|
+ checkpoint_volume=self.checkpoint_volume,
|
|
|
rechunkify=target.recreate_rechunkify,
|
|
|
).process_file_chunks
|
|
|
target.chunker = get_chunker(*target.chunker_params, seed=self.key.chunk_seed, sparse=False)
|