@@ -273,14 +273,16 @@ class DownloadPipeline:
         """
         self.hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
-        for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
+        for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM, replacement_chunk=False):
+            if data is None:
+                continue  # archive stream chunk missing
             unpacker.feed(data)
             for _item in unpacker:
                 item = Item(internal_dict=_item)
                 if filter is None or filter(item):
                     if "chunks" in item:
                         item.chunks = [ChunkListEntry(*e) for e in item.chunks]
-                    if "chunks_healthy" in item:
+                    if "chunks_healthy" in item:  # legacy
                         item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
                     yield item

@@ -312,10 +314,32 @@ class DownloadPipeline:
             self.repository.preload([c.id for c in item.chunks])
         return preload_chunks

-    def fetch_many(self, ids, is_preloaded=False, ro_type=None):
+    def fetch_many(self, chunks, is_preloaded=False, ro_type=None, replacement_chunk=True):
         assert ro_type is not None
-        for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
-            _, data = self.repo_objs.parse(id_, cdata, ro_type=ro_type)
+        ids = []
+        sizes = []
+        if all(isinstance(chunk, ChunkListEntry) for chunk in chunks):
+            for chunk in chunks:
+                ids.append(chunk.id)
+                sizes.append(chunk.size)
+        elif all(isinstance(chunk, bytes) for chunk in chunks):
+            ids = list(chunks)
+            sizes = [None] * len(ids)
+        else:
+            raise TypeError(f"unsupported or mixed element types: {chunks}")
+        for id, size, cdata in zip(
+            ids, sizes, self.repository.get_many(ids, is_preloaded=is_preloaded, raise_missing=False)
+        ):
+            if cdata is None:
+                if replacement_chunk and size is not None:
+                    logger.error(f"repository object {bin_to_hex(id)} missing, returning {size} zero bytes.")
+                    data = zeros[:size]  # return an all-zero replacement chunk of correct size
+                else:
+                    logger.error(f"repository object {bin_to_hex(id)} missing, returning None.")
+                    data = None
+            else:
+                _, data = self.repo_objs.parse(id, cdata, ro_type=ro_type)
+                assert size is None or len(data) == size
             yield data


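The two call styles accepted by the reworked fetch_many() are worth spelling out: ChunkListEntry elements carry a size, so a missing chunk can be replaced by all-zero bytes of the right length, while raw id bytes carry no size, so a missing chunk can only come back as None. A minimal, self-contained sketch of that dispatch (split_chunk_specs is a hypothetical helper name; the local ChunkListEntry namedtuple is a stand-in for borg's):

    from collections import namedtuple

    ChunkListEntry = namedtuple("ChunkListEntry", "id size")  # stand-in for borg's ChunkListEntry

    def split_chunk_specs(chunks):
        # mirrors the dispatch at the top of the new fetch_many(): ids plus known sizes,
        # or ids plus None sizes when only raw id bytes were given
        if all(isinstance(chunk, ChunkListEntry) for chunk in chunks):
            return [c.id for c in chunks], [c.size for c in chunks]
        if all(isinstance(chunk, bytes) for chunk in chunks):
            return list(chunks), [None] * len(chunks)
        raise TypeError(f"unsupported or mixed element types: {chunks}")

    print(split_chunk_specs([ChunkListEntry(b"\x01" * 32, 4096)]))  # ids with sizes -> zero-fill possible
    print(split_chunk_specs([b"\x02" * 32]))                        # ids only -> missing chunks yield None

Accordingly, file extraction passes item.chunks (sizes known, so holes come back as zeros), while the archive metadata stream passes raw ids with replacement_chunk=False and skips the None results, as the first hunk above does.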
@@ -762,7 +786,6 @@ Duration: {0.duration}
             # if a previous extraction was interrupted between setting the mtime and setting non-default flags.
             return True

-        has_damaged_chunks = "chunks_healthy" in item
         if dry_run or stdout:
             with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set:
                 if not hardlink_set:
@@ -771,9 +794,7 @@ Duration: {0.duration}
                     # it would get stuck.
                     if "chunks" in item:
                         item_chunks_size = 0
-                        for data in self.pipeline.fetch_many(
-                            [c.id for c in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
-                        ):
+                        for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
                             if pi:
                                 pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                             if stdout:
@@ -789,8 +810,6 @@ Duration: {0.duration}
                                         item_size, item_chunks_size
                                     )
                                 )
-                    if has_damaged_chunks:
-                        raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
             return

         dest = self.cwd
@@ -824,8 +843,7 @@ Duration: {0.duration}
                 with backup_io("open"):
                     fd = open(path, "wb")
                 with fd:
-                    ids = [c.id for c in item.chunks]
-                    for data in self.pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
+                    for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
                         if pi:
                             pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                         with backup_io("write"):
@@ -845,8 +863,6 @@ Duration: {0.duration}
                             raise BackupError(
                                 f"Size inconsistency detected: size {item_size}, chunks size {item_chunks_size}"
                             )
-                    if has_damaged_chunks:
-                        raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
                     return
                 with backup_io:
                     # No repository access beyond this point.
@@ -1010,8 +1026,8 @@ Duration: {0.duration}
                 path,
                 item1,
                 item2,
-                archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
-                archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
+                archive1.pipeline.fetch_many(item1.get("chunks", []), ro_type=ROBJ_FILE_STREAM),
+                archive2.pipeline.fetch_many(item2.get("chunks", []), ro_type=ROBJ_FILE_STREAM),
                 can_compare_chunk_ids=can_compare_chunk_ids,
             )

@@ -1159,10 +1175,6 @@ class ChunksProcessor:
                 return chunk_entry

         item.chunks = []
-        # if we rechunkify, we'll get a fundamentally different chunks list, thus we need
-        # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
-        if self.rechunkify and "chunks_healthy" in item:
-            del item.chunks_healthy
         for chunk in chunk_iter:
             chunk_entry = chunk_processor(chunk)
             item.chunks.append(chunk_entry)
@@ -1779,13 +1791,10 @@ class ArchiveChecker:
         if defect_chunks:
             if self.repair:
                 # if we kill the defect chunk here, subsequent actions within this "borg check"
-                # run will find missing chunks and replace them with all-zero replacement
-                # chunks and flag the files as "repaired".
-                # if another backup is done later and the missing chunks get backed up again,
-                # a "borg check" afterwards can heal all files where this chunk was missing.
+                # run will find missing chunks.
                 logger.warning(
-                    "Found defect chunks. They will be deleted now, so affected files can "
-                    "get repaired now and maybe healed later."
+                    "Found defect chunks and will delete them now. "
+                    "Reading files referencing these chunks will result in an I/O error."
                 )
                 for defect_chunk in defect_chunks:
                     # remote repo (ssh): retry might help for strange network / NIC / RAM errors
@@ -1805,10 +1814,7 @@ class ArchiveChecker:
                     else:
                         logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk))
             else:
-                logger.warning(
-                    "Found defect chunks. With --repair, they would get deleted, so affected "
-                    "files could get repaired then and maybe healed later."
-                )
+                logger.warning("Found defect chunks. With --repair, they would get deleted.")
                 for defect_chunk in defect_chunks:
                     logger.debug("chunk %s is defect.", bin_to_hex(defect_chunk))
         log = logger.error if errors else logger.info
@@ -1919,80 +1925,18 @@ class ArchiveChecker:
                     self.repository.put(id_, cdata)

         def verify_file_chunks(archive_name, item):
-            """Verifies that all file chunks are present.
-
-            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
-            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
-            """
-
-            def replacement_chunk(size):
-                chunk = Chunk(None, allocation=CH_ALLOC, size=size)
-                chunk_id, data = cached_hash(chunk, self.key.id_hash)
-                cdata = self.repo_objs.format(chunk_id, {}, data, ro_type=ROBJ_FILE_STREAM)
-                return chunk_id, size, cdata
-
+            """Verifies that all file chunks are present. Missing file chunks will be logged."""
             offset = 0
-            chunk_list = []
-            chunks_replaced = False
-            has_chunks_healthy = "chunks_healthy" in item
-            chunks_current = item.chunks
-            chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
-            if has_chunks_healthy and len(chunks_current) != len(chunks_healthy):
-                # should never happen, but there was issue #3218.
-                logger.warning(f"{archive_name}: {item.path}: Invalid chunks_healthy metadata removed!")
-                del item.chunks_healthy
-                has_chunks_healthy = False
-                chunks_healthy = chunks_current
-            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
-                chunk_id, size = chunk_healthy
+            for chunk in item.chunks:
+                chunk_id, size = chunk
                 if chunk_id not in self.chunks:
-                    # a chunk of the healthy list is missing
-                    if chunk_current == chunk_healthy:
-                        logger.error(
-                            "{}: {}: New missing file chunk detected (Byte {}-{}, Chunk {}). "
-                            "Replacing with all-zero chunk.".format(
-                                archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
-                            )
+                    logger.error(
+                        "{}: {}: Missing file chunk detected (Byte {}-{}, Chunk {}).".format(
+                            archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
                         )
-                        self.error_found = chunks_replaced = True
-                        chunk_id, size, cdata = replacement_chunk(size)
-                        add_reference(chunk_id, size, cdata)
-                    else:
-                        logger.info(
-                            "{}: {}: Previously missing file chunk is still missing (Byte {}-{}, Chunk {}). "
-                            "It has an all-zero replacement chunk already.".format(
-                                archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
-                            )
-                        )
-                        chunk_id, size = chunk_current
-                        if chunk_id not in self.chunks:
-                            logger.warning(
-                                "{}: {}: Missing all-zero replacement chunk detected (Byte {}-{}, Chunk {}). "
-                                "Generating new replacement chunk.".format(
-                                    archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
-                                )
-                            )
-                            self.error_found = chunks_replaced = True
-                            chunk_id, size, cdata = replacement_chunk(size)
-                            add_reference(chunk_id, size, cdata)
-                else:
-                    if chunk_current == chunk_healthy:
-                        pass  # normal case, all fine.
-                    else:
-                        logger.info(
-                            "{}: {}: Healed previously missing file chunk! (Byte {}-{}, Chunk {}).".format(
-                                archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
-                            )
-                        )
-                chunk_list.append([chunk_id, size])  # list-typed element as chunks_healthy is list-of-lists
+                    )
+                    self.error_found = True
                 offset += size
-            if chunks_replaced and not has_chunks_healthy:
-                # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later
-                item.chunks_healthy = item.chunks
-            if has_chunks_healthy and chunk_list == chunks_healthy:
-                logger.info(f"{archive_name}: {item.path}: Completely healed previously damaged file!")
-                del item.chunks_healthy
-            item.chunks = chunk_list
             if "size" in item:
                 item_size = item.size
                 item_chunks_size = item.get_size(from_chunks=True)
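For comparison, a rough, runnable sketch of what the slimmed-down verify_file_chunks() now amounts to (missing_chunk_ranges is a hypothetical helper, not borg API): walk the chunk list, track the byte offset, and report every chunk id absent from the chunk index, without writing replacement chunks or touching chunks_healthy:

    from collections import namedtuple

    ChunkListEntry = namedtuple("ChunkListEntry", "id size")  # stand-in for borg's ChunkListEntry

    def missing_chunk_ranges(chunks, chunk_index_ids):
        # returns (start_offset, end_offset, chunk_id) for every chunk not present in the index
        missing, offset = [], 0
        for chunk_id, size in chunks:
            if chunk_id not in chunk_index_ids:
                missing.append((offset, offset + size, chunk_id))
            offset += size
        return missing

    chunks = [ChunkListEntry(b"a" * 32, 100), ChunkListEntry(b"b" * 32, 50)]
    print(missing_chunk_ranges(chunks, chunk_index_ids={b"a" * 32}))  # second chunk reported as bytes 100-150

The real method additionally sets self.error_found so the check run reports failure; repairing files by writing all-zero replacement chunks is gone.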
@@ -2270,7 +2214,7 @@ class ArchiveRecreater:
             return chunk_entry

     def iter_chunks(self, archive, target, chunks):
-        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks], ro_type=ROBJ_FILE_STREAM)
+        chunk_iterator = archive.pipeline.fetch_many(chunks, ro_type=ROBJ_FILE_STREAM)
         if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)