|
@@ -534,26 +534,9 @@ class LoggedIO:
|
|
|
offset = MAGIC_LEN
|
|
|
header = fd.read(self.header_fmt.size)
|
|
|
while header:
|
|
|
- try:
|
|
|
- crc, size, tag = self.header_fmt.unpack(header)
|
|
|
- except struct.error as err:
|
|
|
- raise IntegrityError('Invalid segment entry header [offset {}]: {}'.format(offset, err))
|
|
|
- if size > MAX_OBJECT_SIZE or size < self.header_fmt.size:
|
|
|
- raise IntegrityError('Invalid segment entry size [offset {}]'.format(offset))
|
|
|
- length = size - self.header_fmt.size
|
|
|
- rest = fd.read(length)
|
|
|
- if len(rest) != length:
|
|
|
- raise IntegrityError('Segment entry data short read [offset {}]: expected: {}, got {} bytes'.format(
|
|
|
- offset, length, len(rest)))
|
|
|
- if crc32(rest, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
|
|
|
- raise IntegrityError('Segment entry checksum mismatch [offset {}]'.format(offset))
|
|
|
- if tag not in (TAG_PUT, TAG_DELETE, TAG_COMMIT):
|
|
|
- raise IntegrityError('Invalid segment entry tag [offset {}]'.format(offset))
|
|
|
- key = None
|
|
|
- if tag in (TAG_PUT, TAG_DELETE):
|
|
|
- key = rest[:32]
|
|
|
+ size, tag, key, data = self._read(fd, self.header_fmt, header, offset, (TAG_PUT, TAG_DELETE, TAG_COMMIT))
|
|
|
if include_data:
|
|
|
- yield tag, key, offset, rest[32:]
|
|
|
+ yield tag, key, offset, data
|
|
|
else:
|
|
|
yield tag, key, offset
|
|
|
offset += size
|
|
@@ -586,16 +569,39 @@ class LoggedIO:
|
|
|
fd = self.get_fd(segment)
|
|
|
fd.seek(offset)
|
|
|
header = fd.read(self.put_header_fmt.size)
|
|
|
- crc, size, tag, key = self.put_header_fmt.unpack(header)
|
|
|
- if size > MAX_OBJECT_SIZE:
|
|
|
- raise IntegrityError('Invalid segment object size')
|
|
|
- data = fd.read(size - self.put_header_fmt.size)
|
|
|
- if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
|
|
|
- raise IntegrityError('Segment checksum mismatch')
|
|
|
- if tag != TAG_PUT or id != key:
|
|
|
- raise IntegrityError('Invalid segment entry header')
|
|
|
+ size, tag, key, data = self._read(fd, self.put_header_fmt, header, offset, (TAG_PUT, ))
|
|
|
+ if id != key:
|
|
|
+ raise IntegrityError('Invalid segment entry header, is not for wanted id [offset {}]'.format(offset))
|
|
|
return data
|
|
|
|
|
|
+ def _read(self, fd, fmt, header, offset, acceptable_tags):
|
|
|
+ # some code shared by read() and iter_objects()
|
|
|
+ try:
|
|
|
+ hdr_tuple = fmt.unpack(header)
|
|
|
+ except struct.error as err:
|
|
|
+ raise IntegrityError('Invalid segment entry header [offset {}]: {}'.format(offset, err))
|
|
|
+ if fmt is self.put_header_fmt:
|
|
|
+ crc, size, tag, key = hdr_tuple
|
|
|
+ elif fmt is self.header_fmt:
|
|
|
+ crc, size, tag = hdr_tuple
|
|
|
+ key = None
|
|
|
+ else:
|
|
|
+ raise TypeError("_read called with unsupported format")
|
|
|
+ if size > MAX_OBJECT_SIZE or size < fmt.size:
|
|
|
+ raise IntegrityError('Invalid segment entry size [offset {}]'.format(offset))
|
|
|
+ length = size - fmt.size
|
|
|
+ data = fd.read(length)
|
|
|
+ if len(data) != length:
|
|
|
+ raise IntegrityError('Segment entry data short read [offset {}]: expected: {}, got {} bytes'.format(
|
|
|
+ offset, length, len(data)))
|
|
|
+ if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
|
|
|
+ raise IntegrityError('Segment entry checksum mismatch [offset {}]'.format(offset))
|
|
|
+ if tag not in acceptable_tags:
|
|
|
+ raise IntegrityError('Invalid segment entry header, did not get acceptable tag [offset {}]'.format(offset))
|
|
|
+ if key is None and tag in (TAG_PUT, TAG_DELETE):
|
|
|
+ key, data = data[:32], data[32:]
|
|
|
+ return size, tag, key, data
|
|
|
+
|
|
|
def write_put(self, id, data):
|
|
|
size = len(data) + self.put_header_fmt.size
|
|
|
fd = self.get_write_fd()
|