|
@@ -754,6 +754,9 @@ def valid_msgpacked_dict(d, keys_serialized):
|
|
|
class RobustUnpacker:
|
|
|
"""A restartable/robust version of the streaming msgpack unpacker
|
|
|
"""
|
|
|
+ class UnpackerCrashed(Exception):
|
|
|
+ """raise if unpacker crashed"""
|
|
|
+
|
|
|
def __init__(self, validator, item_keys):
|
|
|
super().__init__()
|
|
|
self.item_keys = [msgpack.packb(name) for name in item_keys]
|
|
@@ -798,7 +801,10 @@ class RobustUnpacker:
|
|
|
pass
|
|
|
data = data[1:]
|
|
|
else:
|
|
|
- return next(self._unpacker)
|
|
|
+ try:
|
|
|
+ return next(self._unpacker)
|
|
|
+ except (TypeError, ValueError) as err:
|
|
|
+ raise self.UnpackerCrashed(str(err))
|
|
|
|
|
|
|
|
|
class ArchiveChecker:
|
|
@@ -1017,6 +1023,9 @@ class ArchiveChecker:
|
|
|
yield item
|
|
|
else:
|
|
|
report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
|
|
|
+ except RobustUnpacker.UnpackerCrashed as err:
|
|
|
+ report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
|
|
|
+ unpacker.resync()
|
|
|
except Exception:
|
|
|
report('Exception while unpacking item metadata', chunk_id, i)
|
|
|
raise
|