|
@@ -43,8 +43,9 @@ class ItemCache:
|
|
|
and retrieves items from these inode numbers.
|
|
|
"""
|
|
|
|
|
|
- # Approximately ~230000 items (depends on the average number of items per metadata chunk)
|
|
|
- # Since growing a bytearray has to copy it, growing it will converge to o(n), however,
|
|
|
+ # 2 MiB are approximately ~230000 items (depends on the average number of items per metadata chunk).
|
|
|
+ #
|
|
|
+ # Since growing a bytearray has to copy it, growing it will converge to O(n^2), however,
|
|
|
# this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
|
|
|
# use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
|
|
|
# to resize it in the first place; that's free).
|
|
@@ -58,32 +59,35 @@ class ItemCache:
|
|
|
# self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
|
|
|
# It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
|
|
|
# unices did this).
|
|
|
- # The meta-array contains chunk IDs and item entries (described in inode_for_current_item).
|
|
|
+ # The meta-array contains chunk IDs and item entries (described in iter_archive_items).
|
|
|
# The chunk IDs are referenced by item entries through relative offsets,
|
|
|
# which are bounded by the metadata chunk size.
|
|
|
self.meta = bytearray()
|
|
|
# The current write offset in self.meta
|
|
|
self.write_offset = 0
|
|
|
|
|
|
- # Offset added to meta-indices, resulting in an inode,
|
|
|
- # or substracted from inodes, resulting in a meta-indices.
|
|
|
+ # Offset added to meta-indices, resulting in inodes,
|
|
|
+ # or subtracted from inodes, resulting in meta-indices.
|
|
|
+ # XXX: Merge FuseOperations.items and ItemCache to avoid
|
|
|
+ # this implicit limitation / hack (on the number of synthetic inodes, degenerate
|
|
|
+ # cases can inflate their number far beyond the number of archives).
|
|
|
self.offset = 1000000
|
|
|
|
|
|
# A temporary file that contains direct items, i.e. items directly cached in this layer.
|
|
|
# These are items that span more than one chunk and thus cannot be efficiently cached
|
|
|
# by the object cache (self.decrypted_repository), which would require variable-length structures;
|
|
|
- # possible but not worth the effort, see inode_for_current_item.
|
|
|
+ # possible but not worth the effort, see iter_archive_items.
|
|
|
self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
|
|
|
|
|
|
# A small LRU cache for chunks requested by ItemCache.get() from the object cache,
|
|
|
# this significantly speeds up directory traversal and similar operations which
|
|
|
# tend to re-read the same chunks over and over.
|
|
|
# The capacity is kept low because increasing it does not provide any significant advantage,
|
|
|
- # but makes LRUCache's square behaviour noticeable as well as consuming some memory.
|
|
|
+ # but makes LRUCache's square behaviour noticeable and consumes more memory.
|
|
|
self.chunks = LRUCache(capacity=10, dispose=lambda _: None)
|
|
|
|
|
|
# Instrumentation
|
|
|
- # Count of indirect items, i.e. data is cached in the object cache, in this cache
|
|
|
+ # Count of indirect items, i.e. data is cached in the object cache, not directly in this cache
|
|
|
self.indirect_items = 0
|
|
|
# Count of direct items, i.e. data is in self.fd
|
|
|
self.direct_items = 0
|
|
@@ -92,16 +96,11 @@ class ItemCache:
|
|
|
offset = inode - self.offset
|
|
|
if offset < 0:
|
|
|
raise ValueError('ItemCache.get() called with an invalid inode number')
|
|
|
- if self.meta[offset] == ord(b'S'):
|
|
|
- fd_offset = int.from_bytes(self.meta[offset + 1:offset + 9], 'little')
|
|
|
- self.fd.seek(fd_offset, io.SEEK_SET)
|
|
|
- return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
|
|
|
- else:
|
|
|
+ if self.meta[offset] == ord(b'I'):
|
|
|
_, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
|
|
|
chunk_id_offset = offset - chunk_id_relative_offset
|
|
|
# bytearray slices are bytearrays as well, explicitly convert to bytes()
|
|
|
chunk_id = bytes(self.meta[chunk_id_offset:chunk_id_offset + 32])
|
|
|
- chunk_offset = int.from_bytes(self.meta[offset + 5:offset + 9], 'little')
|
|
|
chunk = self.chunks.get(chunk_id)
|
|
|
if not chunk:
|
|
|
csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
|
|
@@ -110,12 +109,21 @@ class ItemCache:
|
|
|
unpacker = msgpack.Unpacker()
|
|
|
unpacker.feed(data)
|
|
|
return Item(internal_dict=next(unpacker))
|
|
|
+ elif self.meta[offset] == ord(b'S'):
|
|
|
+ fd_offset = int.from_bytes(self.meta[offset + 1:offset + 9], 'little')
|
|
|
+ self.fd.seek(fd_offset, io.SEEK_SET)
|
|
|
+ return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
|
|
|
+ else:
|
|
|
+ raise ValueError('Invalid entry type in self.meta')
|
|
|
|
|
|
def iter_archive_items(self, archive_item_ids):
|
|
|
unpacker = msgpack.Unpacker()
|
|
|
|
|
|
+ # Current offset in the metadata stream, which consists of all metadata chunks glued together
|
|
|
stream_offset = 0
|
|
|
+ # Offset of the current chunk in the metadata stream
|
|
|
chunk_begin = 0
|
|
|
+ # Length of the chunk preciding the current chunk
|
|
|
last_chunk_length = 0
|
|
|
msgpacked_bytes = b''
|
|
|
|
|
@@ -124,6 +132,7 @@ class ItemCache:
|
|
|
pack_indirect_into = self.indirect_entry_struct.pack_into
|
|
|
|
|
|
def write_bytes(append_msgpacked_bytes):
|
|
|
+ # XXX: Future versions of msgpack include an Unpacker.tell() method that provides this for free.
|
|
|
nonlocal msgpacked_bytes
|
|
|
nonlocal stream_offset
|
|
|
msgpacked_bytes += append_msgpacked_bytes
|
|
@@ -150,9 +159,9 @@ class ItemCache:
|
|
|
# Need more data, feed the next chunk
|
|
|
break
|
|
|
|
|
|
- current_item_length = len(msgpacked_bytes)
|
|
|
- current_spans_chunks = stream_offset - current_item_length <= chunk_begin
|
|
|
current_item = msgpacked_bytes
|
|
|
+ current_item_length = len(current_item)
|
|
|
+ current_spans_chunks = stream_offset - current_item_length < chunk_begin
|
|
|
msgpacked_bytes = b''
|
|
|
|
|
|
if write_offset + 9 >= len(meta):
|
|
@@ -178,15 +187,13 @@ class ItemCache:
|
|
|
pos = self.fd.seek(0, io.SEEK_END)
|
|
|
self.fd.write(current_item)
|
|
|
meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
|
|
|
- write_offset += 9
|
|
|
self.direct_items += 1
|
|
|
- inode = write_offset - 9 + self.offset
|
|
|
else:
|
|
|
item_offset = stream_offset - current_item_length - chunk_begin
|
|
|
pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
|
|
|
- write_offset += 9
|
|
|
self.indirect_items += 1
|
|
|
- inode = write_offset - 9 + self.offset
|
|
|
+ inode = write_offset + self.offset
|
|
|
+ write_offset += 9
|
|
|
|
|
|
yield inode, Item(internal_dict=item)
|
|
|
|