@@ -12,8 +12,9 @@ from xattr import xattr, XATTR_NOFOLLOW
 from . import NS_ARCHIVE_METADATA, NS_CHUNK
 from ._speedups import chunkify
 from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError, \
-    Counter, encode_filename
+    Counter, encode_filename, Statistics

+ITEMS_BUFFER = 1024 * 1024
 CHUNK_SIZE = 64 * 1024
 WINDOW_SIZE = 4096

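(Note: the Statistics class imported above lives in helpers.py and is not
shown in this diff. Judging from the call sites below, where
stats.update(size, csize, unique) replaces manual bookkeeping of an
{'osize', 'csize', 'usize'} dict, a minimal sketch could look like the
following; the attribute names are assumptions carried over from the old
dict keys.)

    class Statistics(object):
        """Hypothetical sketch of helpers.Statistics, inferred from usage."""

        def __init__(self):
            # original size, compressed size, unique (deduplicated) size
            self.osize = self.csize = self.usize = 0

        def update(self, size, csize, unique):
            self.osize += size
            self.csize += csize
            if unique:
                # only chunks stored exactly once count towards usize
                self.usize += csize
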
@@ -33,6 +34,7 @@ class Archive(object):
         self.items = StringIO()
         self.items_ids = []
         self.hard_links = {}
+        self.stats = Statistics()
         if name:
             self.load(self.key.archive_hash(name))

@@ -74,7 +76,7 @@ class Archive(object):

     def add_item(self, item):
         self.items.write(msgpack.packb(item))
-        if self.items.tell() > 1024 * 1024:
+        if self.items.tell() > ITEMS_BUFFER:
             self.flush_items()

     def flush_items(self, flush=False):
@@ -85,9 +87,11 @@ class Archive(object):
         self.items.seek(0)
         self.items.truncate()
         for chunk in chunks[:-1]:
-            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunk), chunk))
+            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunk),
+                                                       chunk, self.stats))
         if flush or len(chunks) == 1:
-            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1]))
+            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunks[-1]),
+                                                       chunks[-1], self.stats))
         else:
             self.items.write(chunks[-1])

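(Note: taken together, add_item and flush_items implement a carry-over
buffer: serialized items accumulate in memory until they exceed
ITEMS_BUFFER, every complete chunk is then written out, and the trailing
chunk stays buffered unless flush=True, so later items are chunked from a
stable boundary. Below is a standalone toy model of that control flow; the
chunker and write_chunk callables are stand-ins, not darc's real APIs.)

    from io import BytesIO

    ITEMS_BUFFER = 1024 * 1024

    class ItemBufferModel(object):
        """Toy model of Archive.add_item/flush_items buffering."""

        def __init__(self, chunker, write_chunk):
            self.buffer = BytesIO()
            self.chunker = chunker          # bytes -> list of byte chunks
            self.write_chunk = write_chunk  # persists a single chunk

        def add(self, packed_item):
            self.buffer.write(packed_item)
            if self.buffer.tell() > ITEMS_BUFFER:
                self.flush(flush=False)

        def flush(self, flush=True):
            chunks = self.chunker(self.buffer.getvalue())
            if not chunks:                  # guard not present in the original
                return
            self.buffer.seek(0)
            self.buffer.truncate()
            for chunk in chunks[:-1]:
                self.write_chunk(chunk)
            if flush or len(chunks) == 1:
                self.write_chunk(chunks[-1])
            else:
                # keep the trailing partial chunk buffered so the next
                # flush re-chunks it together with newer items
                self.buffer.write(chunks[-1])
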
@@ -108,7 +112,7 @@ class Archive(object):
         self.store.commit()
         cache.commit()

-    def stats(self, cache):
+    def calc_stats(self, cache):
         # This function is a bit evil since it abuses the cache to calculate
         # the stats. The cache transaction must be rolled back afterwards
         def cb(chunk, error, id):
@@ -120,21 +124,15 @@ class Archive(object):
                 try:
                     for id, size, csize in item['chunks']:
                         count, _, _ = self.cache.chunks[id]
-                        stats['osize'] += size
-                        stats['csize'] += csize
-                        if count == 1:
-                            stats['usize'] += csize
+                        stats.update(size, csize, count == 1)
                         self.cache.chunks[id] = count - 1, size, csize
                 except KeyError:
                     pass
         unpacker = msgpack.Unpacker()
         cache.begin_txn()
-        stats = {'osize': 0, 'csize': 0, 'usize': 0}
+        stats = Statistics()
         for id, size, csize in self.metadata['items']:
-            stats['osize'] += size
-            stats['csize'] += csize
-            if self.cache.seen_chunk(id) == 1:
-                stats['usize'] += csize
+            stats.update(size, csize, self.cache.seen_chunk(id) == 1)
             self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
             self.cache.chunk_decref(id)
         self.store.flush_rpc()
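(Note: the rename from stats() to calc_stats() frees the name for the new
per-archive self.stats attribute and makes the side effect explicit: as the
comment says, the method abuses the cache's refcounts inside a transaction,
so the caller is expected to roll that transaction back. A sketch of the
expected calling pattern; the archive/cache variables, the rollback() call,
and the attribute names are assumptions based on the begin_txn() above.)

    stats = archive.calc_stats(cache)   # opens a cache txn, decrefs chunks
    cache.rollback()                    # undo the refcount changes afterwards
    print('Original size:     %d' % stats.osize)
    print('Compressed size:   %d' % stats.csize)
    print('Deduplicated size: %d' % stats.usize)
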
@@ -323,14 +321,14 @@ class Archive(object):
                     if not cache.seen_chunk(id):
                         break
                 else:
-                    chunks = [cache.chunk_incref(id) for id in ids]
+                    chunks = [cache.chunk_incref(id, self.stats) for id in ids]
             # Only chunkify the file if needed
             if chunks is None:
                 with open(path, 'rb') as fd:
                     chunks = []
                     for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                           self.key.chunk_seed):
-                        chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk))
+                        chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
             ids = [id for id, _, _ in chunks]
             cache.memorize_file(path_hash, st, ids)
         item = {'path': safe_path, 'chunks': chunks}
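
(Note: every cache call in this diff now threads self.stats through, which
implies cache.add_chunk() and cache.chunk_incref() grew a stats parameter.
The real Cache lives in cache.py and is outside this diff; the toy sketch
below models only the stats threading implied by the call sites above, with
no compression or persistence.)

    class CacheModel(object):
        """Toy sketch of the Cache side of this change, not darc's cache.py."""

        def __init__(self):
            self.chunks = {}   # id -> (refcount, size, csize)

        def seen_chunk(self, id):
            return self.chunks.get(id, (0, 0, 0))[0]

        def chunk_incref(self, id, stats):
            count, size, csize = self.chunks[id]
            self.chunks[id] = (count + 1, size, csize)
            stats.update(size, csize, False)   # duplicate: no new unique bytes
            return id, size, csize

        def add_chunk(self, id, data, stats):
            if self.seen_chunk(id):
                return self.chunk_incref(id, stats)
            size = csize = len(data)           # toy: no compression/encryption
            self.chunks[id] = (1, size, csize)
            stats.update(size, csize, True)    # first occurrence is unique
            return id, size, csize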
|