|  | @@ -9,14 +9,18 @@ logger = logging.getLogger(__name__)
 | 
	
		
			
				|  |  |  import os
 | 
	
		
			
				|  |  |  import shutil
 | 
	
		
			
				|  |  |  import struct
 | 
	
		
			
				|  |  | +from collections import defaultdict
 | 
	
		
			
				|  |  | +from functools import partial
 | 
	
		
			
				|  |  |  from zlib import crc32
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import msgpack
 | 
	
		
			
				|  |  |  from .constants import *  # NOQA
 | 
	
		
			
				|  |  | -from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent, bin_to_hex
 | 
	
		
			
				|  |  | +from .helpers import Error, ErrorWithTraceback, IntegrityError, InternalOSError, Location, ProgressIndicatorPercent, \
 | 
	
		
			
				|  |  | +    bin_to_hex
 | 
	
		
			
				|  |  |  from .hashindex import NSIndex
 | 
	
		
			
				|  |  |  from .locking import UpgradableLock, LockError, LockErrorT
 | 
	
		
			
				|  |  |  from .lrucache import LRUCache
 | 
	
		
			
				|  |  | +from .platform import SyncFile, sync_dir
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  MAX_OBJECT_SIZE = 20 * 1024 * 1024
 | 
	
		
			
				|  |  |  MAGIC = b'BORG_SEG'
 | 
	
	
		
			
				|  | @@ -25,17 +29,55 @@ TAG_PUT = 0
 | 
	
		
			
				|  |  |  TAG_DELETE = 1
 | 
	
		
			
				|  |  |  TAG_COMMIT = 2
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +FreeSpace = partial(defaultdict, int)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class Repository:
 | 
	
		
			
				|  |  | -    """Filesystem based transactional key value store
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    Filesystem based transactional key value store
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
 | 
	
		
			
				|  |  | +    called segments. Each segment is a series of log entries. The segment number together with the offset of each
 | 
	
		
			
				|  |  | +    entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
 | 
	
		
			
				|  |  | +    time for the purposes of the log.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Log entries are either PUT, DELETE or COMMIT.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
 | 
	
		
			
				|  |  | +    segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
 | 
	
		
			
				|  |  | +    is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    When reading from a repository it is first checked whether the last segment is committed. If it is not, then
 | 
	
		
			
				|  |  | +    all segments after the last committed segment are deleted; they contain log entries whose consistency is not
 | 
	
		
			
				|  |  | +    established by a COMMIT.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
 | 
	
		
			
				|  |  | +    the platform (including the hardware). See platform_base.SyncFile for details.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
 | 
	
		
			
				|  |  | +    full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    A DELETE marks a key as deleted.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    For a given key only the last entry regarding the key, which is called current (all other entries are called
 | 
	
		
			
				|  |  | +    superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
 | 
	
		
			
				|  |  | +    Otherwise the last PUT defines the value of the key.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
 | 
	
		
			
				|  |  | +    such obsolete entries is called sparse, while a segment containing no such entries is called compact.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
 | 
	
		
			
				|  |  | +    superseded entries where current.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      On disk layout:
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      dir/README
 | 
	
		
			
				|  |  |      dir/config
 | 
	
		
			
				|  |  | -    dir/data/<X / SEGMENTS_PER_DIR>/<X>
 | 
	
		
			
				|  |  | +    dir/data/<X // SEGMENTS_PER_DIR>/<X>
 | 
	
		
			
				|  |  |      dir/index.X
 | 
	
		
			
				|  |  |      dir/hints.X
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      class DoesNotExist(Error):
 | 
	
		
			
				|  |  |          """Repository {} does not exist."""
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -137,7 +179,7 @@ class Repository:
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  |              return None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def get_transaction_id(self):
 | 
	
		
			
				|  |  | +    def check_transaction(self):
 | 
	
		
			
				|  |  |          index_transaction_id = self.get_index_transaction_id()
 | 
	
		
			
				|  |  |          segments_transaction_id = self.io.get_segments_transaction_id()
 | 
	
		
			
				|  |  |          if index_transaction_id is not None and segments_transaction_id is None:
 | 
	
	
		
			
				|  | @@ -150,6 +192,9 @@ class Repository:
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  |                  replay_from = index_transaction_id
 | 
	
		
			
				|  |  |              self.replay_segments(replay_from, segments_transaction_id)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def get_transaction_id(self):
 | 
	
		
			
				|  |  | +        self.check_transaction()
 | 
	
		
			
				|  |  |          return self.get_index_transaction_id()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def break_lock(self):
 | 
	
	
		
			
				|  | @@ -190,10 +235,27 @@ class Repository:
 | 
	
		
			
				|  |  |          self.write_index()
 | 
	
		
			
				|  |  |          self.rollback()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def open_index(self, transaction_id):
 | 
	
		
			
				|  |  | +    def open_index(self, transaction_id, auto_recover=True):
 | 
	
		
			
				|  |  |          if transaction_id is None:
 | 
	
		
			
				|  |  |              return NSIndex()
 | 
	
		
			
				|  |  | -        return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'))
 | 
	
		
			
				|  |  | +        index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
 | 
	
		
			
				|  |  | +        try:
 | 
	
		
			
				|  |  | +            return NSIndex.read(index_path)
 | 
	
		
			
				|  |  | +        except RuntimeError as error:
 | 
	
		
			
				|  |  | +            assert str(error) == 'hashindex_read failed'  # everything else means we're in *deep* trouble
 | 
	
		
			
				|  |  | +            logger.warning('Repository index missing or corrupted, trying to recover')
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                os.unlink(index_path)
 | 
	
		
			
				|  |  | +            except OSError as e:
 | 
	
		
			
				|  |  | +                raise InternalOSError(e) from None
 | 
	
		
			
				|  |  | +            if not auto_recover:
 | 
	
		
			
				|  |  | +                raise
 | 
	
		
			
				|  |  | +            self.prepare_txn(self.get_transaction_id())
 | 
	
		
			
				|  |  | +            # don't leave an open transaction around
 | 
	
		
			
				|  |  | +            self.commit()
 | 
	
		
			
				|  |  | +            return self.open_index(self.get_transaction_id())
 | 
	
		
			
				|  |  | +        except OSError as e:
 | 
	
		
			
				|  |  | +            raise InternalOSError(e) from None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def prepare_txn(self, transaction_id, do_cleanup=True):
 | 
	
		
			
				|  |  |          self._active_txn = True
 | 
	
	
		
			
				|  | @@ -206,24 +268,51 @@ class Repository:
 | 
	
		
			
				|  |  |              self._active_txn = False
 | 
	
		
			
				|  |  |              raise
 | 
	
		
			
				|  |  |          if not self.index or transaction_id is None:
 | 
	
		
			
				|  |  | -            self.index = self.open_index(transaction_id)
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                self.index = self.open_index(transaction_id, False)
 | 
	
		
			
				|  |  | +            except RuntimeError:
 | 
	
		
			
				|  |  | +                self.check_transaction()
 | 
	
		
			
				|  |  | +                self.index = self.open_index(transaction_id, False)
 | 
	
		
			
				|  |  |          if transaction_id is None:
 | 
	
		
			
				|  |  |              self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
 | 
	
		
			
				|  |  | -            self.compact = set()  # XXX bad name: segments_needing_compaction = self.compact
 | 
	
		
			
				|  |  | +            self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  |              if do_cleanup:
 | 
	
		
			
				|  |  |                  self.io.cleanup(transaction_id)
 | 
	
		
			
				|  |  | -            with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
 | 
	
		
			
				|  |  | -                hints = msgpack.unpack(fd)
 | 
	
		
			
				|  |  | -            if hints[b'version'] != 1:
 | 
	
		
			
				|  |  | -                raise ValueError('Unknown hints file version: %d' % hints['version'])
 | 
	
		
			
				|  |  | -            self.segments = hints[b'segments']
 | 
	
		
			
				|  |  | -            self.compact = set(hints[b'compact'])
 | 
	
		
			
				|  |  | +            hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
 | 
	
		
			
				|  |  | +            index_path = os.path.join(self.path, 'index.%d' % transaction_id)
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                with open(hints_path, 'rb') as fd:
 | 
	
		
			
				|  |  | +                    hints = msgpack.unpack(fd)
 | 
	
		
			
				|  |  | +            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
 | 
	
		
			
				|  |  | +                logger.warning('Repository hints file missing or corrupted, trying to recover')
 | 
	
		
			
				|  |  | +                if not isinstance(e, FileNotFoundError):
 | 
	
		
			
				|  |  | +                    os.unlink(hints_path)
 | 
	
		
			
				|  |  | +                # index must exist at this point
 | 
	
		
			
				|  |  | +                os.unlink(index_path)
 | 
	
		
			
				|  |  | +                self.check_transaction()
 | 
	
		
			
				|  |  | +                self.prepare_txn(transaction_id)
 | 
	
		
			
				|  |  | +                return
 | 
	
		
			
				|  |  | +            except OSError as os_error:
 | 
	
		
			
				|  |  | +                raise InternalOSError(os_error) from None
 | 
	
		
			
				|  |  | +            if hints[b'version'] == 1:
 | 
	
		
			
				|  |  | +                logger.debug('Upgrading from v1 hints.%d', transaction_id)
 | 
	
		
			
				|  |  | +                self.segments = hints[b'segments']
 | 
	
		
			
				|  |  | +                self.compact = FreeSpace()
 | 
	
		
			
				|  |  | +                for segment in sorted(hints[b'compact']):
 | 
	
		
			
				|  |  | +                    logger.debug('Rebuilding sparse info for segment %d', segment)
 | 
	
		
			
				|  |  | +                    self._rebuild_sparse(segment)
 | 
	
		
			
				|  |  | +                logger.debug('Upgrade to v2 hints complete')
 | 
	
		
			
				|  |  | +            elif hints[b'version'] != 2:
 | 
	
		
			
				|  |  | +                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
 | 
	
		
			
				|  |  | +            else:
 | 
	
		
			
				|  |  | +                self.segments = hints[b'segments']
 | 
	
		
			
				|  |  | +                self.compact = FreeSpace(hints[b'compact'])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def write_index(self):
 | 
	
		
			
				|  |  | -        hints = {b'version': 1,
 | 
	
		
			
				|  |  | +        hints = {b'version': 2,
 | 
	
		
			
				|  |  |                   b'segments': self.segments,
 | 
	
		
			
				|  |  | -                 b'compact': list(self.compact)}
 | 
	
		
			
				|  |  | +                 b'compact': self.compact}
 | 
	
		
			
				|  |  |          transaction_id = self.io.get_segments_transaction_id()
 | 
	
		
			
				|  |  |          hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
 | 
	
		
			
				|  |  |          with open(hints_file + '.tmp', 'wb') as fd:
 | 
	
	
		
			
				|  | @@ -237,10 +326,10 @@ class Repository:
 | 
	
		
			
				|  |  |          if self.append_only:
 | 
	
		
			
				|  |  |              with open(os.path.join(self.path, 'transactions'), 'a') as log:
 | 
	
		
			
				|  |  |                  print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
 | 
	
		
			
				|  |  | -        # Remove old indices
 | 
	
		
			
				|  |  | +        # Remove old auxiliary files
 | 
	
		
			
				|  |  |          current = '.%d' % transaction_id
 | 
	
		
			
				|  |  |          for name in os.listdir(self.path):
 | 
	
		
			
				|  |  | -            if not name.startswith('index.') and not name.startswith('hints.'):
 | 
	
		
			
				|  |  | +            if not name.startswith(('index.', 'hints.')):
 | 
	
		
			
				|  |  |                  continue
 | 
	
		
			
				|  |  |              if name.endswith(current):
 | 
	
		
			
				|  |  |                  continue
 | 
	
	
		
			
				|  | @@ -266,32 +355,40 @@ class Repository:
 | 
	
		
			
				|  |  |              for segment in unused:
 | 
	
		
			
				|  |  |                  assert self.segments.pop(segment) == 0
 | 
	
		
			
				|  |  |                  self.io.delete_segment(segment)
 | 
	
		
			
				|  |  | +                del self.compact[segment]
 | 
	
		
			
				|  |  |              unused = []
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        for segment in sorted(self.compact):
 | 
	
		
			
				|  |  | -            if self.io.segment_exists(segment):
 | 
	
		
			
				|  |  | -                for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
 | 
	
		
			
				|  |  | -                    if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
 | 
	
		
			
				|  |  | +        for segment, freeable_space in sorted(self.compact.items()):
 | 
	
		
			
				|  |  | +            if not self.io.segment_exists(segment):
 | 
	
		
			
				|  |  | +                del self.compact[segment]
 | 
	
		
			
				|  |  | +                continue
 | 
	
		
			
				|  |  | +            segment_size = self.io.segment_size(segment)
 | 
	
		
			
				|  |  | +            if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
 | 
	
		
			
				|  |  | +                logger.debug('not compacting segment %d for later (only %d bytes are sparse)',
 | 
	
		
			
				|  |  | +                             segment, freeable_space)
 | 
	
		
			
				|  |  | +                continue
 | 
	
		
			
				|  |  | +            segments.setdefault(segment, 0)
 | 
	
		
			
				|  |  | +            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
 | 
	
		
			
				|  |  | +                if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
 | 
	
		
			
				|  |  | +                    try:
 | 
	
		
			
				|  |  | +                        new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
 | 
	
		
			
				|  |  | +                    except LoggedIO.SegmentFull:
 | 
	
		
			
				|  |  | +                        complete_xfer()
 | 
	
		
			
				|  |  | +                        new_segment, offset = self.io.write_put(key, data)
 | 
	
		
			
				|  |  | +                    self.index[key] = new_segment, offset
 | 
	
		
			
				|  |  | +                    segments.setdefault(new_segment, 0)
 | 
	
		
			
				|  |  | +                    segments[new_segment] += 1
 | 
	
		
			
				|  |  | +                    segments[segment] -= 1
 | 
	
		
			
				|  |  | +                elif tag == TAG_DELETE:
 | 
	
		
			
				|  |  | +                    if index_transaction_id is None or segment > index_transaction_id:
 | 
	
		
			
				|  |  |                          try:
 | 
	
		
			
				|  |  | -                            new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
 | 
	
		
			
				|  |  | +                            self.io.write_delete(key, raise_full=save_space)
 | 
	
		
			
				|  |  |                          except LoggedIO.SegmentFull:
 | 
	
		
			
				|  |  |                              complete_xfer()
 | 
	
		
			
				|  |  | -                            new_segment, offset = self.io.write_put(key, data)
 | 
	
		
			
				|  |  | -                        self.index[key] = new_segment, offset
 | 
	
		
			
				|  |  | -                        segments.setdefault(new_segment, 0)
 | 
	
		
			
				|  |  | -                        segments[new_segment] += 1
 | 
	
		
			
				|  |  | -                        segments[segment] -= 1
 | 
	
		
			
				|  |  | -                    elif tag == TAG_DELETE:
 | 
	
		
			
				|  |  | -                        if index_transaction_id is None or segment > index_transaction_id:
 | 
	
		
			
				|  |  | -                            try:
 | 
	
		
			
				|  |  | -                                self.io.write_delete(key, raise_full=save_space)
 | 
	
		
			
				|  |  | -                            except LoggedIO.SegmentFull:
 | 
	
		
			
				|  |  | -                                complete_xfer()
 | 
	
		
			
				|  |  | -                                self.io.write_delete(key)
 | 
	
		
			
				|  |  | -                assert segments[segment] == 0
 | 
	
		
			
				|  |  | -                unused.append(segment)
 | 
	
		
			
				|  |  | +                            self.io.write_delete(key)
 | 
	
		
			
				|  |  | +            assert segments[segment] == 0
 | 
	
		
			
				|  |  | +            unused.append(segment)
 | 
	
		
			
				|  |  |          complete_xfer()
 | 
	
		
			
				|  |  | -        self.compact = set()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def replay_segments(self, index_transaction_id, segments_transaction_id):
 | 
	
		
			
				|  |  |          self.prepare_txn(index_transaction_id, do_cleanup=False)
 | 
	
	
		
			
				|  | @@ -314,11 +411,12 @@ class Repository:
 | 
	
		
			
				|  |  |      def _update_index(self, segment, objects, report=None):
 | 
	
		
			
				|  |  |          """some code shared between replay_segments and check"""
 | 
	
		
			
				|  |  |          self.segments[segment] = 0
 | 
	
		
			
				|  |  | -        for tag, key, offset in objects:
 | 
	
		
			
				|  |  | +        for tag, key, offset, size in objects:
 | 
	
		
			
				|  |  |              if tag == TAG_PUT:
 | 
	
		
			
				|  |  |                  try:
 | 
	
		
			
				|  |  | +                    # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
 | 
	
		
			
				|  |  |                      s, _ = self.index[key]
 | 
	
		
			
				|  |  | -                    self.compact.add(s)
 | 
	
		
			
				|  |  | +                    self.compact[s] += size
 | 
	
		
			
				|  |  |                      self.segments[s] -= 1
 | 
	
		
			
				|  |  |                  except KeyError:
 | 
	
		
			
				|  |  |                      pass
 | 
	
	
		
			
				|  | @@ -326,12 +424,17 @@ class Repository:
 | 
	
		
			
				|  |  |                  self.segments[segment] += 1
 | 
	
		
			
				|  |  |              elif tag == TAG_DELETE:
 | 
	
		
			
				|  |  |                  try:
 | 
	
		
			
				|  |  | -                    s, _ = self.index.pop(key)
 | 
	
		
			
				|  |  | -                    self.segments[s] -= 1
 | 
	
		
			
				|  |  | -                    self.compact.add(s)
 | 
	
		
			
				|  |  | +                    # if the deleted PUT is not in the index, there is nothing to clean up
 | 
	
		
			
				|  |  | +                    s, offset = self.index.pop(key)
 | 
	
		
			
				|  |  |                  except KeyError:
 | 
	
		
			
				|  |  |                      pass
 | 
	
		
			
				|  |  | -                self.compact.add(segment)
 | 
	
		
			
				|  |  | +                else:
 | 
	
		
			
				|  |  | +                    if self.io.segment_exists(s):
 | 
	
		
			
				|  |  | +                        # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
 | 
	
		
			
				|  |  | +                        # is already gone, then it was already compacted.
 | 
	
		
			
				|  |  | +                        self.segments[s] -= 1
 | 
	
		
			
				|  |  | +                        size = self.io.read(s, offset, key, read_data=False)
 | 
	
		
			
				|  |  | +                        self.compact[s] += size
 | 
	
		
			
				|  |  |              elif tag == TAG_COMMIT:
 | 
	
		
			
				|  |  |                  continue
 | 
	
		
			
				|  |  |              else:
 | 
	
	
		
			
				|  | @@ -341,7 +444,22 @@ class Repository:
 | 
	
		
			
				|  |  |                  else:
 | 
	
		
			
				|  |  |                      report(msg)
 | 
	
		
			
				|  |  |          if self.segments[segment] == 0:
 | 
	
		
			
				|  |  | -            self.compact.add(segment)
 | 
	
		
			
				|  |  | +            self.compact[segment] += self.io.segment_size(segment)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _rebuild_sparse(self, segment):
 | 
	
		
			
				|  |  | +        """Rebuild sparse bytes count for a single segment relative to the current index."""
 | 
	
		
			
				|  |  | +        self.compact[segment] = 0
 | 
	
		
			
				|  |  | +        if self.segments[segment] == 0:
 | 
	
		
			
				|  |  | +            self.compact[segment] += self.io.segment_size(segment)
 | 
	
		
			
				|  |  | +            return
 | 
	
		
			
				|  |  | +        for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
 | 
	
		
			
				|  |  | +            if tag == TAG_PUT:
 | 
	
		
			
				|  |  | +                if self.index.get(key, (-1, -1)) != (segment, offset):
 | 
	
		
			
				|  |  | +                    # This PUT is superseded later
 | 
	
		
			
				|  |  | +                    self.compact[segment] += size
 | 
	
		
			
				|  |  | +            elif tag == TAG_DELETE:
 | 
	
		
			
				|  |  | +                # The outcome of the DELETE has been recorded in the PUT branch already
 | 
	
		
			
				|  |  | +                self.compact[segment] += size
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def check(self, repair=False, save_space=False):
 | 
	
		
			
				|  |  |          """Check repository consistency
 | 
	
	
		
			
				|  | @@ -456,14 +574,16 @@ class Repository:
 | 
	
		
			
				|  |  |          if not self._active_txn:
 | 
	
		
			
				|  |  |              self.prepare_txn(self.get_transaction_id())
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  | -            segment, _ = self.index[id]
 | 
	
		
			
				|  |  | -            self.segments[segment] -= 1
 | 
	
		
			
				|  |  | -            self.compact.add(segment)
 | 
	
		
			
				|  |  | -            segment = self.io.write_delete(id)
 | 
	
		
			
				|  |  | -            self.segments.setdefault(segment, 0)
 | 
	
		
			
				|  |  | -            self.compact.add(segment)
 | 
	
		
			
				|  |  | +            segment, offset = self.index[id]
 | 
	
		
			
				|  |  |          except KeyError:
 | 
	
		
			
				|  |  |              pass
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            self.segments[segment] -= 1
 | 
	
		
			
				|  |  | +            size = self.io.read(segment, offset, id, read_data=False)
 | 
	
		
			
				|  |  | +            self.compact[segment] += size
 | 
	
		
			
				|  |  | +            segment, size = self.io.write_delete(id)
 | 
	
		
			
				|  |  | +            self.compact[segment] += size
 | 
	
		
			
				|  |  | +            self.segments.setdefault(segment, 0)
 | 
	
		
			
				|  |  |          segment, offset = self.io.write_put(id, data)
 | 
	
		
			
				|  |  |          self.segments.setdefault(segment, 0)
 | 
	
		
			
				|  |  |          self.segments[segment] += 1
 | 
	
	
		
			
				|  | @@ -477,9 +597,10 @@ class Repository:
 | 
	
		
			
				|  |  |          except KeyError:
 | 
	
		
			
				|  |  |              raise self.ObjectNotFound(id, self.path) from None
 | 
	
		
			
				|  |  |          self.segments[segment] -= 1
 | 
	
		
			
				|  |  | -        self.compact.add(segment)
 | 
	
		
			
				|  |  | -        segment = self.io.write_delete(id)
 | 
	
		
			
				|  |  | -        self.compact.add(segment)
 | 
	
		
			
				|  |  | +        size = self.io.read(segment, offset, id, read_data=False)
 | 
	
		
			
				|  |  | +        self.compact[segment] += size
 | 
	
		
			
				|  |  | +        segment, size = self.io.write_delete(id)
 | 
	
		
			
				|  |  | +        self.compact[segment] += size
 | 
	
		
			
				|  |  |          self.segments.setdefault(segment, 0)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def preload(self, ids):
 | 
	
	
		
			
				|  | @@ -507,7 +628,7 @@ class LoggedIO:
 | 
	
		
			
				|  |  |      def __init__(self, path, limit, segments_per_dir, capacity=90):
 | 
	
		
			
				|  |  |          self.path = path
 | 
	
		
			
				|  |  |          self.fds = LRUCache(capacity,
 | 
	
		
			
				|  |  | -                            dispose=lambda fd: fd.close())
 | 
	
		
			
				|  |  | +                            dispose=self.close_fd)
 | 
	
		
			
				|  |  |          self.segment = 0
 | 
	
		
			
				|  |  |          self.limit = limit
 | 
	
		
			
				|  |  |          self.segments_per_dir = segments_per_dir
 | 
	
	
		
			
				|  | @@ -519,6 +640,11 @@ class LoggedIO:
 | 
	
		
			
				|  |  |          self.fds.clear()
 | 
	
		
			
				|  |  |          self.fds = None  # Just to make sure we're disabled
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    def close_fd(self, fd):
 | 
	
		
			
				|  |  | +        if hasattr(os, 'posix_fadvise'):  # only on UNIX
 | 
	
		
			
				|  |  | +            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
 | 
	
		
			
				|  |  | +        fd.close()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      def segment_iterator(self, reverse=False):
 | 
	
		
			
				|  |  |          data_path = os.path.join(self.path, 'data')
 | 
	
		
			
				|  |  |          dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
 | 
	
	
		
			
				|  | @@ -535,10 +661,10 @@ class LoggedIO:
 | 
	
		
			
				|  |  |          return None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def get_segments_transaction_id(self):
 | 
	
		
			
				|  |  | -        """Verify that the transaction id is consistent with the index transaction id
 | 
	
		
			
				|  |  | +        """Return the last committed segment.
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          for segment, filename in self.segment_iterator(reverse=True):
 | 
	
		
			
				|  |  | -            if self.is_committed_segment(filename):
 | 
	
		
			
				|  |  | +            if self.is_committed_segment(segment):
 | 
	
		
			
				|  |  |                  return segment
 | 
	
		
			
				|  |  |          return None
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -552,10 +678,14 @@ class LoggedIO:
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  |                  break
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def is_committed_segment(self, filename):
 | 
	
		
			
				|  |  | +    def is_committed_segment(self, segment):
 | 
	
		
			
				|  |  |          """Check if segment ends with a COMMIT_TAG tag
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  | -        with open(filename, 'rb') as fd:
 | 
	
		
			
				|  |  | +        try:
 | 
	
		
			
				|  |  | +            iterator = self.iter_objects(segment)
 | 
	
		
			
				|  |  | +        except IntegrityError:
 | 
	
		
			
				|  |  | +            return False
 | 
	
		
			
				|  |  | +        with open(self.segment_filename(segment), 'rb') as fd:
 | 
	
		
			
				|  |  |              try:
 | 
	
		
			
				|  |  |                  fd.seek(-self.header_fmt.size, os.SEEK_END)
 | 
	
		
			
				|  |  |              except OSError as e:
 | 
	
	
		
			
				|  | @@ -563,7 +693,22 @@ class LoggedIO:
 | 
	
		
			
				|  |  |                  if e.errno == errno.EINVAL:
 | 
	
		
			
				|  |  |                      return False
 | 
	
		
			
				|  |  |                  raise e
 | 
	
		
			
				|  |  | -            return fd.read(self.header_fmt.size) == self.COMMIT
 | 
	
		
			
				|  |  | +            if fd.read(self.header_fmt.size) != self.COMMIT:
 | 
	
		
			
				|  |  | +                return False
 | 
	
		
			
				|  |  | +        seen_commit = False
 | 
	
		
			
				|  |  | +        while True:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                tag, key, offset, _ = next(iterator)
 | 
	
		
			
				|  |  | +            except IntegrityError:
 | 
	
		
			
				|  |  | +                return False
 | 
	
		
			
				|  |  | +            except StopIteration:
 | 
	
		
			
				|  |  | +                break
 | 
	
		
			
				|  |  | +            if tag == TAG_COMMIT:
 | 
	
		
			
				|  |  | +                seen_commit = True
 | 
	
		
			
				|  |  | +                continue
 | 
	
		
			
				|  |  | +            if seen_commit:
 | 
	
		
			
				|  |  | +                return False
 | 
	
		
			
				|  |  | +        return seen_commit
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def segment_filename(self, segment):
 | 
	
		
			
				|  |  |          return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
 | 
	
	
		
			
				|  | @@ -578,7 +723,8 @@ class LoggedIO:
 | 
	
		
			
				|  |  |                  dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
 | 
	
		
			
				|  |  |                  if not os.path.exists(dirname):
 | 
	
		
			
				|  |  |                      os.mkdir(dirname)
 | 
	
		
			
				|  |  | -            self._write_fd = open(self.segment_filename(self.segment), 'ab')
 | 
	
		
			
				|  |  | +                    sync_dir(os.path.join(self.path, 'data'))
 | 
	
		
			
				|  |  | +            self._write_fd = SyncFile(self.segment_filename(self.segment))
 | 
	
		
			
				|  |  |              self._write_fd.write(MAGIC)
 | 
	
		
			
				|  |  |              self.offset = MAGIC_LEN
 | 
	
		
			
				|  |  |          return self._write_fd
 | 
	
	
		
			
				|  | @@ -591,6 +737,13 @@ class LoggedIO:
 | 
	
		
			
				|  |  |              self.fds[segment] = fd
 | 
	
		
			
				|  |  |              return fd
 | 
	
		
			
				|  |  |  
 | 
	
		
			
    def close_segment(self):
        """Close the currently open write segment, if any.

        Advances the current segment number and resets the write offset so
        the next write opens a fresh segment file. A no-op when no segment
        is open for writing.
        """
        if not self._write_fd:
            return
        self.segment += 1
        self.offset = 0
        self._write_fd.close()
        self._write_fd = None
 | 
	
		
			
				|  |  |      def delete_segment(self, segment):
 | 
	
		
			
				|  |  |          if segment in self.fds:
 | 
	
		
			
				|  |  |              del self.fds[segment]
 | 
	
	
		
			
				|  | @@ -602,7 +755,18 @@ class LoggedIO:
 | 
	
		
			
    def segment_exists(self, segment):
        """Return whether a file for *segment* is present on disk."""
        filename = self.segment_filename(segment)
        return os.path.exists(filename)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def iter_objects(self, segment, include_data=False):
 | 
	
		
			
    def segment_size(self, segment):
        """Return the size in bytes of the file backing *segment*."""
        filename = self.segment_filename(segment)
        return os.path.getsize(filename)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def iter_objects(self, segment, include_data=False, read_data=True):
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        Return object iterator for *segment*.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        If read_data is False then include_data must be False as well.
 | 
	
		
			
				|  |  | +        Integrity checks are skipped: all data obtained from the iterator must be considered informational.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        The iterator returns four-tuples of (tag, key, offset, data|size).
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  |          fd = self.get_fd(segment)
 | 
	
		
			
				|  |  |          fd.seek(0)
 | 
	
		
			
				|  |  |          if fd.read(MAGIC_LEN) != MAGIC:
 | 
	
	
		
			
				|  | @@ -611,11 +775,12 @@ class LoggedIO:
 | 
	
		
			
				|  |  |          header = fd.read(self.header_fmt.size)
 | 
	
		
			
				|  |  |          while header:
 | 
	
		
			
				|  |  |              size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
 | 
	
		
			
				|  |  | -                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT))
 | 
	
		
			
				|  |  | +                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
 | 
	
		
			
				|  |  | +                                              read_data=read_data)
 | 
	
		
			
				|  |  |              if include_data:
 | 
	
		
			
				|  |  |                  yield tag, key, offset, data
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  | -                yield tag, key, offset
 | 
	
		
			
				|  |  | +                yield tag, key, offset, size
 | 
	
		
			
				|  |  |              offset += size
 | 
	
		
			
				|  |  |              header = fd.read(self.header_fmt.size)
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -639,19 +804,25 @@ class LoggedIO:
 | 
	
		
			
				|  |  |                  fd.write(data[:size])
 | 
	
		
			
				|  |  |                  data = data[size:]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
    def read(self, segment, offset, id, read_data=True):
        """
        Read entry from *segment* at *offset* with *id*.

        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
        The return value should thus be considered informational.
        """
        if self._write_fd and segment == self.segment:
            # The entry may still be buffered in the open write segment;
            # force it to disk so the read fd below sees it.
            self._write_fd.sync()
        fd = self.get_fd(segment)
        fd.seek(offset)
        header = fd.read(self.put_header_fmt.size)
        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
        if key != id:
            raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                segment, offset))
        if read_data:
            return data
        return size
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def _read(self, fd, fmt, header, segment, offset, acceptable_tags):
 | 
	
		
			
				|  |  | +    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
 | 
	
		
			
				|  |  |          # some code shared by read() and iter_objects()
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  |              hdr_tuple = fmt.unpack(header)
 | 
	
	
		
			
				|  | @@ -669,18 +840,32 @@ class LoggedIO:
 | 
	
		
			
				|  |  |              raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format(
 | 
	
		
			
				|  |  |                  segment, offset))
 | 
	
		
			
				|  |  |          length = size - fmt.size
 | 
	
		
			
				|  |  | -        data = fd.read(length)
 | 
	
		
			
				|  |  | -        if len(data) != length:
 | 
	
		
			
				|  |  | -            raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
 | 
	
		
			
				|  |  | -                segment, offset, length, len(data)))
 | 
	
		
			
				|  |  | -        if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
 | 
	
		
			
				|  |  | -            raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
 | 
	
		
			
				|  |  | -                segment, offset))
 | 
	
		
			
				|  |  | +        if read_data:
 | 
	
		
			
				|  |  | +            data = fd.read(length)
 | 
	
		
			
				|  |  | +            if len(data) != length:
 | 
	
		
			
				|  |  | +                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
 | 
	
		
			
				|  |  | +                    segment, offset, length, len(data)))
 | 
	
		
			
				|  |  | +            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
 | 
	
		
			
				|  |  | +                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
 | 
	
		
			
				|  |  | +                    segment, offset))
 | 
	
		
			
				|  |  | +            if key is None and tag in (TAG_PUT, TAG_DELETE):
 | 
	
		
			
				|  |  | +                key, data = data[:32], data[32:]
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            if key is None and tag in (TAG_PUT, TAG_DELETE):
 | 
	
		
			
				|  |  | +                key = fd.read(32)
 | 
	
		
			
				|  |  | +                length -= 32
 | 
	
		
			
				|  |  | +                if len(key) != 32:
 | 
	
		
			
				|  |  | +                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
 | 
	
		
			
				|  |  | +                        segment, offset, 32, len(key)))
 | 
	
		
			
				|  |  | +            oldpos = fd.tell()
 | 
	
		
			
				|  |  | +            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
 | 
	
		
			
				|  |  | +            data = None
 | 
	
		
			
				|  |  | +            if seeked != length:
 | 
	
		
			
				|  |  | +                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
 | 
	
		
			
				|  |  | +                        segment, offset, length, seeked))
 | 
	
		
			
				|  |  |          if tag not in acceptable_tags:
 | 
	
		
			
				|  |  |              raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
 | 
	
		
			
				|  |  |                  segment, offset))
 | 
	
		
			
				|  |  | -        if key is None and tag in (TAG_PUT, TAG_DELETE):
 | 
	
		
			
				|  |  | -            key, data = data[:32], data[32:]
 | 
	
		
			
				|  |  |          return size, tag, key, data
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def write_put(self, id, data, raise_full=False):
 | 
	
	
		
			
				|  | @@ -699,24 +884,12 @@ class LoggedIO:
 | 
	
		
			
				|  |  |          crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
 | 
	
		
			
				|  |  |          fd.write(b''.join((crc, header, id)))
 | 
	
		
			
				|  |  |          self.offset += self.put_header_fmt.size
 | 
	
		
			
				|  |  | -        return self.segment
 | 
	
		
			
				|  |  | +        return self.segment, self.put_header_fmt.size
 | 
	
		
			
				|  |  |  
 | 
	
		
			
    def write_commit(self):
        """Write a commit tag into a fresh segment and close it.

        The preceding segment is closed first so the commit entry sits in a
        segment of its own; closing again afterwards flushes it to disk.
        """
        self.close_segment()
        fd = self.get_write_fd()
        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
        checksum = self.crc_fmt.pack(crc32(header) & 0xffffffff)
        fd.write(checksum + header)
        self.close_segment()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def close_segment(self):
 | 
	
		
			
				|  |  | -        if self._write_fd:
 | 
	
		
			
				|  |  | -            self.segment += 1
 | 
	
		
			
				|  |  | -            self.offset = 0
 | 
	
		
			
				|  |  | -            self._write_fd.flush()
 | 
	
		
			
				|  |  | -            os.fsync(self._write_fd.fileno())
 | 
	
		
			
				|  |  | -            if hasattr(os, 'posix_fadvise'):  # only on UNIX
 | 
	
		
			
				|  |  | -                # tell the OS that it does not need to cache what we just wrote,
 | 
	
		
			
				|  |  | -                # avoids spoiling the cache for the OS and other processes.
 | 
	
		
			
				|  |  | -                os.posix_fadvise(self._write_fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
 | 
	
		
			
				|  |  | -            self._write_fd.close()
 | 
	
		
			
				|  |  | -            self._write_fd = None
 |