@@ -1,10 +1,13 @@
 from configparser import RawConfigParser
 from .remote import cache_if_remote
+import errno
 import msgpack
 import os
 import sys
 from binascii import hexlify
 import shutil
+import tarfile
+import tempfile
 
 from .key import PlaintextKey
 from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, UpgradableLock, int_to_bigint, \
@@ -93,6 +96,8 @@ class Cache:
         with open(os.path.join(self.path, 'config'), 'w') as fd:
             config.write(fd)
         ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
+        with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd:
+            pass  # empty file
         with open(os.path.join(self.path, 'files'), 'wb') as fd:
             pass  # empty file
 
@@ -148,6 +153,7 @@ class Cache:
         os.mkdir(txn_dir)
         shutil.copy(os.path.join(self.path, 'config'), txn_dir)
         shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
+        shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir)
         shutil.copy(os.path.join(self.path, 'files'), txn_dir)
         os.rename(os.path.join(self.path, 'txn.tmp'),
                   os.path.join(self.path, 'txn.active'))
@@ -189,6 +195,7 @@ class Cache:
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, 'config'), self.path)
             shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
+            shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path)
             shutil.copy(os.path.join(txn_dir, 'files'), self.path)
             os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
             if os.path.exists(os.path.join(self.path, 'txn.tmp')):
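Note: the next hunk reworks Cache.sync() around a compressed tar that holds one
chunk index per backup archive; unchanged archives are copied over from the old
tar, only unknown ones are fetched, and the members are merged into the master
index at the end. As a reading aid, here is a minimal self-contained sketch of
that merge idea, using a plain dict plus msgpack where the patch uses its
ChunkIndex class (write_idx, merge_indices and the file name are illustrative,
not part of the patch):

    import io
    import msgpack
    import tarfile

    def write_idx(tf, archive_id_hex, idx):
        # store one per-archive index {chunk_id: (count, size, csize)}
        # as a tar member named after the archive id
        data = msgpack.packb(idx)
        info = tarfile.TarInfo(archive_id_hex)
        info.size = len(data)
        tf.addfile(info, io.BytesIO(data))

    def merge_indices(tar_path):
        # rebuild the master index by summing reference counts across members
        master = {}
        with tarfile.open(tar_path, 'r') as tf:
            for member in tf:
                idx = msgpack.unpackb(tf.extractfile(member).read())
                for chunk_id, (count, size, csize) in idx.items():
                    known = master.get(chunk_id)
                    master[chunk_id] = (known[0] + count if known else count, size, csize)
        return master

    # usage: append indices for two archives, then merge
    with tarfile.open('chunks.archive', 'w:xz') as tf:
        write_idx(tf, '0011aabb', {b'chunk1': (1, 1000, 600)})
        write_idx(tf, '2233ccdd', {b'chunk1': (2, 1000, 600), b'chunk2': (1, 500, 300)})
    print(merge_indices('chunks.archive'))  # b'chunk1' ends up with count 3

Keeping one index per archive means an unchanged archive never has to be
fetched or re-parsed again; only archives not yet in the tar cost a round trip
to the repository.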
@@ -197,37 +204,139 @@ class Cache:
         self._do_open()
 
     def sync(self):
-        """Initializes cache by fetching and reading all archive indicies
+        """Re-synchronize chunks cache with repository.
+
+        If present, uses a compressed tar archive of known backup archive
+        indices, so it only needs to fetch infos from repo and build a chunk
+        index once per backup archive.
+        If out of sync, the tar gets rebuilt from known + fetched chunk infos,
+        so it has complete and current information about all backup archives.
+        Finally, it builds the master chunks index by merging all indices from
+        the tar.
+
+        Note: compression (esp. xz) is very effective in keeping the tar
+              relatively small compared to the files it contains.
         """
-        def add(id, size, csize):
+        in_archive_path = os.path.join(self.path, 'chunks.archive')
+        out_archive_path = os.path.join(self.path, 'chunks.archive.tmp')
+
+        def open_in_archive():
             try:
-                count, size, csize = self.chunks[id]
-                self.chunks[id] = count + 1, size, csize
+                tf = tarfile.open(in_archive_path, 'r')
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                # file not found
+                tf = None
+            except tarfile.ReadError:
+                # empty file?
+                tf = None
+            return tf
+
+        def open_out_archive():
+            for compression in ('xz', 'bz2', 'gz'):
+                # xz needs py 3.3, bz2 and gz also work on 3.2
+                try:
+                    tf = tarfile.open(out_archive_path, 'w:'+compression, format=tarfile.PAX_FORMAT)
+                    break
+                except tarfile.CompressionError:
+                    continue
+            else:  # shouldn't happen
+                tf = None
+            return tf
+
+        def close_archive(tf):
+            if tf:
+                tf.close()
+
+        def delete_in_archive():
+            os.unlink(in_archive_path)
+
+        def rename_out_archive():
+            os.rename(out_archive_path, in_archive_path)
+
+        def add(chunk_idx, id, size, csize, incr=1):
+            try:
+                count, size, csize = chunk_idx[id]
+                chunk_idx[id] = count + incr, size, csize
             except KeyError:
-                self.chunks[id] = 1, size, csize
-        self.begin_txn()
-        print('Initializing cache...')
-        self.chunks.clear()
-        unpacker = msgpack.Unpacker()
-        repository = cache_if_remote(self.repository)
-        for name, info in self.manifest.archives.items():
-            archive_id = info[b'id']
+                chunk_idx[id] = incr, size, csize
+
+        def transfer_known_idx(archive_id, tf_in, tf_out):
+            archive_id_hex = hexlify(archive_id).decode('ascii')
+            tarinfo = tf_in.getmember(archive_id_hex)
+            archive_name = tarinfo.pax_headers['archive_name']
+            print('Already known archive:', archive_name)
+            f_in = tf_in.extractfile(archive_id_hex)
+            tf_out.addfile(tarinfo, f_in)
+            return archive_name
+
+        def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out):
+            chunk_idx = ChunkIndex()
             cdata = repository.get(archive_id)
-            data = self.key.decrypt(archive_id, cdata)
-            add(archive_id, len(data), len(cdata))
+            data = key.decrypt(archive_id, cdata)
+            add(chunk_idx, archive_id, len(data), len(cdata))
             archive = msgpack.unpackb(data)
             if archive[b'version'] != 1:
                 raise Exception('Unknown archive metadata version')
             decode_dict(archive, (b'name',))
-            print('Analyzing archive:', archive[b'name'])
-            for key, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
-                data = self.key.decrypt(key, chunk)
-                add(key, len(data), len(chunk))
+            print('Analyzing new archive:', archive[b'name'])
+            unpacker = msgpack.Unpacker()
+            for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
+                data = key.decrypt(item_id, chunk)
+                add(chunk_idx, item_id, len(data), len(chunk))
                 unpacker.feed(data)
                 for item in unpacker:
                     if b'chunks' in item:
                         for chunk_id, size, csize in item[b'chunks']:
-                            add(chunk_id, size, csize)
+                            add(chunk_idx, chunk_id, size, csize)
+            archive_id_hex = hexlify(archive_id).decode('ascii')
+            file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
+            chunk_idx.write(file_tmp)
+            tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex)
+            tarinfo.pax_headers['archive_name'] = archive[b'name']
+            with open(file_tmp, 'rb') as f:
+                tf_out.addfile(tarinfo, f)
+            os.unlink(file_tmp)
+
+        def create_master_idx(chunk_idx, tf_in, tmp_dir):
+            chunk_idx.clear()
+            for tarinfo in tf_in:
+                archive_id_hex = tarinfo.name
+                tf_in.extract(archive_id_hex, tmp_dir)
+                chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
+                archive_chunk_idx = ChunkIndex.read(chunk_idx_path)
+                for chunk_id, (count, size, csize) in archive_chunk_idx.iteritems():
+                    add(chunk_idx, chunk_id, size, csize, incr=count)
+                os.unlink(chunk_idx_path)
+
+        self.begin_txn()
+        print('Synchronizing chunks cache...')
+        # XXX we have to do stuff on disk due to lacking ChunkIndex api
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            repository = cache_if_remote(self.repository)
+            out_archive = open_out_archive()
+            in_archive = open_in_archive()
+            if in_archive:
+                known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames())
+            else:
+                known_ids = set()
+            archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
+            print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % (
+                len(known_ids), len(archive_ids), len(archive_ids - known_ids), ))
+            for archive_id in archive_ids & known_ids:
+                transfer_known_idx(archive_id, in_archive, out_archive)
+            close_archive(in_archive)
+            delete_in_archive()  # free disk space
+            for archive_id in archive_ids - known_ids:
+                fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive)
+            close_archive(out_archive)
+            rename_out_archive()
+            print('Merging collection into master chunks cache...')
+            in_archive = open_in_archive()
+            create_master_idx(self.chunks, in_archive, tmp_dir)
+            close_archive(in_archive)
+            print('Done.')
 
     def add_chunk(self, id, data, stats):
         if not self.txn_active:
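Note: two tarfile details in the hunk above are easy to miss: members are named
by the hex archive id while the human-readable archive name travels in a pax
extended header, and the writer falls back from xz to bz2/gz where the lzma
module is missing (xz needs Python 3.3). A standalone sketch of both, with
illustrative file names:

    import os
    import tarfile
    import tempfile

    def open_for_write(path):
        # prefer xz, fall back where the lzma module is unavailable
        for compression in ('xz', 'bz2', 'gz'):
            try:
                return tarfile.open(path, 'w:' + compression, format=tarfile.PAX_FORMAT)
            except tarfile.CompressionError:
                continue
        raise RuntimeError('no tar compression available')

    with tempfile.TemporaryDirectory() as tmp:
        idx_file = os.path.join(tmp, 'idx')
        with open(idx_file, 'wb') as f:
            f.write(b'fake chunk index data')
        tar_path = os.path.join(tmp, 'chunks.archive')
        with open_for_write(tar_path) as tf:
            tarinfo = tf.gettarinfo(idx_file, arcname='0011aabb')  # member named by archive id
            tarinfo.pax_headers['archive_name'] = 'my-backup'      # name survives as pax metadata
            with open(idx_file, 'rb') as f:
                tf.addfile(tarinfo, f)
        with tarfile.open(tar_path, 'r') as tf:
            print(tf.getmember('0011aabb').pax_headers['archive_name'])  # -> my-backup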