@@ -58,7 +58,6 @@ class Statistics:
         self.output_json = output_json
         self.iec = iec
         self.osize = self.usize = self.nfiles = 0
-        self.osize_parts = self.usize_parts = self.nfiles_parts = 0
         self.last_progress = 0  # timestamp when last progress was shown
         self.files_stats = defaultdict(int)
         self.chunking_time = 0.0
@@ -66,15 +65,10 @@ class Statistics:
         self.rx_bytes = 0
         self.tx_bytes = 0

-    def update(self, size, unique, part=False):
-        if not part:
-            self.osize += size
-            if unique:
-                self.usize += size
-        else:
-            self.osize_parts += size
-            if unique:
-                self.usize_parts += size
+    def update(self, size, unique):
+        self.osize += size
+        if unique:
+            self.usize += size

     def __add__(self, other):
         if not isinstance(other, Statistics):
@@ -83,9 +77,6 @@ class Statistics:
         stats.osize = self.osize + other.osize
         stats.usize = self.usize + other.usize
         stats.nfiles = self.nfiles + other.nfiles
-        stats.osize_parts = self.osize_parts + other.osize_parts
-        stats.usize_parts = self.usize_parts + other.usize_parts
-        stats.nfiles_parts = self.nfiles_parts + other.nfiles_parts
         stats.chunking_time = self.chunking_time + other.chunking_time
         stats.hashing_time = self.hashing_time + other.hashing_time
         for key in other.files_stats:
@@ -134,20 +125,13 @@ Bytes sent to remote: {stats.tx_bytes}
         }

     def as_raw_dict(self):
-        return {
-            "size": self.osize,
-            "nfiles": self.nfiles,
-            "size_parts": self.osize_parts,
-            "nfiles_parts": self.nfiles_parts,
-        }
+        return {"size": self.osize, "nfiles": self.nfiles}

     @classmethod
     def from_raw_dict(cls, **kw):
         self = cls()
         self.osize = kw["size"]
         self.nfiles = kw["nfiles"]
-        self.osize_parts = kw["size_parts"]
-        self.nfiles_parts = kw["nfiles_parts"]
         return self

     @property
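
With the *_parts counters gone, the raw-dict round trip only carries size and nfiles. A minimal sketch of the simplified API (the values are made up; only Statistics itself comes from this diff):

    stats = Statistics()
    stats.update(4096, unique=True)    # adds to osize and usize
    stats.update(4096, unique=False)   # adds to osize only
    stats.nfiles = 1
    raw = stats.as_raw_dict()          # {"size": 8192, "nfiles": 1} - no *_parts keys anymore
    restored = Statistics.from_raw_dict(**raw)
    assert (restored.osize, restored.nfiles) == (8192, 1)
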
@@ -353,6 +337,7 @@ class ChunkBuffer:
         self.chunks = []
         self.key = key
         self.chunker = get_chunker(*chunker_params, seed=self.key.chunk_seed, sparse=False)
+        self.saved_chunks_len = None

     def add(self, item):
         self.buffer.write(self.packer.pack(item.as_dict()))
@@ -392,6 +377,18 @@ class ChunkBuffer:
     def is_full(self):
         return self.buffer.tell() > self.BUFFER_SIZE

+    def save_chunks_state(self):
+        # as we only append to self.chunks, remembering the current length is good enough
+        self.saved_chunks_len = len(self.chunks)
+
+    def restore_chunks_state(self):
+        scl = self.saved_chunks_len
+        assert scl is not None, "forgot to call save_chunks_state?"
+        tail_chunks = self.chunks[scl:]
+        del self.chunks[scl:]
+        self.saved_chunks_len = None
+        return tail_chunks
+

 class CacheChunkBuffer(ChunkBuffer):
     def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
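
save_chunks_state()/restore_chunks_state() rely on self.chunks being append-only, so a saved length fully describes the earlier state. A standalone sketch of the same idea, with a plain list standing in for ChunkBuffer.chunks (all names here are illustrative):

    ids = ["id1", "id2"]          # chunk ids written for the regular metadata stream
    saved_len = len(ids)          # save_chunks_state(): remember the current length
    ids.append("id3")             # checkpoint-only chunk appended afterwards
    tail = ids[saved_len:]        # restore_chunks_state(): split off the tail ...
    del ids[saved_len:]           # ... and drop it from the buffer again
    assert ids == ["id1", "id2"] and tail == ["id3"]
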
@@ -484,7 +481,6 @@ class Archive:
         start=None,
         start_monotonic=None,
         end=None,
-        consider_part_files=False,
         log_json=False,
         iec=False,
     ):
@@ -519,7 +515,6 @@ class Archive:
         if end is None:
             end = archive_ts_now()
         self.end = end
-        self.consider_part_files = consider_part_files
         self.pipeline = DownloadPipeline(self.repository, self.repo_objs)
         self.create = create
         if self.create:
@@ -629,9 +624,6 @@ Duration: {0.duration}
         return "Archive(%r)" % self.name

     def item_filter(self, item, filter=None):
-        if not self.consider_part_files and "part" in item:
-            # this is a part(ial) file, we usually don't want to consider it.
-            return False
         return filter(item) if filter else True

     def iter_items(self, filter=None, preload=False):
@@ -649,6 +641,15 @@ Duration: {0.duration}
             stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)

+    def prepare_checkpoint(self):
+        # we need to flush the archive metadata stream to repo chunks, so that
+        # we have the metadata stream chunks WITHOUT the part file item we add later.
+        # The part file item will then get into its own metadata stream chunk, which we
+        # can easily NOT include into the next checkpoint or the final archive.
+        self.items_buffer.flush(flush=True)
+        # remember the current state of self.chunks, which corresponds to the flushed chunks
+        self.items_buffer.save_chunks_state()
+
     def write_checkpoint(self):
         metadata = self.save(self.checkpoint_name)
         # that .save() has committed the repo.
@@ -660,6 +661,11 @@ Duration: {0.duration}
         self.cache.chunk_decref(self.id, self.stats)
         for id in metadata.item_ptrs:
             self.cache.chunk_decref(id, self.stats)
+        # also get rid of that part item, we do not want to have it in next checkpoint or final archive
+        tail_chunks = self.items_buffer.restore_chunks_state()
+        # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
+        for id in tail_chunks:
+            self.cache.chunk_decref(id, self.stats)

     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
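
Taken together, prepare_checkpoint() and write_checkpoint() give each checkpoint the following shape, as driven by ChunksProcessor.write_part_file() later in this diff (archive and part_item are placeholders for the objects involved, not patch code):

    archive.prepare_checkpoint()      # flush the items buffer, snapshot items_buffer.chunks
    archive.add_item(part_item)       # the <path>.borg_part item lands in its own metadata chunk(s)
    archive.write_checkpoint()        # save the checkpoint archive, then decref its metadata,
                                      # its item_ptrs and the tail metadata chunks holding the part item
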
@@ -694,14 +700,7 @@ Duration: {0.duration}
         # because borg info relies on them. so, either use the given stats (from args)
         # or fall back to self.stats if it was not given.
         stats = stats or self.stats
-        metadata.update(
-            {
-                "size": stats.osize,
-                "nfiles": stats.nfiles,
-                "size_parts": stats.osize_parts,
-                "nfiles_parts": stats.nfiles_parts,
-            }
-        )
+        metadata.update({"size": stats.osize, "nfiles": stats.nfiles})
         metadata.update(additional_metadata or {})
         metadata = ArchiveItem(metadata)
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
@@ -751,12 +750,9 @@ Duration: {0.duration}
         pi.finish()

         stats = Statistics(iec=self.iec)
-        stats.usize = unique_size  # the part files use same chunks as the full file
+        stats.usize = unique_size
         stats.nfiles = self.metadata.nfiles
         stats.osize = self.metadata.size
-        if self.consider_part_files:
-            stats.nfiles += self.metadata.nfiles_parts
-            stats.osize += self.metadata.size_parts
         return stats

     @contextmanager
@@ -1038,9 +1034,9 @@ Duration: {0.duration}
                 error = True
                 return exception_ignored  # must not return None here

-        def chunk_decref(id, stats, part=False):
+        def chunk_decref(id, stats):
             try:
-                self.cache.chunk_decref(id, stats, wait=False, part=part)
+                self.cache.chunk_decref(id, stats, wait=False)
             except KeyError:
                 cid = bin_to_hex(id)
                 raise ChunksIndexError(cid)
@@ -1064,9 +1060,8 @@ Duration: {0.duration}
                     for item in unpacker:
                         item = Item(internal_dict=item)
                         if "chunks" in item:
-                            part = not self.consider_part_files and "part" in item
                             for chunk_id, size in item.chunks:
-                                chunk_decref(chunk_id, stats, part=part)
+                                chunk_decref(chunk_id, stats)
                 except (TypeError, ValueError):
                     # if items metadata spans multiple chunks and one chunk got dropped somehow,
                     # it could be that unpacker yields bad types
@@ -1234,10 +1229,22 @@ def cached_hash(chunk, id_hash):
 class ChunksProcessor:
     # Processes an iterator of chunks for an Item

-    def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, checkpoint_volume, rechunkify):
+    def __init__(
+        self,
+        *,
+        key,
+        cache,
+        add_item,
+        prepare_checkpoint,
+        write_checkpoint,
+        checkpoint_interval,
+        checkpoint_volume,
+        rechunkify,
+    ):
         self.key = key
         self.cache = cache
         self.add_item = add_item
+        self.prepare_checkpoint = prepare_checkpoint
         self.write_checkpoint = write_checkpoint
         self.rechunkify = rechunkify
         # time interval based checkpointing
@@ -1248,38 +1255,34 @@ class ChunksProcessor:
         self.current_volume = 0
         self.last_volume_checkpoint = 0

-    def write_part_file(self, item, from_chunk, number):
+    def write_part_file(self, item):
+        self.prepare_checkpoint()
         item = Item(internal_dict=item.as_dict())
-        length = len(item.chunks)
-        # the item should only have the *additional* chunks we processed after the last partial item:
-        item.chunks = item.chunks[from_chunk:]
         # for borg recreate, we already have a size member in the source item (giving the total file size),
         # but we consider only a part of the file here, thus we must recompute the size from the chunks:
         item.get_size(memorize=True, from_chunks=True)
-        item.path += ".borg_part_%d" % number
-        item.part = number
-        number += 1
+        item.path += ".borg_part"
         self.add_item(item, show_progress=False)
         self.write_checkpoint()
-        return length, number

-    def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
+    def maybe_checkpoint(self, item):
+        checkpoint_done = False
         sig_int_triggered = sig_int and sig_int.action_triggered()
         if (
-            forced
-            or sig_int_triggered
+            sig_int_triggered
             or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval)
             or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume)
         ):
             if sig_int_triggered:
                 logger.info("checkpoint requested: starting checkpoint creation...")
-            from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
+            self.write_part_file(item)
+            checkpoint_done = True
             self.last_checkpoint = time.monotonic()
             self.last_volume_checkpoint = self.current_volume
             if sig_int_triggered:
                 sig_int.action_completed()
                 logger.info("checkpoint requested: finished checkpoint creation!")
-        return from_chunk, part_number
+        return checkpoint_done  # whether a checkpoint archive was created

     def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
         if not chunk_processor:
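
maybe_checkpoint() now decides on its own whether to checkpoint (SIGINT-requested, interval-based or volume-based) and reports back a plain boolean instead of the old (from_chunk, part_number) cursor. A caller-side sketch of using that return value; this is illustrative only, the process_file_chunks() hunk below simply ignores it:

    if processor.maybe_checkpoint(item):
        logger.debug("checkpoint archive written while processing %s", item.path)
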
@@ -1297,28 +1300,13 @@ class ChunksProcessor:
         # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
         if self.rechunkify and "chunks_healthy" in item:
             del item.chunks_healthy
-        from_chunk = 0
-        part_number = 1
         for chunk in chunk_iter:
             cle = chunk_processor(chunk)
             item.chunks.append(cle)
             self.current_volume += cle[1]
             if show_progress:
                 stats.show_progress(item=item, dt=0.2)
-            from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
-        else:
-            if part_number > 1:
-                if item.chunks[from_chunk:]:
-                    # if we already have created a part item inside this file, we want to put the final
-                    # chunks (if any) into a part item also (so all parts can be concatenated to get
-                    # the complete file):
-                    from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)
-
-            # if we created part files, we have referenced all chunks from the part files,
-            # but we also will reference the same chunks also from the final, complete file:
-            for chunk in item.chunks:
-                cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
-            stats.nfiles_parts += part_number - 1
+            self.maybe_checkpoint(item)


 class FilesystemObjectProcessors:
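
Because prepare_checkpoint is now a required keyword argument, every ChunksProcessor caller has to pass the matching Archive callback next to write_checkpoint, as the ArchiveRecreater hunk below does. A construction sketch with assumed values (key, cache, archive, stats and chunk_iter are presumed to exist already; the checkpoint trigger values are illustrative):

    cp = ChunksProcessor(
        key=key,
        cache=cache,
        add_item=archive.add_item,
        prepare_checkpoint=archive.prepare_checkpoint,   # new callback, paired with write_checkpoint
        write_checkpoint=archive.write_checkpoint,
        checkpoint_interval=1800,
        checkpoint_volume=0,
        rechunkify=False,
    )
    cp.process_file_chunks(item, cache, stats, show_progress=False, chunk_iter=chunk_iter)
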
@@ -2474,6 +2462,7 @@ class ArchiveRecreater:
             cache=self.cache,
             key=self.key,
             add_item=target.add_item,
+            prepare_checkpoint=target.prepare_checkpoint,
             write_checkpoint=target.write_checkpoint,
             checkpoint_interval=self.checkpoint_interval,
             checkpoint_volume=self.checkpoint_volume,