|  | @@ -158,11 +158,11 @@ class Repository:
 | 
	
		
			
				|  |  |              self.lock.release()
 | 
	
		
			
				|  |  |              self.lock = None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def commit(self):
 | 
	
		
			
				|  |  | +    def commit(self, save_space=False):
 | 
	
		
			
				|  |  |          """Commit transaction
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          self.io.write_commit()
 | 
	
		
			
				|  |  | -        self.compact_segments()
 | 
	
		
			
				|  |  | +        self.compact_segments(save_space=save_space)
 | 
	
		
			
				|  |  |          self.write_index()
 | 
	
		
			
				|  |  |          self.rollback()
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -220,31 +220,50 @@ class Repository:
 | 
	
		
			
				|  |  |              os.unlink(os.path.join(self.path, name))
 | 
	
		
			
				|  |  |          self.index = None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def compact_segments(self):
 | 
	
		
			
				|  |  | +    def compact_segments(self, save_space=False):
 | 
	
		
			
				|  |  |          """Compact sparse segments by copying data into new segments
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          if not self.compact:
 | 
	
		
			
				|  |  |              return
 | 
	
		
			
				|  |  |          index_transaction_id = self.get_index_transaction_id()
 | 
	
		
			
				|  |  |          segments = self.segments
 | 
	
		
			
				|  |  | +        unused = []  # list of segments, that are not used anymore
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        def complete_xfer():
 | 
	
		
			
				|  |  | +            # complete the transfer (usually exactly when some target segment
 | 
	
		
			
				|  |  | +            # is full, or at the very end when everything is processed)
 | 
	
		
			
				|  |  | +            nonlocal unused
 | 
	
		
			
				|  |  | +            # commit the new, compact, used segments
 | 
	
		
			
				|  |  | +            self.io.write_commit()
 | 
	
		
			
				|  |  | +            # get rid of the old, sparse, unused segments. free space.
 | 
	
		
			
				|  |  | +            for segment in unused:
 | 
	
		
			
				|  |  | +                assert self.segments.pop(segment) == 0
 | 
	
		
			
				|  |  | +                self.io.delete_segment(segment)
 | 
	
		
			
				|  |  | +            unused = []
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          for segment in sorted(self.compact):
 | 
	
		
			
				|  |  |              if self.io.segment_exists(segment):
 | 
	
		
			
				|  |  |                  for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
 | 
	
		
			
				|  |  |                      if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
 | 
	
		
			
				|  |  | -                        new_segment, offset = self.io.write_put(key, data)
 | 
	
		
			
				|  |  | +                        try:
 | 
	
		
			
				|  |  | +                            new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
 | 
	
		
			
				|  |  | +                        except LoggedIO.SegmentFull:
 | 
	
		
			
				|  |  | +                            complete_xfer()
 | 
	
		
			
				|  |  | +                            new_segment, offset = self.io.write_put(key, data)
 | 
	
		
			
				|  |  |                          self.index[key] = new_segment, offset
 | 
	
		
			
				|  |  |                          segments.setdefault(new_segment, 0)
 | 
	
		
			
				|  |  |                          segments[new_segment] += 1
 | 
	
		
			
				|  |  |                          segments[segment] -= 1
 | 
	
		
			
				|  |  |                      elif tag == TAG_DELETE:
 | 
	
		
			
				|  |  |                          if index_transaction_id is None or segment > index_transaction_id:
 | 
	
		
			
				|  |  | -                            self.io.write_delete(key)
 | 
	
		
			
				|  |  | +                            try:
 | 
	
		
			
				|  |  | +                                self.io.write_delete(key, raise_full=save_space)
 | 
	
		
			
				|  |  | +                            except LoggedIO.SegmentFull:
 | 
	
		
			
				|  |  | +                                complete_xfer()
 | 
	
		
			
				|  |  | +                                self.io.write_delete(key)
 | 
	
		
			
				|  |  |                  assert segments[segment] == 0
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        self.io.write_commit()
 | 
	
		
			
				|  |  | -        for segment in sorted(self.compact):
 | 
	
		
			
				|  |  | -            assert self.segments.pop(segment) == 0
 | 
	
		
			
				|  |  | -            self.io.delete_segment(segment)
 | 
	
		
			
				|  |  | +                unused.append(segment)
 | 
	
		
			
				|  |  | +        complete_xfer()
 | 
	
		
			
				|  |  |          self.compact = set()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def replay_segments(self, index_transaction_id, segments_transaction_id):
 | 
	
	
		
			
				|  | @@ -297,7 +316,7 @@ class Repository:
 | 
	
		
			
				|  |  |          if self.segments[segment] == 0:
 | 
	
		
			
				|  |  |              self.compact.add(segment)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def check(self, repair=False):
 | 
	
		
			
				|  |  | +    def check(self, repair=False, save_space=False):
 | 
	
		
			
				|  |  |          """Check repository consistency
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          This method verifies all segment checksums and makes sure
 | 
	
	
		
			
				|  | @@ -358,7 +377,7 @@ class Repository:
 | 
	
		
			
				|  |  |                      if current_index.get(key, (-1, -1)) != value:
 | 
	
		
			
				|  |  |                          report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
 | 
	
		
			
				|  |  |          if repair:
 | 
	
		
			
				|  |  | -            self.compact_segments()
 | 
	
		
			
				|  |  | +            self.compact_segments(save_space=save_space)
 | 
	
		
			
				|  |  |              self.write_index()
 | 
	
		
			
				|  |  |          self.rollback()
 | 
	
		
			
				|  |  |          if error_found:
 | 
	
	
		
			
				|  | @@ -441,6 +460,9 @@ class Repository:
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class LoggedIO:
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    class SegmentFull(Exception):
 | 
	
		
			
				|  |  | +        """raised when a segment is full, before opening next"""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      header_fmt = struct.Struct('<IIB')
 | 
	
		
			
				|  |  |      assert header_fmt.size == 9
 | 
	
		
			
				|  |  |      put_header_fmt = struct.Struct('<IIB32s')
 | 
	
	
		
			
				|  | @@ -517,8 +539,10 @@ class LoggedIO:
 | 
	
		
			
				|  |  |      def segment_filename(self, segment):
 | 
	
		
			
				|  |  |          return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def get_write_fd(self, no_new=False):
 | 
	
		
			
				|  |  | +    def get_write_fd(self, no_new=False, raise_full=False):
 | 
	
		
			
				|  |  |          if not no_new and self.offset and self.offset > self.limit:
 | 
	
		
			
				|  |  | +            if raise_full:
 | 
	
		
			
				|  |  | +                raise self.SegmentFull
 | 
	
		
			
				|  |  |              self.close_segment()
 | 
	
		
			
				|  |  |          if not self._write_fd:
 | 
	
		
			
				|  |  |              if self.segment % self.segments_per_dir == 0:
 | 
	
	
		
			
				|  | @@ -630,9 +654,9 @@ class LoggedIO:
 | 
	
		
			
				|  |  |              key, data = data[:32], data[32:]
 | 
	
		
			
				|  |  |          return size, tag, key, data
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def write_put(self, id, data):
 | 
	
		
			
				|  |  | +    def write_put(self, id, data, raise_full=False):
 | 
	
		
			
				|  |  | +        fd = self.get_write_fd(raise_full=raise_full)
 | 
	
		
			
				|  |  |          size = len(data) + self.put_header_fmt.size
 | 
	
		
			
				|  |  | -        fd = self.get_write_fd()
 | 
	
		
			
				|  |  |          offset = self.offset
 | 
	
		
			
				|  |  |          header = self.header_no_crc_fmt.pack(size, TAG_PUT)
 | 
	
		
			
				|  |  |          crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
 | 
	
	
		
			
				|  | @@ -640,8 +664,8 @@ class LoggedIO:
 | 
	
		
			
				|  |  |          self.offset += size
 | 
	
		
			
				|  |  |          return self.segment, offset
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def write_delete(self, id):
 | 
	
		
			
				|  |  | -        fd = self.get_write_fd()
 | 
	
		
			
				|  |  | +    def write_delete(self, id, raise_full=False):
 | 
	
		
			
				|  |  | +        fd = self.get_write_fd(raise_full=raise_full)
 | 
	
		
			
				|  |  |          header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
 | 
	
		
			
				|  |  |          crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
 | 
	
		
			
				|  |  |          fd.write(b''.join((crc, header, id)))
 |