|
@@ -21,6 +21,7 @@ from .helpers import ProgressIndicatorPercent
|
|
from .helpers import bin_to_hex
|
|
from .helpers import bin_to_hex
|
|
from .helpers import hostname_is_unique
|
|
from .helpers import hostname_is_unique
|
|
from .helpers import secure_erase, truncate_and_unlink
|
|
from .helpers import secure_erase, truncate_and_unlink
|
|
|
|
+from .helpers import Manifest
|
|
from .locking import Lock, LockError, LockErrorT
|
|
from .locking import Lock, LockError, LockErrorT
|
|
from .logger import create_logger
|
|
from .logger import create_logger
|
|
from .lrucache import LRUCache
|
|
from .lrucache import LRUCache
|
|
@@ -1068,6 +1069,10 @@ class Repository:
|
|
"""
|
|
"""
|
|
if not self._active_txn:
|
|
if not self._active_txn:
|
|
self.prepare_txn(self.get_transaction_id())
|
|
self.prepare_txn(self.get_transaction_id())
|
|
|
|
+ # special-case deleting / writing the manifest: it goes into a separate, new segment file,
|
|
|
|
+ # so that when we supersede and compact it later, less segment data has to be shuffled around -
|
|
|
|
+ # compaction can then just delete this segment file and that's all.
|
|
|
|
+ start_new_segment = id == Manifest.MANIFEST_ID
|
|
try:
|
|
try:
|
|
segment, offset = self.index[id]
|
|
segment, offset = self.index[id]
|
|
except KeyError:
|
|
except KeyError:
|
|
@@ -1077,10 +1082,11 @@ class Repository:
|
|
size = self.io.read(segment, offset, id, read_data=False)
|
|
size = self.io.read(segment, offset, id, read_data=False)
|
|
self.storage_quota_use -= size
|
|
self.storage_quota_use -= size
|
|
self.compact[segment] += size
|
|
self.compact[segment] += size
|
|
- segment, size = self.io.write_delete(id)
|
|
|
|
|
|
+ segment, size = self.io.write_delete(id, start_new=start_new_segment)
|
|
|
|
+ start_new_segment = False # already started a new one
|
|
self.compact[segment] += size
|
|
self.compact[segment] += size
|
|
self.segments.setdefault(segment, 0)
|
|
self.segments.setdefault(segment, 0)
|
|
- segment, offset = self.io.write_put(id, data)
|
|
|
|
|
|
+ segment, offset = self.io.write_put(id, data, start_new=start_new_segment)
|
|
self.storage_quota_use += len(data) + self.io.put_header_fmt.size
|
|
self.storage_quota_use += len(data) + self.io.put_header_fmt.size
|
|
self.segments.setdefault(segment, 0)
|
|
self.segments.setdefault(segment, 0)
|
|
self.segments[segment] += 1
|
|
self.segments[segment] += 1
|
|
@@ -1250,11 +1256,13 @@ class LoggedIO:
|
|
def segment_filename(self, segment):
|
|
def segment_filename(self, segment):
|
|
return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
|
|
return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
|
|
|
|
|
|
- def get_write_fd(self, no_new=False, raise_full=False):
|
|
|
|
|
|
+ def get_write_fd(self, no_new=False, raise_full=False, start_new=False):
|
|
if not no_new and self.offset and self.offset > self.limit:
|
|
if not no_new and self.offset and self.offset > self.limit:
|
|
if raise_full:
|
|
if raise_full:
|
|
raise self.SegmentFull
|
|
raise self.SegmentFull
|
|
self.close_segment()
|
|
self.close_segment()
|
|
|
|
+ if start_new:
|
|
|
|
+ self.close_segment()
|
|
if not self._write_fd:
|
|
if not self._write_fd:
|
|
if self.segment % self.segments_per_dir == 0:
|
|
if self.segment % self.segments_per_dir == 0:
|
|
dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
|
|
dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
|
|
@@ -1458,12 +1466,12 @@ class LoggedIO:
|
|
segment, offset))
|
|
segment, offset))
|
|
return size, tag, key, data
|
|
return size, tag, key, data
|
|
|
|
|
|
- def write_put(self, id, data, raise_full=False):
|
|
|
|
|
|
+ def write_put(self, id, data, raise_full=False, start_new=False):
|
|
data_size = len(data)
|
|
data_size = len(data)
|
|
if data_size > MAX_DATA_SIZE:
|
|
if data_size > MAX_DATA_SIZE:
|
|
# this would push the segment entry size beyond MAX_OBJECT_SIZE.
|
|
# this would push the segment entry size beyond MAX_OBJECT_SIZE.
|
|
raise IntegrityError('More than allowed put data [{} > {}]'.format(data_size, MAX_DATA_SIZE))
|
|
raise IntegrityError('More than allowed put data [{} > {}]'.format(data_size, MAX_DATA_SIZE))
|
|
- fd = self.get_write_fd(raise_full=raise_full)
|
|
|
|
|
|
+ fd = self.get_write_fd(raise_full=raise_full, start_new=start_new)
|
|
size = data_size + self.put_header_fmt.size
|
|
size = data_size + self.put_header_fmt.size
|
|
offset = self.offset
|
|
offset = self.offset
|
|
header = self.header_no_crc_fmt.pack(size, TAG_PUT)
|
|
header = self.header_no_crc_fmt.pack(size, TAG_PUT)
|
|
@@ -1472,8 +1480,8 @@ class LoggedIO:
|
|
self.offset += size
|
|
self.offset += size
|
|
return self.segment, offset
|
|
return self.segment, offset
|
|
|
|
|
|
- def write_delete(self, id, raise_full=False):
|
|
|
|
- fd = self.get_write_fd(raise_full=raise_full)
|
|
|
|
|
|
+ def write_delete(self, id, raise_full=False, start_new=False):
|
|
|
|
+ fd = self.get_write_fd(raise_full=raise_full, start_new=start_new)
|
|
header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
|
|
header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
|
|
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
|
|
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
|
|
fd.write(b''.join((crc, header, id)))
|
|
fd.write(b''.join((crc, header, id)))
|