|
@@ -13,7 +13,7 @@ from functools import partial
|
|
from itertools import islice
|
|
from itertools import islice
|
|
|
|
|
|
from .constants import * # NOQA
|
|
from .constants import * # NOQA
|
|
-from .hashindex import NSIndex
|
|
|
|
|
|
+from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant
|
|
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
|
|
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
|
|
from .helpers import Location
|
|
from .helpers import Location
|
|
from .helpers import ProgressIndicatorPercent
|
|
from .helpers import ProgressIndicatorPercent
|
|
@@ -52,6 +52,18 @@ MAX_TAG_ID = 15
|
|
FreeSpace = partial(defaultdict, int)
|
|
FreeSpace = partial(defaultdict, int)
|
|
|
|
|
|
|
|
|
|
|
|
def header_size(tag):
    """Return the size of the entry header that belongs to *tag*.

    The value is the part of a segment entry's total size that is not
    payload; callers subtract it from (or add it to) an entry size to
    convert between the on-disk entry size and ``len(data)``.

    :param tag: one of TAG_PUT2, TAG_PUT, TAG_DELETE or TAG_COMMIT.
    :return: header size for the given tag.
    :raises ValueError: if *tag* is none of the supported tags.
    """
    # PUT2 entries carry an additional entry hash after the id header.
    if tag == TAG_PUT2:
        return LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE
    # PUT and DELETE entries share the same header layout (header + id).
    if tag == TAG_PUT or tag == TAG_DELETE:
        return LoggedIO.HEADER_ID_SIZE
    # COMMIT entries use the plain header format only.
    if tag == TAG_COMMIT:
        return LoggedIO.header_fmt.size
    raise ValueError(f"unsupported tag: {tag!r}")
|
|
|
|
+
|
|
|
|
+
|
|
class Repository:
|
|
class Repository:
|
|
"""
|
|
"""
|
|
Filesystem based transactional key value store
|
|
Filesystem based transactional key value store
|
|
@@ -525,10 +537,14 @@ class Repository:
|
|
if transaction_id is None:
|
|
if transaction_id is None:
|
|
return NSIndex()
|
|
return NSIndex()
|
|
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
|
|
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
|
|
|
|
+ variant = hashindex_variant(index_path)
|
|
integrity_data = self._read_integrity(transaction_id, 'index')
|
|
integrity_data = self._read_integrity(transaction_id, 'index')
|
|
try:
|
|
try:
|
|
with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
|
|
with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
|
|
- return NSIndex.read(fd)
|
|
|
|
|
|
+ if variant == 'k32_v16':
|
|
|
|
+ return NSIndex.read(fd)
|
|
|
|
+ if variant == 'k32_v8': # legacy
|
|
|
|
+ return NSIndex1.read(fd)
|
|
except (ValueError, OSError, FileIntegrityError) as exc:
|
|
except (ValueError, OSError, FileIntegrityError) as exc:
|
|
logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
|
|
logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
|
|
os.unlink(index_path)
|
|
os.unlink(index_path)
|
|
@@ -798,14 +814,14 @@ class Repository:
|
|
if tag == TAG_COMMIT:
|
|
if tag == TAG_COMMIT:
|
|
continue
|
|
continue
|
|
in_index = self.index.get(key)
|
|
in_index = self.index.get(key)
|
|
- is_index_object = in_index == (segment, offset)
|
|
|
|
|
|
+ is_index_object = in_index and (in_index.segment, in_index.offset) == (segment, offset)
|
|
if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
|
|
if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
|
|
try:
|
|
try:
|
|
new_segment, offset = self.io.write_put(key, data, raise_full=True)
|
|
new_segment, offset = self.io.write_put(key, data, raise_full=True)
|
|
except LoggedIO.SegmentFull:
|
|
except LoggedIO.SegmentFull:
|
|
complete_xfer()
|
|
complete_xfer()
|
|
new_segment, offset = self.io.write_put(key, data)
|
|
new_segment, offset = self.io.write_put(key, data)
|
|
- self.index[key] = new_segment, offset
|
|
|
|
|
|
+ self.index[key] = NSIndexEntry(new_segment, offset, len(data))
|
|
segments.setdefault(new_segment, 0)
|
|
segments.setdefault(new_segment, 0)
|
|
segments[new_segment] += 1
|
|
segments[new_segment] += 1
|
|
segments[segment] -= 1
|
|
segments[segment] -= 1
|
|
@@ -821,10 +837,7 @@ class Repository:
|
|
# do not remove entry with empty shadowed_segments list here,
|
|
# do not remove entry with empty shadowed_segments list here,
|
|
# it is needed for shadowed_put_exists code (see below)!
|
|
# it is needed for shadowed_put_exists code (see below)!
|
|
pass
|
|
pass
|
|
- if tag == TAG_PUT2:
|
|
|
|
- self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
|
|
|
|
- elif tag == TAG_PUT:
|
|
|
|
- self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
|
|
|
|
|
|
+ self.storage_quota_use -= header_size(tag) + len(data)
|
|
elif tag == TAG_DELETE and not in_index:
|
|
elif tag == TAG_DELETE and not in_index:
|
|
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
|
|
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
|
|
# therefore we do not drop the delete, but write it to a current segment.
|
|
# therefore we do not drop the delete, but write it to a current segment.
|
|
@@ -919,27 +932,26 @@ class Repository:
|
|
if tag in (TAG_PUT2, TAG_PUT):
|
|
if tag in (TAG_PUT2, TAG_PUT):
|
|
try:
|
|
try:
|
|
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
|
|
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
|
|
- s, _ = self.index[key]
|
|
|
|
- self.compact[s] += size
|
|
|
|
- self.segments[s] -= 1
|
|
|
|
|
|
+ in_index = self.index[key]
|
|
|
|
+ self.compact[in_index.segment] += header_size(tag) + size
|
|
|
|
+ self.segments[in_index.segment] -= 1
|
|
except KeyError:
|
|
except KeyError:
|
|
pass
|
|
pass
|
|
- self.index[key] = segment, offset
|
|
|
|
|
|
+ self.index[key] = NSIndexEntry(segment, offset, size)
|
|
self.segments[segment] += 1
|
|
self.segments[segment] += 1
|
|
- self.storage_quota_use += size # note: size already includes the put_header_fmt overhead
|
|
|
|
|
|
+ self.storage_quota_use += header_size(tag) + size
|
|
elif tag == TAG_DELETE:
|
|
elif tag == TAG_DELETE:
|
|
try:
|
|
try:
|
|
# if the deleted PUT is not in the index, there is nothing to clean up
|
|
# if the deleted PUT is not in the index, there is nothing to clean up
|
|
- s, offset = self.index.pop(key)
|
|
|
|
|
|
+ in_index = self.index.pop(key)
|
|
except KeyError:
|
|
except KeyError:
|
|
pass
|
|
pass
|
|
else:
|
|
else:
|
|
- if self.io.segment_exists(s):
|
|
|
|
|
|
+ if self.io.segment_exists(in_index.segment):
|
|
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
|
|
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
|
|
# is already gone, then it was already compacted.
|
|
# is already gone, then it was already compacted.
|
|
- self.segments[s] -= 1
|
|
|
|
- size = self.io.read(s, offset, key, read_data=False)
|
|
|
|
- self.compact[s] += size
|
|
|
|
|
|
+ self.segments[in_index.segment] -= 1
|
|
|
|
+ self.compact[in_index.segment] += header_size(tag) + in_index.size
|
|
elif tag == TAG_COMMIT:
|
|
elif tag == TAG_COMMIT:
|
|
continue
|
|
continue
|
|
else:
|
|
else:
|
|
@@ -968,12 +980,13 @@ class Repository:
|
|
self.compact[segment] = 0
|
|
self.compact[segment] = 0
|
|
for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
|
|
for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
|
|
if tag in (TAG_PUT2, TAG_PUT):
|
|
if tag in (TAG_PUT2, TAG_PUT):
|
|
- if self.index.get(key, (-1, -1)) != (segment, offset):
|
|
|
|
|
|
+ in_index = self.index.get(key)
|
|
|
|
+ if not in_index or (in_index.segment, in_index.offset) != (segment, offset):
|
|
# This PUT is superseded later
|
|
# This PUT is superseded later
|
|
- self.compact[segment] += size
|
|
|
|
|
|
+ self.compact[segment] += header_size(tag) + size
|
|
elif tag == TAG_DELETE:
|
|
elif tag == TAG_DELETE:
|
|
# The outcome of the DELETE has been recorded in the PUT branch already
|
|
# The outcome of the DELETE has been recorded in the PUT branch already
|
|
- self.compact[segment] += size
|
|
|
|
|
|
+ self.compact[segment] += header_size(tag) + size
|
|
|
|
|
|
def check(self, repair=False, save_space=False, max_duration=0):
|
|
def check(self, repair=False, save_space=False, max_duration=0):
|
|
"""Check repository consistency
|
|
"""Check repository consistency
|
|
@@ -1169,7 +1182,7 @@ class Repository:
|
|
self.index = self.open_index(transaction_id)
|
|
self.index = self.open_index(transaction_id)
|
|
at_start = marker is None
|
|
at_start = marker is None
|
|
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
|
|
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
|
|
- start_segment, start_offset = (0, 0) if at_start else self.index[marker]
|
|
|
|
|
|
+ start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker]
|
|
result = []
|
|
result = []
|
|
for segment, filename in self.io.segment_iterator(start_segment):
|
|
for segment, filename in self.io.segment_iterator(start_segment):
|
|
obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
|
|
obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
|
|
@@ -1186,19 +1199,21 @@ class Repository:
|
|
# also, for the next segment, we need to start at offset 0.
|
|
# also, for the next segment, we need to start at offset 0.
|
|
start_offset = 0
|
|
start_offset = 0
|
|
continue
|
|
continue
|
|
- if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
|
|
|
|
- # we have found an existing and current object
|
|
|
|
- result.append(id)
|
|
|
|
- if len(result) == limit:
|
|
|
|
- return result
|
|
|
|
|
|
+ if tag in (TAG_PUT2, TAG_PUT):
|
|
|
|
+ in_index = self.index.get(id)
|
|
|
|
+ if in_index and (in_index.segment, in_index.offset) == (segment, offset):
|
|
|
|
+ # we have found an existing and current object
|
|
|
|
+ result.append(id)
|
|
|
|
+ if len(result) == limit:
|
|
|
|
+ return result
|
|
return result
|
|
return result
|
|
|
|
|
|
def get(self, id):
|
|
def get(self, id):
|
|
if not self.index:
|
|
if not self.index:
|
|
self.index = self.open_index(self.get_transaction_id())
|
|
self.index = self.open_index(self.get_transaction_id())
|
|
try:
|
|
try:
|
|
- segment, offset = self.index[id]
|
|
|
|
- return self.io.read(segment, offset, id)
|
|
|
|
|
|
+ in_index = NSIndexEntry(*((self.index[id] + (None, ))[:3])) # legacy: index entries have no size element
|
|
|
|
+ return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size)
|
|
except KeyError:
|
|
except KeyError:
|
|
raise self.ObjectNotFound(id, self.path) from None
|
|
raise self.ObjectNotFound(id, self.path) from None
|
|
|
|
|
|
@@ -1215,7 +1230,7 @@ class Repository:
|
|
if not self._active_txn:
|
|
if not self._active_txn:
|
|
self.prepare_txn(self.get_transaction_id())
|
|
self.prepare_txn(self.get_transaction_id())
|
|
try:
|
|
try:
|
|
- segment, offset = self.index[id]
|
|
|
|
|
|
+ in_index = self.index[id]
|
|
except KeyError:
|
|
except KeyError:
|
|
pass
|
|
pass
|
|
else:
|
|
else:
|
|
@@ -1223,12 +1238,12 @@ class Repository:
|
|
# we do not want to update the shadow_index here, because
|
|
# we do not want to update the shadow_index here, because
|
|
# we know already that we will PUT to this id, so it will
|
|
# we know already that we will PUT to this id, so it will
|
|
# be in the repo index (and we won't need it in the shadow_index).
|
|
# be in the repo index (and we won't need it in the shadow_index).
|
|
- self._delete(id, segment, offset, update_shadow_index=False)
|
|
|
|
|
|
+ self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=False)
|
|
segment, offset = self.io.write_put(id, data)
|
|
segment, offset = self.io.write_put(id, data)
|
|
- self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
|
|
|
|
|
|
+ self.storage_quota_use += header_size(TAG_PUT2) + len(data)
|
|
self.segments.setdefault(segment, 0)
|
|
self.segments.setdefault(segment, 0)
|
|
self.segments[segment] += 1
|
|
self.segments[segment] += 1
|
|
- self.index[id] = segment, offset
|
|
|
|
|
|
+ self.index[id] = NSIndexEntry(segment, offset, len(data))
|
|
if self.storage_quota and self.storage_quota_use > self.storage_quota:
|
|
if self.storage_quota and self.storage_quota_use > self.storage_quota:
|
|
self.transaction_doomed = self.StorageQuotaExceeded(
|
|
self.transaction_doomed = self.StorageQuotaExceeded(
|
|
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
|
|
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
|
|
@@ -1243,22 +1258,21 @@ class Repository:
|
|
if not self._active_txn:
|
|
if not self._active_txn:
|
|
self.prepare_txn(self.get_transaction_id())
|
|
self.prepare_txn(self.get_transaction_id())
|
|
try:
|
|
try:
|
|
- segment, offset = self.index.pop(id)
|
|
|
|
|
|
+ in_index = self.index.pop(id)
|
|
except KeyError:
|
|
except KeyError:
|
|
raise self.ObjectNotFound(id, self.path) from None
|
|
raise self.ObjectNotFound(id, self.path) from None
|
|
# if we get here, there is an object with this id in the repo,
|
|
# if we get here, there is an object with this id in the repo,
|
|
# we write a DEL here that shadows the respective PUT.
|
|
# we write a DEL here that shadows the respective PUT.
|
|
# after the delete, the object is not in the repo index any more,
|
|
# after the delete, the object is not in the repo index any more,
|
|
# for the compaction code, we need to update the shadow_index in this case.
|
|
# for the compaction code, we need to update the shadow_index in this case.
|
|
- self._delete(id, segment, offset, update_shadow_index=True)
|
|
|
|
|
|
+ self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=True)
|
|
|
|
|
|
- def _delete(self, id, segment, offset, *, update_shadow_index):
|
|
|
|
|
|
+ def _delete(self, id, segment, offset, size, *, update_shadow_index):
|
|
# common code used by put and delete
|
|
# common code used by put and delete
|
|
if update_shadow_index:
|
|
if update_shadow_index:
|
|
self.shadow_index.setdefault(id, []).append(segment)
|
|
self.shadow_index.setdefault(id, []).append(segment)
|
|
self.segments[segment] -= 1
|
|
self.segments[segment] -= 1
|
|
- size = self.io.read(segment, offset, id, read_data=False)
|
|
|
|
- self.compact[segment] += size
|
|
|
|
|
|
+ self.compact[segment] += header_size(TAG_PUT2) + size
|
|
segment, size = self.io.write_delete(id)
|
|
segment, size = self.io.write_delete(id)
|
|
self.compact[segment] += size
|
|
self.compact[segment] += size
|
|
self.segments.setdefault(segment, 0)
|
|
self.segments.setdefault(segment, 0)
|
|
@@ -1448,6 +1462,9 @@ class LoggedIO:
|
|
del self.fds[k]
|
|
del self.fds[k]
|
|
|
|
|
|
clean_old()
|
|
clean_old()
|
|
|
|
+ if self._write_fd is not None:
|
|
|
|
+ # without this, we have a test failure now
|
|
|
|
+ self._write_fd.sync()
|
|
try:
|
|
try:
|
|
ts, fd = self.fds[segment]
|
|
ts, fd = self.fds[segment]
|
|
except KeyError:
|
|
except KeyError:
|
|
@@ -1515,7 +1532,8 @@ class LoggedIO:
|
|
if include_data:
|
|
if include_data:
|
|
yield tag, key, offset, data
|
|
yield tag, key, offset, data
|
|
else:
|
|
else:
|
|
- yield tag, key, offset, size
|
|
|
|
|
|
+ yield tag, key, offset, size - header_size(tag) # corresponds to len(data)
|
|
|
|
+ assert size >= 0
|
|
offset += size
|
|
offset += size
|
|
# we must get the fd via get_fd() here again as we yielded to our caller and it might
|
|
# we must get the fd via get_fd() here again as we yielded to our caller and it might
|
|
# have triggered closing of the fd we had before (e.g. by calling io.read() for
|
|
# have triggered closing of the fd we had before (e.g. by calling io.read() for
|
|
@@ -1580,7 +1598,7 @@ class LoggedIO:
|
|
h.update(d)
|
|
h.update(d)
|
|
return h.digest()
|
|
return h.digest()
|
|
|
|
|
|
- def read(self, segment, offset, id, read_data=True):
|
|
|
|
|
|
+ def read(self, segment, offset, id, read_data=True, *, expected_size=None):
|
|
"""
|
|
"""
|
|
Read entry from *segment* at *offset* with *id*.
|
|
Read entry from *segment* at *offset* with *id*.
|
|
If read_data is False the size of the entry is returned instead.
|
|
If read_data is False the size of the entry is returned instead.
|
|
@@ -1596,7 +1614,11 @@ class LoggedIO:
|
|
if id != key:
|
|
if id != key:
|
|
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
|
|
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
|
|
segment, offset))
|
|
segment, offset))
|
|
- return data if read_data else size
|
|
|
|
|
|
+ data_size_from_header = size - header_size(tag)
|
|
|
|
+ if expected_size is not None and expected_size != data_size_from_header:
|
|
|
|
+ raise IntegrityError(f'size from repository index: {expected_size} != '
|
|
|
|
+ f'size from entry header: {data_size_from_header}')
|
|
|
|
+ return data if read_data else data_size_from_header
|
|
|
|
|
|
def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
|
|
def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
|
|
"""
|
|
"""
|