# cache.py
  1. from configparser import RawConfigParser
  2. from attic.remote import RemoteRepository, RepositoryCache
  3. import msgpack
  4. import os
  5. from binascii import hexlify
  6. import shutil
  7. from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, UpgradableLock
  8. from .hashindex import ChunkIndex
  9. class Cache(object):
  10. """Client Side cache
  11. """
  12. class RepositoryReplay(Error):
  13. """Cache is newer than repository, refusing to continue"""
  14. def __init__(self, repository, key, manifest, path=None, sync=True):
  15. self.timestamp = None
  16. self.txn_active = False
  17. self.repository = repository
  18. self.key = key
  19. self.manifest = manifest
  20. self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
  21. if not os.path.exists(self.path):
  22. self.create()
  23. self.open()
  24. if sync and self.manifest.id != self.manifest_id:
  25. # If repository is older than the cache something fishy is going on
  26. if self.timestamp and self.timestamp > manifest.timestamp:
  27. raise self.RepositoryReplay()
  28. self.sync()
  29. self.commit()
  30. def __del__(self):
  31. self.close()
  32. def create(self):
  33. """Create a new empty cache at `path`
  34. """
  35. os.makedirs(self.path)
  36. with open(os.path.join(self.path, 'README'), 'w') as fd:
  37. fd.write('This is an Attic cache')
  38. config = RawConfigParser()
  39. config.add_section('cache')
  40. config.set('cache', 'version', '1')
  41. config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
  42. config.set('cache', 'manifest', '')
  43. with open(os.path.join(self.path, 'config'), 'w') as fd:
  44. config.write(fd)
  45. ChunkIndex.create(os.path.join(self.path, 'chunks').encode('utf-8'))
  46. with open(os.path.join(self.path, 'files'), 'w') as fd:
  47. pass # empty file
  48. def open(self):
  49. if not os.path.isdir(self.path):
  50. raise Exception('%s Does not look like an Attic cache' % self.path)
  51. self.lock = UpgradableLock(os.path.join(self.path, 'config'), exclusive=True)
  52. self.rollback()
  53. self.config = RawConfigParser()
  54. self.config.read(os.path.join(self.path, 'config'))
  55. if self.config.getint('cache', 'version') != 1:
  56. raise Exception('%s Does not look like an Attic cache')
  57. self.id = self.config.get('cache', 'repository')
  58. self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
  59. self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
  60. self.chunks = ChunkIndex(os.path.join(self.path, 'chunks').encode('utf-8'))
  61. self.files = None
  62. def close(self):
  63. self.lock.release()
  64. def _read_files(self):
  65. self.files = {}
  66. self._newest_mtime = 0
  67. with open(os.path.join(self.path, 'files'), 'rb') as fd:
  68. u = msgpack.Unpacker(use_list=True)
  69. while True:
  70. data = fd.read(64 * 1024)
  71. if not data:
  72. break
  73. u.feed(data)
  74. for hash, item in u:
  75. item[0] += 1
  76. self.files[hash] = item
  77. def begin_txn(self):
  78. # Initialize transaction snapshot
  79. txn_dir = os.path.join(self.path, 'txn.tmp')
  80. os.mkdir(txn_dir)
  81. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  82. shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
  83. shutil.copy(os.path.join(self.path, 'files'), txn_dir)
  84. os.rename(os.path.join(self.path, 'txn.tmp'),
  85. os.path.join(self.path, 'txn.active'))
  86. self.txn_active = True
  87. def commit(self):
  88. """Commit transaction
  89. """
  90. if not self.txn_active:
  91. return
  92. if self.files is not None:
  93. with open(os.path.join(self.path, 'files'), 'wb') as fd:
  94. for item in self.files.items():
  95. # Discard cached files with the newest mtime to avoid
  96. # issues with filesystem snapshots and mtime precision
  97. if item[1][0] < 10 and item[1][3] < self._newest_mtime:
  98. msgpack.pack(item, fd)
  99. self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
  100. self.config.set('cache', 'timestamp', self.manifest.timestamp)
  101. with open(os.path.join(self.path, 'config'), 'w') as fd:
  102. self.config.write(fd)
  103. self.chunks.flush()
  104. os.rename(os.path.join(self.path, 'txn.active'),
  105. os.path.join(self.path, 'txn.tmp'))
  106. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  107. self.txn_active = False
  108. def rollback(self):
  109. """Roll back partial and aborted transactions
  110. """
  111. # Roll back active transaction
  112. txn_dir = os.path.join(self.path, 'txn.active')
  113. if os.path.exists(txn_dir):
  114. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  115. shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
  116. shutil.copy(os.path.join(txn_dir, 'files'), self.path)
  117. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  118. # Remove partial transaction
  119. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  120. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  121. self.txn_active = False
  122. def sync(self):
  123. """Initializes cache by fetching and reading all archive indicies
  124. """
  125. def add(id, size, csize):
  126. try:
  127. count, size, csize = self.chunks[id]
  128. self.chunks[id] = count + 1, size, csize
  129. except KeyError:
  130. self.chunks[id] = 1, size, csize
  131. self.begin_txn()
  132. print('Initializing cache...')
  133. self.chunks.clear()
  134. unpacker = msgpack.Unpacker()
  135. if isinstance(self.repository, RemoteRepository):
  136. repository = RepositoryCache(self.repository)
  137. else:
  138. repository = self.repository
  139. for name, info in self.manifest.archives.items():
  140. archive_id = info[b'id']
  141. cdata = repository.get(archive_id)
  142. data = self.key.decrypt(archive_id, cdata)
  143. add(archive_id, len(data), len(cdata))
  144. archive = msgpack.unpackb(data)
  145. if archive[b'version'] != 1:
  146. raise Exception('Unknown archive metadata version')
  147. decode_dict(archive, (b'name',))
  148. print('Analyzing archive:', archive[b'name'])
  149. for key, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
  150. data = self.key.decrypt(key, chunk)
  151. add(key, len(data), len(chunk))
  152. unpacker.feed(data)
  153. for item in unpacker:
  154. if b'chunks' in item:
  155. for chunk_id, size, csize in item[b'chunks']:
  156. add(chunk_id, size, csize)
  157. def add_chunk(self, id, data, stats):
  158. if not self.txn_active:
  159. self.begin_txn()
  160. if self.seen_chunk(id):
  161. return self.chunk_incref(id, stats)
  162. size = len(data)
  163. data = self.key.encrypt(data)
  164. csize = len(data)
  165. self.repository.put(id, data, wait=False)
  166. self.chunks[id] = (1, size, csize)
  167. stats.update(size, csize, True)
  168. return id, size, csize
  169. def seen_chunk(self, id):
  170. return self.chunks.get(id, (0, 0, 0))[0]
  171. def chunk_incref(self, id, stats):
  172. if not self.txn_active:
  173. self.begin_txn()
  174. count, size, csize = self.chunks[id]
  175. self.chunks[id] = (count + 1, size, csize)
  176. stats.update(size, csize, False)
  177. return id, size, csize
  178. def chunk_decref(self, id):
  179. if not self.txn_active:
  180. self.begin_txn()
  181. count, size, csize = self.chunks[id]
  182. if count == 1:
  183. del self.chunks[id]
  184. self.repository.delete(id, wait=False)
  185. else:
  186. self.chunks[id] = (count - 1, size, csize)
  187. def file_known_and_unchanged(self, path_hash, st):
  188. if self.files is None:
  189. self._read_files()
  190. entry = self.files.get(path_hash)
  191. if (entry and entry[3] == st_mtime_ns(st)
  192. and entry[2] == st.st_size and entry[1] == st.st_ino):
  193. # reset entry age
  194. if entry[0] != 0:
  195. self.files[path_hash][0] = 0
  196. return entry[4]
  197. else:
  198. return None
  199. def memorize_file(self, path_hash, st, ids):
  200. # Entry: Age, inode, size, mtime, chunk ids
  201. mtime_ns = st_mtime_ns(st)
  202. self.files[path_hash] = 0, st.st_ino, st.st_size, mtime_ns, ids
  203. self._newest_mtime = max(self._newest_mtime, mtime_ns)