2
0

repository.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. from configparser import RawConfigParser
  2. from binascii import hexlify
  3. import errno
  4. import os
  5. import re
  6. import shutil
  7. import struct
  8. import sys
  9. import time
  10. from zlib import crc32
  11. from .hashindex import NSIndex
  12. from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock
  13. from .lrucache import LRUCache
# Sanity bound on a single stored object's size; entries claiming to be
# larger than this are treated as corruption when a segment is read.
MAX_OBJECT_SIZE = 20 * 1024 * 1024
# File magic written at the start of every segment file.
MAGIC = b'ATTICSEG'
# Tags identifying the kind of each segment entry.
TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
class Repository(object):
    """Filesystem based transactional key value store

    On disk layout:
    dir/README
    dir/config
    dir/data/<X / SEGMENTS_PER_DIR>/<X>
    dir/index.X
    dir/hints.X
    """
    DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024  # segment rollover size
    DEFAULT_SEGMENTS_PER_DIR = 10000            # fan-out of data/ subdirectories

    class DoesNotExist(Error):
        """Repository {} does not exist"""

    class AlreadyExists(Error):
        """Repository {} already exists"""

    class InvalidRepository(Error):
        """{} is not a valid repository"""

    class CheckNeeded(Error):
        '''Inconsistency detected. Please "run attic check {}"'''

    def __init__(self, path, create=False):
        # io (LoggedIO) and lock are assigned in open(); initialized to None
        # here so close()/__del__ are safe even if open() fails part-way.
        self.path = path
        self.io = None
        self.lock = None
        if create:
            self.create(path)
        self.open(path)

    def __del__(self):
        self.close()

    def create(self, path):
        """Create a new empty repository at `path`
        """
        # Refuse to clobber an existing non-directory or a non-empty directory.
        if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
            raise self.AlreadyExists(path)
        if not os.path.exists(path):
            os.mkdir(path)
        with open(os.path.join(path, 'README'), 'w') as fd:
            fd.write('This is an Attic repository\n')
        os.mkdir(os.path.join(path, 'data'))
        config = RawConfigParser()
        config.add_section('repository')
        config.set('repository', 'version', '1')
        config.set('repository', 'segments_per_dir', self.DEFAULT_SEGMENTS_PER_DIR)
        config.set('repository', 'max_segment_size', self.DEFAULT_MAX_SEGMENT_SIZE)
        # Random 256-bit repository id, stored hex-encoded in the config file.
        config.set('repository', 'id', hexlify(os.urandom(32)).decode('ascii'))
        with open(os.path.join(path, 'config'), 'w') as fd:
            config.write(fd)

    def open(self, path):
        """Open an existing repository: validate config, take the lock and
        load the index as of the last committed transaction."""
        self.head = None
        self.path = path
        if not os.path.isdir(path):
            raise self.DoesNotExist(path)
        self.config = RawConfigParser()
        self.config.read(os.path.join(self.path, 'config'))
        if not 'repository' in self.config.sections() or self.config.getint('repository', 'version') != 1:
            raise self.InvalidRepository(path)
        # Lock on the config file; upgraded to exclusive on first mutation.
        self.lock = UpgradableLock(os.path.join(path, 'config'))
        self.max_segment_size = self.config.getint('repository', 'max_segment_size')
        self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
        self.id = unhexlify(self.config.get('repository', 'id').strip())
        # rollback() (re)creates self.io and opens the index read-only.
        self.rollback()

    def close(self):
        # Idempotent: self.lock is None after the first call.
        if self.lock:
            if self.io:
                self.io.close()
            self.io = None
            self.lock.release()
            self.lock = None

    def commit(self, rollback=True):
        """Commit transaction
        """
        # NOTE(review): the `rollback` parameter is never consulted here;
        # rollback() is always invoked. Presumably kept for API
        # compatibility with other repository implementations -- confirm.
        self.io.write_commit()
        self.compact_segments()
        self.write_index()
        self.rollback()

    def _available_indices(self, reverse=False):
        # Sorted numeric suffixes of all on-disk "index.N" files.
        names = [int(name[6:]) for name in os.listdir(self.path) if re.match('index\.\d+', name)]
        names.sort(reverse=reverse)
        return names

    def open_index(self, head, read_only=False):
        """Load the key -> (segment, offset) index valid as of segment `head`.

        With head None a fresh empty index is created.  Otherwise the saved
        index for `head` is opened: directly (readonly) when read_only is
        set, or via a copy to index.tmp so the saved file is never mutated
        in place.  Also loads the segments/compact hints for `head`.
        """
        if head is None:
            self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8'))
            self.segments = {}
            self.compact = set()
        else:
            if read_only:
                self.index = NSIndex((os.path.join(self.path, 'index.%d') % head).encode('utf-8'), readonly=True)
            else:
                shutil.copy(os.path.join(self.path, 'index.%d' % head),
                            os.path.join(self.path, 'index.tmp'))
                self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8'))
            hints = read_msgpack(os.path.join(self.path, 'hints.%d' % head))
            if hints[b'version'] != 1:
                raise ValueError('Unknown hints file version: %d' % hints['version'])
            # segments: per-segment count of live PUT entries
            # compact: segments known to contain superseded/deleted entries
            self.segments = hints[b'segments']
            self.compact = set(hints[b'compact'])

    def write_index(self):
        """Persist hints and index for the current head, then prune all
        index/hints files belonging to older transactions."""
        hints = {b'version': 1,
                 b'segments': self.segments,
                 b'compact': list(self.compact)}
        write_msgpack(os.path.join(self.path, 'hints.%d' % self.io.head), hints)
        self.index.flush()
        # Atomically publish the new index under its final name.
        os.rename(os.path.join(self.path, 'index.tmp'),
                  os.path.join(self.path, 'index.%d' % self.io.head))
        # Remove old indices
        current = '.%d' % self.io.head
        for name in os.listdir(self.path):
            if not name.startswith('index.') and not name.startswith('hints.'):
                continue
            if name.endswith(current):
                continue
            os.unlink(os.path.join(self.path, name))

    def compact_segments(self):
        """Compact sparse segments by copying data into new segments
        """
        if not self.compact:
            return
        def lookup(tag, key):
            # Only live PUT entries: `segment` is read from the enclosing
            # loop variable below (closure), so the filter follows the loop.
            return tag == TAG_PUT and self.index.get(key, (-1, -1))[0] == segment
        segments = self.segments
        for segment in sorted(self.compact):
            if segments[segment] > 0:
                for tag, key, data in self.io.iter_objects(segment, lookup, include_data=True):
                    # Re-append the live entry and repoint the index at it.
                    new_segment, offset = self.io.write_put(key, data)
                    self.index[key] = new_segment, offset
                    segments.setdefault(new_segment, 0)
                    segments[new_segment] += 1
                    segments[segment] -= 1
                assert segments[segment] == 0
        self.io.write_commit()
        # All compacted segments are now empty and can be deleted.
        for segment in self.compact:
            assert self.segments.pop(segment) == 0
            self.io.delete_segment(segment)
        self.compact = set()

    def recover(self, path):
        """Recover missing index by replaying logs"""
        start = None
        available = self._available_indices()
        if available:
            # Resume from the newest index on disk; replay only later segments.
            start = available[-1]
        self.open_index(start)
        for segment, filename in self.io._segment_names():
            if start is not None and segment <= start:
                continue
            self.segments[segment] = 0
            for tag, key, offset in self.io.iter_objects(segment):
                if tag == TAG_PUT:
                    try:
                        # Key overwritten: old segment loses a live entry.
                        s, _ = self.index[key]
                        self.compact.add(s)
                        self.segments[s] -= 1
                    except KeyError:
                        pass
                    self.index[key] = segment, offset
                    self.segments[segment] += 1
                elif tag == TAG_DELETE:
                    try:
                        s, _ = self.index.pop(key)
                        self.segments[s] -= 1
                        self.compact.add(s)
                        self.compact.add(segment)
                    except KeyError:
                        pass
            if self.segments[segment] == 0:
                self.compact.add(segment)
        if self.io.head is not None:
            self.write_index()

    def check(self, progress=False):
        """Check repository consistency

        This method verifies all segment checksums and makes sure
        the index is consistent with the data stored in the segments.
        Returns True when no errors were found.
        """
        progress_time = None
        error_found = False
        def report_progress(msg, error=False):
            # Errors are always printed; progress lines only when requested.
            nonlocal error_found
            if error:
                error_found = True
            if error or progress:
                print(msg, file=sys.stderr)
        seen = set()
        for segment, filename in self.io._segment_names():
            if progress:
                # Throttle progress output to once per wall-clock second.
                if int(time.time()) != progress_time:
                    progress_time = int(time.time())
                    report_progress('Checking segment {}/{}'.format(segment, self.io.head))
            try:
                objects = list(self.io.iter_objects(segment))
            except (IntegrityError, struct.error):
                report_progress('Error reading segment {}'.format(segment), error=True)
                objects = []
            for tag, key, offset in objects:
                if tag == TAG_PUT:
                    if key in seen:
                        report_progress('Key found in more than one segment. Segment={}, key={}'.format(segment, hexlify(key)), error=True)
                    seen.add(key)
                    if self.index.get(key, (0, 0)) != (segment, offset):
                        report_progress('Index vs segment header mismatch. Segment={}, key={}'.format(segment, hexlify(key)), error=True)
                elif tag == TAG_COMMIT:
                    continue
                else:
                    report_progress('Unexpected tag {} in segment {}'.format(tag, segment), error=True)
        if len(self.index) != len(seen):
            report_progress('Index object count mismatch. {} != {}'.format(len(self.index), len(seen)), error=True)
        if not error_found:
            report_progress('Check complete, no errors found.')
        return not error_found

    def rollback(self):
        """Discard any uncommitted changes and reopen the repository state
        as of the last committed transaction.
        """
        self._active_txn = False
        if self.io:
            self.io.close()
        self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
        # Missing index for the current head => rebuild it by log replay.
        if self.io.head is not None and not os.path.exists(os.path.join(self.path, 'index.%d' % self.io.head)):
            self.lock.upgrade()
            self.recover(self.path)
        self.open_index(self.io.head, read_only=True)

    def _len(self):
        return len(self.index)

    def get(self, id):
        """Return the data stored under `id`; raises DoesNotExist."""
        try:
            segment, offset = self.index[id]
            return self.io.read(segment, offset, id)
        except KeyError:
            raise self.DoesNotExist(self.path)

    def get_many(self, ids, is_preloaded=False):
        # is_preloaded is only meaningful for remote repositories.
        for id_ in ids:
            yield self.get(id_)

    def put(self, id, data, wait=True):
        """Store `data` under `id`, starting a transaction if needed."""
        if not self._active_txn:
            self._active_txn = True
            self.lock.upgrade()
            self.open_index(self.io.head)
        try:
            # Overwrite: log a delete for the old entry first.
            segment, _ = self.index[id]
            self.segments[segment] -= 1
            self.compact.add(segment)
            segment = self.io.write_delete(id)
            self.segments.setdefault(segment, 0)
            self.compact.add(segment)
        except KeyError:
            pass
        segment, offset = self.io.write_put(id, data)
        self.segments.setdefault(segment, 0)
        self.segments[segment] += 1
        self.index[id] = segment, offset

    def delete(self, id, wait=True):
        """Remove `id`; raises DoesNotExist when it is not stored."""
        if not self._active_txn:
            self._active_txn = True
            self.lock.upgrade()
            self.open_index(self.io.head)
        try:
            segment, offset = self.index.pop(id)
            self.segments[segment] -= 1
            self.compact.add(segment)
            segment = self.io.write_delete(id)
            self.compact.add(segment)
            self.segments.setdefault(segment, 0)
        except KeyError:
            raise self.DoesNotExist(self.path)

    def preload(self, ids):
        """Preload objects (only applies to remote repositories)
        """
  282. class LoggedIO(object):
  283. header_fmt = struct.Struct('<IIB')
  284. assert header_fmt.size == 9
  285. put_header_fmt = struct.Struct('<IIB32s')
  286. assert put_header_fmt.size == 41
  287. header_no_crc_fmt = struct.Struct('<IB')
  288. assert header_no_crc_fmt.size == 5
  289. crc_fmt = struct.Struct('<I')
  290. assert crc_fmt.size == 4
  291. _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
  292. COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
  293. def __init__(self, path, limit, segments_per_dir, capacity=100):
  294. self.path = path
  295. self.fds = LRUCache(capacity)
  296. self.segment = None
  297. self.limit = limit
  298. self.segments_per_dir = segments_per_dir
  299. self.offset = 0
  300. self._write_fd = None
  301. self.head = None
  302. self.cleanup()
  303. def close(self):
  304. for segment in list(self.fds.keys()):
  305. self.fds.pop(segment).close()
  306. self.close_segment()
  307. self.fds = None # Just to make sure we're disabled
  308. def _segment_names(self, reverse=False):
  309. for dirpath, dirs, filenames in os.walk(os.path.join(self.path, 'data')):
  310. dirs.sort(key=int, reverse=reverse)
  311. filenames = sorted((filename for filename in filenames if filename.isdigit()), key=int, reverse=reverse)
  312. for filename in filenames:
  313. yield int(filename), os.path.join(dirpath, filename)
  314. def cleanup(self):
  315. """Delete segment files left by aborted transactions
  316. """
  317. self.head = None
  318. self.segment = 0
  319. to_delete = []
  320. for segment, filename in self._segment_names(reverse=True):
  321. if self.is_complete_segment(filename):
  322. self.head = segment
  323. self.segment = self.head + 1
  324. for filename in to_delete:
  325. os.unlink(filename)
  326. break
  327. else:
  328. to_delete.append(filename)
  329. else:
  330. # Abort if no transaction is found, otherwise all segments
  331. # would be deleted
  332. if to_delete:
  333. raise Repository.CheckNeeded(self.path)
  334. def is_complete_segment(self, filename):
  335. with open(filename, 'rb') as fd:
  336. try:
  337. fd.seek(-self.header_fmt.size, os.SEEK_END)
  338. except Exception as e:
  339. # return False if segment file is empty or too small
  340. if e.errno == errno.EINVAL:
  341. return False
  342. raise e
  343. return fd.read(self.header_fmt.size) == self.COMMIT
  344. def segment_filename(self, segment):
  345. return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
  346. def get_write_fd(self, no_new=False):
  347. if not no_new and self.offset and self.offset > self.limit:
  348. self.close_segment()
  349. if not self._write_fd:
  350. if self.segment % self.segments_per_dir == 0:
  351. dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
  352. if not os.path.exists(dirname):
  353. os.mkdir(dirname)
  354. self._write_fd = open(self.segment_filename(self.segment), 'ab')
  355. self._write_fd.write(MAGIC)
  356. self.offset = 8
  357. return self._write_fd
  358. def get_fd(self, segment):
  359. try:
  360. return self.fds[segment]
  361. except KeyError:
  362. fd = open(self.segment_filename(segment), 'rb')
  363. self.fds[segment] = fd
  364. return fd
  365. def delete_segment(self, segment):
  366. try:
  367. os.unlink(self.segment_filename(segment))
  368. except OSError:
  369. pass
  370. def iter_objects(self, segment, lookup=None, include_data=False):
  371. fd = self.get_fd(segment)
  372. fd.seek(0)
  373. if fd.read(8) != MAGIC:
  374. raise IntegrityError('Invalid segment header')
  375. offset = 8
  376. header = fd.read(self.header_fmt.size)
  377. while header:
  378. crc, size, tag = self.header_fmt.unpack(header)
  379. if size > MAX_OBJECT_SIZE:
  380. raise IntegrityError('Invalid segment object size')
  381. rest = fd.read(size - self.header_fmt.size)
  382. if crc32(rest, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
  383. raise IntegrityError('Segment checksum mismatch')
  384. if tag not in (TAG_PUT, TAG_DELETE, TAG_COMMIT):
  385. raise IntegrityError('Invalid segment entry header')
  386. key = None
  387. if tag in (TAG_PUT, TAG_DELETE):
  388. key = rest[:32]
  389. if not lookup or lookup(tag, key):
  390. if include_data:
  391. yield tag, key, rest[32:]
  392. else:
  393. yield tag, key, offset
  394. offset += size
  395. header = fd.read(self.header_fmt.size)
  396. def read(self, segment, offset, id):
  397. if segment == self.segment:
  398. self._write_fd.flush()
  399. fd = self.get_fd(segment)
  400. fd.seek(offset)
  401. header = fd.read(self.put_header_fmt.size)
  402. crc, size, tag, key = self.put_header_fmt.unpack(header)
  403. if size > MAX_OBJECT_SIZE:
  404. raise IntegrityError('Invalid segment object size')
  405. data = fd.read(size - self.put_header_fmt.size)
  406. if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
  407. raise IntegrityError('Segment checksum mismatch')
  408. if tag != TAG_PUT or id != key:
  409. raise IntegrityError('Invalid segment entry header')
  410. return data
  411. def write_put(self, id, data):
  412. size = len(data) + self.put_header_fmt.size
  413. fd = self.get_write_fd()
  414. offset = self.offset
  415. header = self.header_no_crc_fmt.pack(size, TAG_PUT)
  416. crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
  417. fd.write(b''.join((crc, header, id, data)))
  418. self.offset += size
  419. return self.segment, offset
  420. def write_delete(self, id):
  421. fd = self.get_write_fd()
  422. header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
  423. crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
  424. fd.write(b''.join((crc, header, id)))
  425. self.offset += self.put_header_fmt.size
  426. return self.segment
  427. def write_commit(self):
  428. fd = self.get_write_fd(no_new=True)
  429. header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
  430. crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
  431. fd.write(b''.join((crc, header)))
  432. self.head = self.segment
  433. self.close_segment()
  434. def close_segment(self):
  435. if self._write_fd:
  436. self.segment += 1
  437. self.offset = 0
  438. os.fsync(self._write_fd)
  439. self._write_fd.close()
  440. self._write_fd = None