|
@@ -21,10 +21,12 @@ from .helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, grou
|
|
|
Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int
|
|
|
|
|
|
ITEMS_BUFFER = 1024 * 1024
|
|
|
+
|
|
|
CHUNK_MIN = 1024
|
|
|
CHUNK_MAX = 10 * 1024 * 1024
|
|
|
WINDOW_SIZE = 0xfff
|
|
|
CHUNK_MASK = 0xffff
|
|
|
+CHUNKER_PARAMS = (WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX)
|
|
|
|
|
|
ZEROS = b'\0' * CHUNK_MAX
|
|
|
|
|
@@ -69,12 +71,13 @@ class DownloadPipeline:
|
|
|
class ChunkBuffer:
|
|
|
BUFFER_SIZE = 1 * 1024 * 1024
|
|
|
|
|
|
- def __init__(self, key):
|
|
|
+ def __init__(self, key, chunker_params=CHUNKER_PARAMS):
|
|
|
self.buffer = BytesIO()
|
|
|
self.packer = msgpack.Packer(unicode_errors='surrogateescape')
|
|
|
self.chunks = []
|
|
|
self.key = key
|
|
|
- self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX,self.key.chunk_seed)
|
|
|
+ chunker_params += (self.key.chunk_seed, )
|
|
|
+ self.chunker = Chunker(*chunker_params)
|
|
|
|
|
|
def add(self, item):
|
|
|
self.buffer.write(self.packer.pack(StableDict(item)))
|
|
@@ -104,8 +107,8 @@ class ChunkBuffer:
|
|
|
|
|
|
class CacheChunkBuffer(ChunkBuffer):
|
|
|
|
|
|
- def __init__(self, cache, key, stats):
|
|
|
- super(CacheChunkBuffer, self).__init__(key)
|
|
|
+ def __init__(self, cache, key, stats, chunker_params=CHUNKER_PARAMS):
|
|
|
+ super(CacheChunkBuffer, self).__init__(key, chunker_params)
|
|
|
self.cache = cache
|
|
|
self.stats = stats
|
|
|
|
|
@@ -127,7 +130,8 @@ class Archive:
|
|
|
|
|
|
|
|
|
def __init__(self, repository, key, manifest, name, cache=None, create=False,
|
|
|
- checkpoint_interval=300, numeric_owner=False, progress=False):
|
|
|
+ checkpoint_interval=300, numeric_owner=False, progress=False,
|
|
|
+ chunker_params=CHUNKER_PARAMS):
|
|
|
self.cwd = os.getcwd()
|
|
|
self.key = key
|
|
|
self.repository = repository
|
|
@@ -142,8 +146,9 @@ class Archive:
|
|
|
self.numeric_owner = numeric_owner
|
|
|
self.pipeline = DownloadPipeline(self.repository, self.key)
|
|
|
if create:
|
|
|
- self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
|
|
|
- self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)
|
|
|
+ self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats, chunker_params)
|
|
|
+ chunker_params += (self.key.chunk_seed, )
|
|
|
+ self.chunker = Chunker(*chunker_params)
|
|
|
if name in manifest.archives:
|
|
|
raise self.AlreadyExists(name)
|
|
|
self.last_checkpoint = time.time()
|