|
@@ -256,12 +256,15 @@ class FileReader:
|
|
"""
|
|
"""
|
|
This is a buffered reader for file data.
|
|
This is a buffered reader for file data.
|
|
|
|
|
|
- It maintains a buffer that is filled by using FileFMAPReader.blockify generator when needed.
|
|
|
|
- The data in that buffer is consumed by clients calling FileReader.read.
|
|
|
|
|
|
+ It maintains a buffer that is filled with Chunks from the FileFMAPReader.blockify generator.
|
|
|
|
+ The data in that buffer is consumed by clients calling FileReader.read, which returns a Chunk.
|
|
|
|
+
|
|
|
|
+ Most complexity in here comes from the desired size when a user calls FileReader.read does
|
|
|
|
+ not need to match the Chunk sizes we got from the FileFMAPReader.
|
|
"""
|
|
"""
|
|
def __init__(self, *, fd=None, fh=-1, read_size=0, sparse=False, fmap=None):
|
|
def __init__(self, *, fd=None, fh=-1, read_size=0, sparse=False, fmap=None):
|
|
self.reader = FileFMAPReader(fd=fd, fh=fh, read_size=read_size, sparse=sparse, fmap=fmap)
|
|
self.reader = FileFMAPReader(fd=fd, fh=fh, read_size=read_size, sparse=sparse, fmap=fmap)
|
|
- self.buffer = [] # list of (data, meta) tuples
|
|
|
|
|
|
+ self.buffer = [] # list of Chunk objects
|
|
self.offset = 0 # offset into the first buffer object's data
|
|
self.offset = 0 # offset into the first buffer object's data
|
|
self.remaining_bytes = 0 # total bytes available in buffer
|
|
self.remaining_bytes = 0 # total bytes available in buffer
|
|
self.blockify_gen = None # generator from FileFMAPReader.blockify
|
|
self.blockify_gen = None # generator from FileFMAPReader.blockify
|
|
@@ -279,22 +282,20 @@ class FileReader:
|
|
|
|
|
|
try:
|
|
try:
|
|
chunk = next(self.blockify_gen)
|
|
chunk = next(self.blockify_gen)
|
|
- # Store both data and metadata in the buffer
|
|
|
|
- self.buffer.append((chunk.data, chunk.meta))
|
|
|
|
|
|
+ # Store the Chunk object directly in the buffer
|
|
|
|
+ self.buffer.append(chunk)
|
|
self.remaining_bytes += chunk.meta["size"]
|
|
self.remaining_bytes += chunk.meta["size"]
|
|
return True
|
|
return True
|
|
except StopIteration:
|
|
except StopIteration:
|
|
self.blockify_gen = None
|
|
self.blockify_gen = None
|
|
return False
|
|
return False
|
|
|
|
|
|
- def read(self, size, return_chunk_info=False):
|
|
|
|
|
|
+ def read(self, size):
|
|
"""
|
|
"""
|
|
- Read up to 'size' bytes from the file.
|
|
|
|
|
|
+ Read a Chunk of up to 'size' bytes from the file.
|
|
|
|
|
|
:param size: Number of bytes to read
|
|
:param size: Number of bytes to read
|
|
- :param return_chunk_info: if True, return a tuple (data, allocation, size) instead of just data
|
|
|
|
- :return: Bytes object containing the read data, or None if no data is available.
|
|
|
|
- If return_chunk_info is True, returns a tuple (data, allocation, size).
|
|
|
|
|
|
+ :return: Chunk object containing the read data. If no data is available, returns Chunk(None, size=0, allocation=CH_DATA).
|
|
"""
|
|
"""
|
|
# Initialize if not already done
|
|
# Initialize if not already done
|
|
if self.blockify_gen is None:
|
|
if self.blockify_gen is None:
|
|
@@ -309,18 +310,19 @@ class FileReader:
|
|
# No more data available, return what we have
|
|
# No more data available, return what we have
|
|
break
|
|
break
|
|
|
|
|
|
- # If we have no data at all, return None
|
|
|
|
|
|
+ # If we have no data at all, return an empty Chunk
|
|
if not self.buffer:
|
|
if not self.buffer:
|
|
- return None if not return_chunk_info else (None, None, 0)
|
|
|
|
|
|
+ return Chunk(None, size=0, allocation=CH_ALLOC)
|
|
|
|
|
|
# Get the first chunk from the buffer
|
|
# Get the first chunk from the buffer
|
|
- data, meta = self.buffer[0]
|
|
|
|
- chunk_size = meta["size"]
|
|
|
|
- allocation = meta["allocation"]
|
|
|
|
-
|
|
|
|
- # If we're returning chunk info and this is a non-data chunk, handle it specially
|
|
|
|
- if return_chunk_info and (allocation != CH_DATA or data is None):
|
|
|
|
- # For non-data chunks, we return the allocation type and size
|
|
|
|
|
|
+ chunk = self.buffer[0]
|
|
|
|
+ chunk_size = chunk.meta["size"]
|
|
|
|
+ allocation = chunk.meta["allocation"]
|
|
|
|
+ data = chunk.data
|
|
|
|
+
|
|
|
|
+ # If this is a non-data chunk, handle it specially
|
|
|
|
+ if allocation != CH_DATA or data is None:
|
|
|
|
+ # For non-data chunks, we return a Chunk with the allocation type and size
|
|
size_to_return = min(size, chunk_size - self.offset)
|
|
size_to_return = min(size, chunk_size - self.offset)
|
|
|
|
|
|
# Update buffer state
|
|
# Update buffer state
|
|
@@ -332,9 +334,9 @@ class FileReader:
|
|
|
|
|
|
self.remaining_bytes -= size_to_return
|
|
self.remaining_bytes -= size_to_return
|
|
|
|
|
|
- return (None, allocation, size_to_return)
|
|
|
|
|
|
+ return Chunk(None, size=size_to_return, allocation=allocation)
|
|
|
|
|
|
- # For data chunks or when not returning chunk info, proceed as before
|
|
|
|
|
|
+ # For data chunks, proceed as before
|
|
# Prepare to collect the requested data
|
|
# Prepare to collect the requested data
|
|
result = bytearray()
|
|
result = bytearray()
|
|
bytes_to_read = min(size, self.remaining_bytes)
|
|
bytes_to_read = min(size, self.remaining_bytes)
|
|
@@ -342,18 +344,15 @@ class FileReader:
|
|
|
|
|
|
# Read data from the buffer
|
|
# Read data from the buffer
|
|
while bytes_read < bytes_to_read and self.buffer:
|
|
while bytes_read < bytes_to_read and self.buffer:
|
|
- data, meta = self.buffer[0]
|
|
|
|
- chunk_size = meta["size"]
|
|
|
|
- allocation = meta["allocation"]
|
|
|
|
|
|
+ chunk = self.buffer[0]
|
|
|
|
+ chunk_size = chunk.meta["size"]
|
|
|
|
+ allocation = chunk.meta["allocation"]
|
|
|
|
+ data = chunk.data
|
|
|
|
|
|
- # Skip non-data chunks if not returning chunk info
|
|
|
|
- if (allocation != CH_DATA or data is None) and not return_chunk_info:
|
|
|
|
- self.buffer.pop(0)
|
|
|
|
- self.remaining_bytes -= chunk_size
|
|
|
|
- continue
|
|
|
|
|
|
+ # We now handle all chunk types, so no need to skip non-data chunks
|
|
|
|
|
|
- # If this is a non-data chunk and we're returning chunk info, break to handle it
|
|
|
|
- if (allocation != CH_DATA or data is None) and return_chunk_info:
|
|
|
|
|
|
+ # If this is a non-data chunk, break to handle it
|
|
|
|
+ if allocation != CH_DATA or data is None:
|
|
if bytes_read > 0:
|
|
if bytes_read > 0:
|
|
# We've already read some data, so return that first
|
|
# We've already read some data, so return that first
|
|
break
|
|
break
|
|
@@ -370,7 +369,7 @@ class FileReader:
|
|
|
|
|
|
self.remaining_bytes -= size_to_return
|
|
self.remaining_bytes -= size_to_return
|
|
|
|
|
|
- return (None, allocation, size_to_return)
|
|
|
|
|
|
+ return Chunk(None, size=size_to_return, allocation=allocation)
|
|
|
|
|
|
# Calculate how much we can read from this chunk
|
|
# Calculate how much we can read from this chunk
|
|
available = chunk_size - self.offset
|
|
available = chunk_size - self.offset
|
|
@@ -390,10 +389,8 @@ class FileReader:
|
|
|
|
|
|
self.remaining_bytes -= to_read
|
|
self.remaining_bytes -= to_read
|
|
|
|
|
|
- if return_chunk_info:
|
|
|
|
- return (bytes(result) if result else None, CH_DATA, bytes_read)
|
|
|
|
- else:
|
|
|
|
- return bytes(result) if result else None
|
|
|
|
|
|
+ # Return a Chunk object with the data
|
|
|
|
+ return Chunk(bytes(result), size=bytes_read, allocation=CH_DATA)
|
|
|
|
|
|
|
|
|
|
class ChunkerFixed:
|
|
class ChunkerFixed:
|
|
@@ -442,31 +439,24 @@ class ChunkerFixed:
|
|
if self.header_size > 0:
|
|
if self.header_size > 0:
|
|
# Read the header block using read
|
|
# Read the header block using read
|
|
started_chunking = time.monotonic()
|
|
started_chunking = time.monotonic()
|
|
- header_info = self.reader.read(self.header_size, return_chunk_info=True)
|
|
|
|
|
|
+ header_chunk = self.reader.read(self.header_size)
|
|
self.chunking_time += time.monotonic() - started_chunking
|
|
self.chunking_time += time.monotonic() - started_chunking
|
|
|
|
|
|
- if header_info is not None and header_info[2] > 0:
|
|
|
|
- # Unpack the header info
|
|
|
|
- data, allocation, size = header_info
|
|
|
|
- assert self.header_size == size
|
|
|
|
|
|
+ if header_chunk.meta["size"] > 0:
|
|
|
|
+ assert self.header_size == header_chunk.meta["size"]
|
|
# Yield the header chunk
|
|
# Yield the header chunk
|
|
- yield Chunk(data, size=size, allocation=allocation)
|
|
|
|
|
|
+ yield header_chunk
|
|
|
|
|
|
# Process the rest of the file using read
|
|
# Process the rest of the file using read
|
|
while True:
|
|
while True:
|
|
started_chunking = time.monotonic()
|
|
started_chunking = time.monotonic()
|
|
- chunk_info = self.reader.read(self.block_size, return_chunk_info=True)
|
|
|
|
|
|
+ chunk = self.reader.read(self.block_size)
|
|
self.chunking_time += time.monotonic() - started_chunking
|
|
self.chunking_time += time.monotonic() - started_chunking
|
|
-
|
|
|
|
- if chunk_info is None or chunk_info[2] == 0:
|
|
|
|
- # End of file
|
|
|
|
- break
|
|
|
|
-
|
|
|
|
- # Unpack the chunk info
|
|
|
|
- data, allocation, size = chunk_info
|
|
|
|
-
|
|
|
|
- # Yield the chunk with the appropriate allocation type
|
|
|
|
- yield Chunk(data, size=size, allocation=allocation)
|
|
|
|
|
|
+ size = chunk.meta["size"]
|
|
|
|
+ if size == 0:
|
|
|
|
+ break # EOF
|
|
|
|
+ assert size <= self.block_size
|
|
|
|
+ yield chunk
|
|
|
|
|
|
|
|
|
|
# Cyclic polynomial / buzhash
|
|
# Cyclic polynomial / buzhash
|