@@ -9,11 +9,14 @@ logger = logging.getLogger(__name__)
 import os
 import shutil
 import struct
+from collections import defaultdict
+from functools import partial
 from zlib import crc32
 
 import msgpack
 from .constants import *  # NOQA
-from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent, bin_to_hex
+from .helpers import Error, ErrorWithTraceback, IntegrityError, InternalOSError, Location, ProgressIndicatorPercent, \
+    bin_to_hex
 from .hashindex import NSIndex
 from .locking import UpgradableLock, LockError, LockErrorT
 from .lrucache import LRUCache
@@ -26,17 +29,55 @@ TAG_PUT = 0
 TAG_DELETE = 1
 TAG_COMMIT = 2
 
+FreeSpace = partial(defaultdict, int)
+
 
 class Repository:
-    """Filesystem based transactional key value store
+    """
+    Filesystem based transactional key value store
+
+    Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
+    called segments. Each segment is a series of log entries. The segment number together with the offset of each
+    entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
+    time for the purposes of the log.
+
+    Log entries are either PUT, DELETE or COMMIT.
+
+    A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
+    segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
+    is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.
+
+    When reading from a repository it is first checked whether the last segment is committed. If it is not, then
+    all segments after the last committed segment are deleted; they contain log entries whose consistency is not
+    established by a COMMIT.
+
+    Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
+    the platform (including the hardware). See platform_base.SyncFile for details.
+
+    A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
+    full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).
+
+    A DELETE marks a key as deleted.
+
+    For a given key, only the last entry regarding the key, which is called current (all other entries are called
+    superseded), is relevant: if there is no entry or the last entry is a DELETE, then the key does not exist.
+    Otherwise, the last PUT defines the value of the key.
+
+    By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
+    such obsolete entries is called sparse, while a segment containing no such entries is called compact.
+
+    Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
+    superseded entries were current.
 
     On disk layout:
+
     dir/README
     dir/config
     dir/data/<X // SEGMENTS_PER_DIR>/<X>
     dir/index.X
     dir/hints.X
     """
+
     class DoesNotExist(Error):
         """Repository {} does not exist."""
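
As a reading aid, the log semantics described in the docstring above can be condensed into a few lines of toy Python. This is an illustration only, not the actual `Repository` implementation: a single in-memory list stands in for the on-disk segments.

```python
# Toy model of the log semantics described above -- illustration only.
TAG_PUT, TAG_DELETE, TAG_COMMIT = 0, 1, 2

log = []  # the "journal": ordered (tag, key, value) entries

def put(key, value):
    log.append((TAG_PUT, key, value))

def delete(key):
    log.append((TAG_DELETE, key, None))

def current_value(key):
    # Only the *last* entry for a key is current; all earlier ones are superseded.
    for tag, k, value in reversed(log):
        if k == key:
            return value if tag == TAG_PUT else None
    return None

put(b'a', b'1')
put(b'a', b'2')   # supersedes the first PUT -> that entry is now obsolete ("sparse")
delete(b'b')
assert current_value(b'a') == b'2'
assert current_value(b'b') is None
```
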
@@ -138,7 +179,7 @@ class Repository:
         else:
             return None
 
-    def get_transaction_id(self):
+    def check_transaction(self):
         index_transaction_id = self.get_index_transaction_id()
         segments_transaction_id = self.io.get_segments_transaction_id()
         if index_transaction_id is not None and segments_transaction_id is None:
@@ -151,6 +192,9 @@ class Repository:
             else:
                 replay_from = index_transaction_id
             self.replay_segments(replay_from, segments_transaction_id)
+
+    def get_transaction_id(self):
+        self.check_transaction()
         return self.get_index_transaction_id()
 
     def break_lock(self):
@@ -191,10 +235,27 @@ class Repository:
         self.write_index()
         self.rollback()
 
-    def open_index(self, transaction_id):
+    def open_index(self, transaction_id, auto_recover=True):
         if transaction_id is None:
             return NSIndex()
-        return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'))
+        index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
+        try:
+            return NSIndex.read(index_path)
+        except RuntimeError as error:
+            assert str(error) == 'hashindex_read failed'  # everything else means we're in *deep* trouble
+            logger.warning('Repository index missing or corrupted, trying to recover')
+            try:
+                os.unlink(index_path)
+            except OSError as e:
+                raise InternalOSError(e) from None
+            if not auto_recover:
+                raise
+            self.prepare_txn(self.get_transaction_id())
+            # don't leave an open transaction around
+            self.commit()
+            return self.open_index(self.get_transaction_id())
+        except OSError as e:
+            raise InternalOSError(e) from None
 
     def prepare_txn(self, transaction_id, do_cleanup=True):
         self._active_txn = True
@@ -207,24 +268,51 @@ class Repository:
             self._active_txn = False
             raise
         if not self.index or transaction_id is None:
-            self.index = self.open_index(transaction_id)
+            try:
+                self.index = self.open_index(transaction_id, False)
+            except RuntimeError:
+                self.check_transaction()
+                self.index = self.open_index(transaction_id, False)
         if transaction_id is None:
             self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
-            self.compact = set()  # XXX bad name: segments_needing_compaction = self.compact
+            self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
         else:
             if do_cleanup:
                 self.io.cleanup(transaction_id)
-            with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
-                hints = msgpack.unpack(fd)
-            if hints[b'version'] != 1:
-                raise ValueError('Unknown hints file version: %d' % hints['version'])
-            self.segments = hints[b'segments']
-            self.compact = set(hints[b'compact'])
+            hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
+            index_path = os.path.join(self.path, 'index.%d' % transaction_id)
+            try:
+                with open(hints_path, 'rb') as fd:
+                    hints = msgpack.unpack(fd)
+            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
+                logger.warning('Repository hints file missing or corrupted, trying to recover')
+                if not isinstance(e, FileNotFoundError):
+                    os.unlink(hints_path)
+                # index must exist at this point
+                os.unlink(index_path)
+                self.check_transaction()
+                self.prepare_txn(transaction_id)
+                return
+            except OSError as os_error:
+                raise InternalOSError(os_error) from None
+            if hints[b'version'] == 1:
+                logger.debug('Upgrading from v1 hints.%d', transaction_id)
+                self.segments = hints[b'segments']
+                self.compact = FreeSpace()
+                for segment in sorted(hints[b'compact']):
+                    logger.debug('Rebuilding sparse info for segment %d', segment)
+                    self._rebuild_sparse(segment)
+                logger.debug('Upgrade to v2 hints complete')
+            elif hints[b'version'] != 2:
+                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
+            else:
+                self.segments = hints[b'segments']
+                self.compact = FreeSpace(hints[b'compact'])
 
     def write_index(self):
-        hints = {b'version': 1,
+        hints = {b'version': 2,
                  b'segments': self.segments,
-                 b'compact': list(self.compact)}
+                 b'compact': self.compact}
         transaction_id = self.io.get_segments_transaction_id()
         hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
         with open(hints_file + '.tmp', 'wb') as fd:
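
The `FreeSpace` helper introduced above is just `partial(defaultdict, int)`: a mapping from segment number to freeable byte count that defaults to zero. A small standalone illustration (with a plain `dict` standing in for the msgpack round-trip) of why `FreeSpace(hints[b'compact'])` works when loading v2 hints:

```python
from collections import defaultdict
from functools import partial

FreeSpace = partial(defaultdict, int)

compact = FreeSpace()
compact[42] += 4096          # unseen segments start at 0, so no KeyError
assert compact[7] == 0

# msgpack serializes the defaultdict as a plain dict; re-wrapping the
# unpacked mapping restores the default-to-zero behaviour:
unpacked = dict(compact)     # stand-in for msgpack.unpack(...)[b'compact']
compact = FreeSpace(unpacked)
assert compact[42] == 4096 and compact[99] == 0
```
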
@@ -238,10 +326,10 @@ class Repository:
         if self.append_only:
             with open(os.path.join(self.path, 'transactions'), 'a') as log:
                 print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
-        # Remove old indices
+        # Remove old auxiliary files
         current = '.%d' % transaction_id
         for name in os.listdir(self.path):
-            if not name.startswith('index.') and not name.startswith('hints.'):
+            if not name.startswith(('index.', 'hints.')):
                 continue
             if name.endswith(current):
                 continue
@@ -267,32 +355,40 @@ class Repository:
             for segment in unused:
                 assert self.segments.pop(segment) == 0
                 self.io.delete_segment(segment)
+                del self.compact[segment]
             unused = []
 
-        for segment in sorted(self.compact):
-            if self.io.segment_exists(segment):
-                for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
-                    if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
+        for segment, freeable_space in sorted(self.compact.items()):
+            if not self.io.segment_exists(segment):
+                del self.compact[segment]
+                continue
+            segment_size = self.io.segment_size(segment)
+            if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
+                logger.debug('not compacting segment %d for later (only %d bytes are sparse)',
+                             segment, freeable_space)
+                continue
+            segments.setdefault(segment, 0)
+            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
+                if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
+                    try:
+                        new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
+                    except LoggedIO.SegmentFull:
+                        complete_xfer()
+                        new_segment, offset = self.io.write_put(key, data)
+                    self.index[key] = new_segment, offset
+                    segments.setdefault(new_segment, 0)
+                    segments[new_segment] += 1
+                    segments[segment] -= 1
+                elif tag == TAG_DELETE:
+                    if index_transaction_id is None or segment > index_transaction_id:
                         try:
-                            new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
+                            self.io.write_delete(key, raise_full=save_space)
                         except LoggedIO.SegmentFull:
                             complete_xfer()
-                            new_segment, offset = self.io.write_put(key, data)
-                        self.index[key] = new_segment, offset
-                        segments.setdefault(new_segment, 0)
-                        segments[new_segment] += 1
-                        segments[segment] -= 1
-                    elif tag == TAG_DELETE:
-                        if index_transaction_id is None or segment > index_transaction_id:
-                            try:
-                                self.io.write_delete(key, raise_full=save_space)
-                            except LoggedIO.SegmentFull:
-                                complete_xfer()
-                                self.io.write_delete(key)
-            assert segments[segment] == 0
-            unused.append(segment)
+                            self.io.write_delete(key)
+            assert segments[segment] == 0
+            unused.append(segment)
         complete_xfer()
-        self.compact = set()
 
     def replay_segments(self, index_transaction_id, segments_transaction_id):
         self.prepare_txn(index_transaction_id, do_cleanup=False)
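
The two constants in the new loop encode the compaction policy: a segment is deferred when it is still reasonably large (more than 20% of the maximum segment size) yet less than 15% of it is freeable, since rewriting it would cost much I/O for little gain. Restated as a standalone predicate (the function name and values below are invented for illustration, not borg API):

```python
def worth_compacting(segment_size, freeable_space, max_segment_size):
    # Mirrors the condition in compact_segments() above: skip segments that
    # are big enough to matter but would free too little space to pay for
    # rewriting them.
    if segment_size > 0.2 * max_segment_size and freeable_space < 0.15 * segment_size:
        return False
    return True

MAX = 5 * 1024 * 1024  # e.g. a 5 MiB maximum segment size
assert not worth_compacting(2 * 1024 * 1024, 100 * 1024, MAX)   # big, barely sparse
assert worth_compacting(2 * 1024 * 1024, 1024 * 1024, MAX)      # big, half sparse
assert worth_compacting(100 * 1024, 1024, MAX)                  # small segments always qualify
```
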
@@ -315,11 +411,12 @@ class Repository:
     def _update_index(self, segment, objects, report=None):
         """some code shared between replay_segments and check"""
         self.segments[segment] = 0
-        for tag, key, offset in objects:
+        for tag, key, offset, size in objects:
             if tag == TAG_PUT:
                 try:
+                    # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
                     s, _ = self.index[key]
-                    self.compact.add(s)
+                    self.compact[s] += size
                     self.segments[s] -= 1
                 except KeyError:
                     pass
@@ -327,12 +424,17 @@ class Repository:
                 self.segments[segment] += 1
             elif tag == TAG_DELETE:
                 try:
-                    s, _ = self.index.pop(key)
-                    self.segments[s] -= 1
-                    self.compact.add(s)
+                    # if the deleted PUT is not in the index, there is nothing to clean up
+                    s, offset = self.index.pop(key)
                 except KeyError:
                     pass
-                self.compact.add(segment)
+                else:
+                    if self.io.segment_exists(s):
+                        # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
+                        # is already gone, then it was already compacted.
+                        self.segments[s] -= 1
+                        size = self.io.read(s, offset, key, read_data=False)
+                        self.compact[s] += size
             elif tag == TAG_COMMIT:
                 continue
             else:
@@ -342,7 +444,22 @@ class Repository:
             else:
                 report(msg)
         if self.segments[segment] == 0:
-            self.compact.add(segment)
+            self.compact[segment] += self.io.segment_size(segment)
+
+    def _rebuild_sparse(self, segment):
+        """Rebuild sparse bytes count for a single segment relative to the current index."""
+        self.compact[segment] = 0
+        if self.segments[segment] == 0:
+            self.compact[segment] += self.io.segment_size(segment)
+            return
+        for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
+            if tag == TAG_PUT:
+                if self.index.get(key, (-1, -1)) != (segment, offset):
+                    # This PUT is superseded later
+                    self.compact[segment] += size
+            elif tag == TAG_DELETE:
+                # The outcome of the DELETE has been recorded in the PUT branch already
+                self.compact[segment] += size
 
     def check(self, repair=False, save_space=False):
         """Check repository consistency
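
`_rebuild_sparse` exists for the v1-to-v2 hints upgrade: v1 hints only recorded *which* segments contain superseded entries, while v2 needs to know *how many bytes* each one could free. Its accounting can be sketched with plain data structures (the real code walks `LoggedIO.iter_objects` against the `NSIndex`; the names below are illustrative):

```python
# Toy accounting in the spirit of _rebuild_sparse(): given one segment's
# entries and the current index, count the bytes compaction could free.
TAG_PUT, TAG_DELETE = 0, 1

def sparse_bytes(segment, entries, index):
    # entries: list of (tag, key, offset, size); index: key -> (segment, offset)
    freeable = 0
    for tag, key, offset, size in entries:
        if tag == TAG_PUT and index.get(key) != (segment, offset):
            freeable += size          # superseded PUT
        elif tag == TAG_DELETE:
            freeable += size          # the DELETE entry itself is dead weight
    return freeable

entries = [(TAG_PUT, b'k', 8, 100), (TAG_PUT, b'k', 108, 120), (TAG_DELETE, b'x', 228, 40)]
index = {b'k': (3, 108)}              # only the second PUT in segment 3 is current
assert sparse_bytes(3, entries, index) == 140
```
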
@@ -457,14 +574,16 @@ class Repository:
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
-            segment, _ = self.index[id]
-            self.segments[segment] -= 1
-            self.compact.add(segment)
-            segment = self.io.write_delete(id)
-            self.segments.setdefault(segment, 0)
-            self.compact.add(segment)
+            segment, offset = self.index[id]
         except KeyError:
             pass
+        else:
+            self.segments[segment] -= 1
+            size = self.io.read(segment, offset, id, read_data=False)
+            self.compact[segment] += size
+            segment, size = self.io.write_delete(id)
+            self.compact[segment] += size
+            self.segments.setdefault(segment, 0)
         segment, offset = self.io.write_put(id, data)
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
@@ -478,9 +597,10 @@ class Repository:
         except KeyError:
             raise self.ObjectNotFound(id, self.path) from None
         self.segments[segment] -= 1
-        self.compact.add(segment)
-        segment = self.io.write_delete(id)
-        self.compact.add(segment)
+        size = self.io.read(segment, offset, id, read_data=False)
+        self.compact[segment] += size
+        segment, size = self.io.write_delete(id)
+        self.compact[segment] += size
         self.segments.setdefault(segment, 0)
 
     def preload(self, ids):
@@ -578,7 +698,7 @@ class LoggedIO:
         seen_commit = False
         while True:
             try:
-                tag, key, offset = next(iterator)
+                tag, key, offset, _ = next(iterator)
             except IntegrityError:
                 return False
             except StopIteration:
@@ -635,7 +755,18 @@ class LoggedIO:
     def segment_exists(self, segment):
         return os.path.exists(self.segment_filename(segment))
 
-    def iter_objects(self, segment, include_data=False):
+    def segment_size(self, segment):
+        return os.path.getsize(self.segment_filename(segment))
+
+    def iter_objects(self, segment, include_data=False, read_data=True):
+        """
+        Return object iterator for *segment*.
+
+        If read_data is False then include_data must be False as well.
+        Integrity checks are skipped: all data obtained from the iterator must be considered informational.
+
+        The iterator returns four-tuples of (tag, key, offset, data|size).
+        """
         fd = self.get_fd(segment)
         fd.seek(0)
         if fd.read(MAGIC_LEN) != MAGIC:
@@ -644,11 +775,12 @@ class LoggedIO:
         header = fd.read(self.header_fmt.size)
         while header:
             size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
-                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT))
+                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
+                                              read_data=read_data)
             if include_data:
                 yield tag, key, offset, data
             else:
-                yield tag, key, offset
+                yield tag, key, offset, size
             offset += size
             header = fd.read(self.header_fmt.size)
 
@@ -672,19 +804,25 @@ class LoggedIO:
             fd.write(data[:size])
             data = data[size:]
 
-    def read(self, segment, offset, id):
+    def read(self, segment, offset, id, read_data=True):
+        """
+        Read entry from *segment* at *offset* with *id*.
+
+        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
+        The return value should thus be considered informational.
+        """
         if segment == self.segment and self._write_fd:
             self._write_fd.sync()
         fd = self.get_fd(segment)
         fd.seek(offset)
         header = fd.read(self.put_header_fmt.size)
-        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ))
+        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
         if id != key:
             raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                 segment, offset))
-        return data
+        return data if read_data else size
 
-    def _read(self, fd, fmt, header, segment, offset, acceptable_tags):
+    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
         # some code shared by read() and iter_objects()
         try:
             hdr_tuple = fmt.unpack(header)
@@ -702,18 +840,32 @@ class LoggedIO:
             raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format(
                 segment, offset))
         length = size - fmt.size
-        data = fd.read(length)
-        if len(data) != length:
-            raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                segment, offset, length, len(data)))
-        if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
-            raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
-                segment, offset))
+        if read_data:
+            data = fd.read(length)
+            if len(data) != length:
+                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
+                    segment, offset, length, len(data)))
+            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
+                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
+                    segment, offset))
+            if key is None and tag in (TAG_PUT, TAG_DELETE):
+                key, data = data[:32], data[32:]
+        else:
+            if key is None and tag in (TAG_PUT, TAG_DELETE):
+                key = fd.read(32)
+                length -= 32
+                if len(key) != 32:
+                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
+                        segment, offset, 32, len(key)))
+            oldpos = fd.tell()
+            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
+            data = None
+            if seeked != length:
+                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
+                    segment, offset, length, seeked))
         if tag not in acceptable_tags:
             raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                 segment, offset))
-        if key is None and tag in (TAG_PUT, TAG_DELETE):
-            key, data = data[:32], data[32:]
         return size, tag, key, data
 
     def write_put(self, id, data, raise_full=False):
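
The new `read_data=False` branch avoids reading and checksumming the payload: it pulls only the 32-byte key (still needed to update the index) and then seeks over the remaining bytes, which is why such results are "informational" only. A standalone illustration of that path over an in-memory entry, using the same `'<IIB'` crc32/size/tag header layout as this module (the CRC is left at zero because this path skips the check):

```python
import io
import os
import struct

header_fmt = struct.Struct('<IIB')     # crc32, size, tag -- as in LoggedIO

# Build one fake PUT entry: header, 32-byte key, payload.
key, payload = b'k' * 32, b'x' * 1000
size = header_fmt.size + len(key) + len(payload)
entry = header_fmt.pack(0, size, 0) + key + payload   # crc 0: not checked here

fd = io.BytesIO(entry)
_, size, tag = header_fmt.unpack(fd.read(header_fmt.size))
key_read = fd.read(32)                 # the key is still needed for the index
length = size - header_fmt.size - 32   # payload bytes left in this entry
oldpos = fd.tell()
seeked = fd.seek(length, os.SEEK_CUR) - oldpos
assert seeked == length                # payload skipped, not read or CRC-checked
assert key_read == key
```
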
@@ -732,11 +884,11 @@ class LoggedIO:
         crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
         fd.write(b''.join((crc, header, id)))
         self.offset += self.put_header_fmt.size
-        return self.segment
+        return self.segment, self.put_header_fmt.size
 
     def write_commit(self):
-        fd = self.get_write_fd(no_new=True)
-        fd.sync()
+        self.close_segment()
+        fd = self.get_write_fd()
         header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
         crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
         fd.write(b''.join((crc, header)))
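
Note the behavioural change in `write_commit`: instead of fsyncing the current segment and appending the COMMIT tag to it, it now closes that segment and writes the COMMIT into a freshly opened one, so the COMMIT tag lands in a segment of its own. The public API is unchanged; a hypothetical usage sketch (path invented, keys are 32 bytes):

```python
# Hypothetical usage sketch: put() data, then commit() -- which now emits
# TAG_COMMIT via the rewritten write_commit() into its own segment.
from borg.repository import Repository

repository = Repository('/tmp/demo-repo', create=True)  # hypothetical path
key = bytes(32)
repository.put(key, b'value')
repository.commit()
assert repository.get(key) == b'value'
repository.close()
```
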