@@ -40,6 +40,7 @@ from .helpers import safe_ns
 from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
 from .helpers import os_open, flags_normal
 from .helpers import msgpack
+from .helpers import sig_int
 from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
@@ -1096,6 +1097,19 @@ class ChunksProcessor:
         self.write_checkpoint()
         return length, number
 
+    def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
+        sig_int_triggered = sig_int and sig_int.action_triggered()
+        if forced or sig_int_triggered or \
+           self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
+            if sig_int_triggered:
+                logger.info('checkpoint requested: starting checkpoint creation...')
+            from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
+            self.last_checkpoint = time.monotonic()
+            if sig_int_triggered:
+                sig_int.action_completed()
+                logger.info('checkpoint requested: finished checkpoint creation!')
+        return from_chunk, part_number
+
     def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
         if not chunk_processor:
             def chunk_processor(data):
@@ -1114,17 +1128,14 @@ class ChunksProcessor:
             item.chunks.append(chunk_processor(data))
             if show_progress:
                 stats.show_progress(item=item, dt=0.2)
-            if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
-                from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
-                self.last_checkpoint = time.monotonic()
+            from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
         else:
             if part_number > 1:
                 if item.chunks[from_chunk:]:
                     # if we already have created a part item inside this file, we want to put the final
                     # chunks (if any) into a part item also (so all parts can be concatenated to get
                     # the complete file):
-                    from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
-                    self.last_checkpoint = time.monotonic()
+                    from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)
 
                 # if we created part files, we have referenced all chunks from the part files,
                 # but we also will reference the same chunks also from the final, complete file:
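
For readability, the trigger logic of maybe_checkpoint can be exercised in isolation. The following is a minimal illustrative sketch, not borg code: SigIntStandIn, request() and maybe_checkpoint_sketch are invented names and the part-file write is stubbed out; only the trigger condition (forced, a pending SIGINT-requested checkpoint, or an elapsed checkpoint_interval) and the action_triggered()/action_completed() handshake mirror the diff above.

    import time

    class SigIntStandIn:
        """Illustrative stand-in for the sig_int helper referenced in the diff.

        Only the two calls the diff relies on are modeled: action_triggered()
        reports whether a checkpoint was requested, and action_completed()
        clears that request once the checkpoint has been written.
        """
        def __init__(self):
            self._requested = False

        def request(self):
            # in real use this would be set from a signal handler
            self._requested = True

        def action_triggered(self):
            return self._requested

        def action_completed(self):
            self._requested = False

    def maybe_checkpoint_sketch(sig_int, last_checkpoint, checkpoint_interval, forced=False):
        """Return (did_checkpoint, new_last_checkpoint).

        A checkpoint is written when it is forced, when a SIGINT-requested
        checkpoint is pending, or when checkpoint_interval seconds have
        passed since last_checkpoint.
        """
        sig_int_triggered = sig_int and sig_int.action_triggered()
        interval_elapsed = (checkpoint_interval and
                            time.monotonic() - last_checkpoint > checkpoint_interval)
        if forced or sig_int_triggered or interval_elapsed:
            # ... write_part_file(...) would run here ...
            last_checkpoint = time.monotonic()
            if sig_int_triggered:
                sig_int.action_completed()
            return True, last_checkpoint
        return False, last_checkpoint

    if __name__ == '__main__':
        sig_int = SigIntStandIn()
        last = time.monotonic()
        # nothing pending, interval not elapsed -> no checkpoint
        print(maybe_checkpoint_sketch(sig_int, last, checkpoint_interval=1800))
        sig_int.request()
        # pending request -> checkpoint written, request cleared
        print(maybe_checkpoint_sketch(sig_int, last, checkpoint_interval=1800))

Factoring the condition out this way is what lets both call sites in the diff (interval/SIGINT-driven inside the chunk loop, forced at end of file) share one checkpoint path.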