|
@@ -754,6 +754,9 @@ def valid_msgpacked_dict(d, keys_serialized):
|
|
|
class RobustUnpacker:
|
|
|
"""A restartable/robust version of the streaming msgpack unpacker
|
|
|
"""
|
|
|
+ class UnpackerCrashed(Exception):
|
|
|
+ """raise if unpacker crashed"""
|
|
|
+
|
|
|
def __init__(self, validator, item_keys):
|
|
|
super().__init__()
|
|
|
self.item_keys = [msgpack.packb(name) for name in item_keys]
|
|
@@ -776,6 +779,14 @@ class RobustUnpacker:
|
|
|
return self
|
|
|
|
|
|
def __next__(self):
|
|
|
+ def unpack_next():
|
|
|
+ try:
|
|
|
+ return next(self._unpacker)
|
|
|
+ except (TypeError, ValueError) as err:
|
|
|
+ # transform exceptions that might be raised when feeding
|
|
|
+ # msgpack with invalid data to a more specific exception
|
|
|
+ raise self.UnpackerCrashed(str(err))
|
|
|
+
|
|
|
if self._resync:
|
|
|
data = b''.join(self._buffered_data)
|
|
|
while self._resync:
|
|
@@ -788,17 +799,17 @@ class RobustUnpacker:
|
|
|
self._unpacker = msgpack.Unpacker(object_hook=StableDict)
|
|
|
self._unpacker.feed(data)
|
|
|
try:
|
|
|
- item = next(self._unpacker)
|
|
|
+ item = unpack_next()
|
|
|
+ except (self.UnpackerCrashed, StopIteration):
|
|
|
+ # as long as we are resyncing, we also ignore StopIteration
|
|
|
+ pass
|
|
|
+ else:
|
|
|
if self.validator(item):
|
|
|
self._resync = False
|
|
|
return item
|
|
|
- # Ignore exceptions that might be raised when feeding
|
|
|
- # msgpack with invalid data
|
|
|
- except (TypeError, ValueError, StopIteration):
|
|
|
- pass
|
|
|
data = data[1:]
|
|
|
else:
|
|
|
- return next(self._unpacker)
|
|
|
+ return unpack_next()
|
|
|
|
|
|
|
|
|
class ArchiveChecker:
|
|
@@ -1017,6 +1028,9 @@ class ArchiveChecker:
|
|
|
yield item
|
|
|
else:
|
|
|
report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
|
|
|
+ except RobustUnpacker.UnpackerCrashed as err:
|
|
|
+ report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
|
|
|
+ unpacker.resync()
|
|
|
except Exception:
|
|
|
report('Exception while unpacking item metadata', chunk_id, i)
|
|
|
raise
|