|
@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
|
|
|
|
|
|
from .constants import * # NOQA
|
|
|
from .hashindex import NSIndex
|
|
|
-from .helpers import Error, ErrorWithTraceback, IntegrityError
|
|
|
+from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
|
|
|
from .helpers import Location
|
|
|
from .helpers import ProgressIndicatorPercent
|
|
|
from .helpers import bin_to_hex
|
|
@@ -101,6 +101,9 @@ class Repository:
|
|
|
id = bin_to_hex(id)
|
|
|
super().__init__(id, repo)
|
|
|
|
|
|
+ class InsufficientFreeSpaceError(Error):
|
|
|
+ """Insufficient free space to complete transaction (required: {}, available: {})."""
|
|
|
+
|
|
|
def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False):
|
|
|
self.path = os.path.abspath(path)
|
|
|
self._location = Location('file://%s' % self.path)
|
|
@@ -136,8 +139,10 @@ class Repository:
|
|
|
# EIO or FS corruption ensues, which is why we specifically check for ENOSPC.
|
|
|
if self._active_txn and no_space_left_on_device:
|
|
|
logger.warning('No space left on device, cleaning up partial transaction to free space.')
|
|
|
- self.io.cleanup(self.io.get_segments_transaction_id())
|
|
|
- self.rollback()
|
|
|
+ cleanup = True
|
|
|
+ else:
|
|
|
+ cleanup = False
|
|
|
+ self.rollback(cleanup)
|
|
|
self.close()
|
|
|
|
|
|
@property
|
|
@@ -160,6 +165,7 @@ class Repository:
|
|
|
config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
|
|
|
config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
|
|
|
config.set('repository', 'append_only', str(int(self.append_only)))
|
|
|
+ config.set('repository', 'additional_free_space', '0')
|
|
|
config.set('repository', 'id', bin_to_hex(os.urandom(32)))
|
|
|
self.save_config(path, config)
|
|
|
|
|
@@ -231,6 +237,7 @@ class Repository:
|
|
|
raise self.InvalidRepository(path)
|
|
|
self.max_segment_size = self.config.getint('repository', 'max_segment_size')
|
|
|
self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
|
|
|
+ self.additional_free_space = parse_file_size(self.config.get('repository', 'additional_free_space', fallback=0))
|
|
|
# append_only can be set in the constructor
|
|
|
# it shouldn't be overridden (True -> False) here
|
|
|
self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
|
|
@@ -248,9 +255,11 @@ class Repository:
|
|
|
def commit(self, save_space=False):
|
|
|
"""Commit transaction
|
|
|
"""
|
|
|
+ # save_space is not used anymore, but stays for RPC/API compatibility.
|
|
|
+ self.check_free_space()
|
|
|
self.io.write_commit()
|
|
|
if not self.append_only:
|
|
|
- self.compact_segments(save_space=save_space)
|
|
|
+ self.compact_segments()
|
|
|
self.write_index()
|
|
|
self.rollback()
|
|
|
|
|
@@ -348,7 +357,45 @@ class Repository:
|
|
|
os.unlink(os.path.join(self.path, name))
|
|
|
self.index = None
|
|
|
|
|
|
- def compact_segments(self, save_space=False):
|
|
|
+ def check_free_space(self):
|
|
|
+ """Pre-commit check for sufficient free space to actually perform the commit."""
|
|
|
+ # As a baseline we take four times the current (on-disk) index size.
|
|
|
+ # At this point the index may only be updated by compaction, which won't resize it.
|
|
|
+ # We still apply a factor of four so that a later, separate invocation can free space
|
|
|
+ # (journaling all deletes for all chunks is one index size) or still make minor additions
|
|
|
+ # (which may grow the index up to twice its current size).
|
|
|
+ # Note that in a subsequent operation the committed index is still on-disk, therefore we
|
|
|
+ # arrive at index_size * (1 + 2 + 1).
|
|
|
+ # In that order: journaled deletes (1), hashtable growth (2), persisted index (1).
|
|
|
+ required_free_space = self.index.size() * 4
|
|
|
+
|
|
|
+ # Conservatively estimate hints file size:
|
|
|
+ # 10 bytes for each segment-refcount pair, 10 bytes for each segment-space pair
|
|
|
+ # Assume maximum of 5 bytes per integer. Segment numbers will usually be packed more densely (1-3 bytes),
|
|
|
+ # as will refcounts and free space integers. For 5 MiB segments this estimate is good to ~20 PB repo size.
|
|
|
+ # Add 4K to generously account for constant format overhead.
|
|
|
+ hints_size = len(self.segments) * 10 + len(self.compact) * 10 + 4096
|
|
|
+ required_free_space += hints_size
|
|
|
+
|
|
|
+ required_free_space += self.additional_free_space
|
|
|
+ if not self.append_only:
|
|
|
+ # Keep one full worst-case segment free in non-append-only mode
|
|
|
+ required_free_space += self.max_segment_size + MAX_OBJECT_SIZE
|
|
|
+ try:
|
|
|
+ st_vfs = os.statvfs(self.path)
|
|
|
+ except OSError as os_error:
|
|
|
+ logger.warning('Failed to check free space before committing: ' + str(os_error))
|
|
|
+ return
|
|
|
+ # f_bavail: even as root - don't touch the Federal Block Reserve!
|
|
|
+ free_space = st_vfs.f_bavail * st_vfs.f_bsize
|
|
|
+ logger.debug('check_free_space: required bytes {}, free bytes {}'.format(required_free_space, free_space))
|
|
|
+ if free_space < required_free_space:
|
|
|
+ self.rollback(cleanup=True)
|
|
|
+ formatted_required = format_file_size(required_free_space)
|
|
|
+ formatted_free = format_file_size(free_space)
|
|
|
+ raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)
|
|
|
+
|
|
|
+ def compact_segments(self):
|
|
|
"""Compact sparse segments by copying data into new segments
|
|
|
"""
|
|
|
if not self.compact:
|
|
@@ -357,12 +404,11 @@ class Repository:
|
|
|
segments = self.segments
|
|
|
unused = [] # list of segments, that are not used anymore
|
|
|
|
|
|
- def complete_xfer():
|
|
|
- # complete the transfer (usually exactly when some target segment
|
|
|
- # is full, or at the very end when everything is processed)
|
|
|
+ def complete_xfer(intermediate=True):
|
|
|
+ # complete the current transfer (when some target segment is full)
|
|
|
nonlocal unused
|
|
|
# commit the new, compact, used segments
|
|
|
- self.io.write_commit()
|
|
|
+ self.io.write_commit(intermediate=intermediate)
|
|
|
# get rid of the old, sparse, unused segments. free space.
|
|
|
for segment in unused:
|
|
|
assert self.segments.pop(segment) == 0
|
|
@@ -383,7 +429,7 @@ class Repository:
|
|
|
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
|
|
|
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
|
|
|
try:
|
|
|
- new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
|
|
|
+ new_segment, offset = self.io.write_put(key, data, raise_full=True)
|
|
|
except LoggedIO.SegmentFull:
|
|
|
complete_xfer()
|
|
|
new_segment, offset = self.io.write_put(key, data)
|
|
@@ -394,13 +440,13 @@ class Repository:
|
|
|
elif tag == TAG_DELETE:
|
|
|
if index_transaction_id is None or segment > index_transaction_id:
|
|
|
try:
|
|
|
- self.io.write_delete(key, raise_full=save_space)
|
|
|
+ self.io.write_delete(key, raise_full=True)
|
|
|
except LoggedIO.SegmentFull:
|
|
|
complete_xfer()
|
|
|
self.io.write_delete(key)
|
|
|
assert segments[segment] == 0
|
|
|
unused.append(segment)
|
|
|
- complete_xfer()
|
|
|
+ complete_xfer(intermediate=False)
|
|
|
|
|
|
def replay_segments(self, index_transaction_id, segments_transaction_id):
|
|
|
self.prepare_txn(index_transaction_id, do_cleanup=False)
|
|
@@ -536,7 +582,7 @@ class Repository:
|
|
|
if current_index.get(key, (-1, -1)) != value:
|
|
|
report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
|
|
|
if repair:
|
|
|
- self.compact_segments(save_space=save_space)
|
|
|
+ self.compact_segments()
|
|
|
self.write_index()
|
|
|
self.rollback()
|
|
|
if error_found:
|
|
@@ -548,9 +594,11 @@ class Repository:
|
|
|
logger.info('Completed repository check, no problems found.')
|
|
|
return not error_found or repair
|
|
|
|
|
|
- def rollback(self):
|
|
|
+ def rollback(self, cleanup=False):
|
|
|
"""
|
|
|
"""
|
|
|
+ if cleanup:
|
|
|
+ self.io.cleanup(self.io.get_segments_transaction_id())
|
|
|
self.index = None
|
|
|
self._active_txn = False
|
|
|
|
|
@@ -898,9 +946,15 @@ class LoggedIO:
|
|
|
self.offset += self.put_header_fmt.size
|
|
|
return self.segment, self.put_header_fmt.size
|
|
|
|
|
|
- def write_commit(self):
|
|
|
- self.close_segment()
|
|
|
- fd = self.get_write_fd()
|
|
|
+ def write_commit(self, intermediate=False):
|
|
|
+ if intermediate:
|
|
|
+ # Intermediate commits go directly into the current segment - this makes checking their validity more
|
|
|
+ # expensive, but is faster and reduces clobber.
|
|
|
+ fd = self.get_write_fd()
|
|
|
+ fd.sync()
|
|
|
+ else:
|
|
|
+ self.close_segment()
|
|
|
+ fd = self.get_write_fd()
|
|
|
header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
|
|
|
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
|
|
|
fd.write(b''.join((crc, header)))
|