# repository.py

from configparser import RawConfigParser
from binascii import hexlify
import errno
import os
import re
import shutil
import struct
import sys
from zlib import crc32

from .hashindex import NSIndex
from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock
from .lrucache import LRUCache

MAX_OBJECT_SIZE = 20 * 1024 * 1024

MAGIC = b'ATTICSEG'

TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
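
# On-disk entry layout (see the LoggedIO struct formats below): every entry is
#   crc32 (4 bytes) | size (4 bytes) | tag (1 byte) [| key (32 bytes) [| data]]
# PUT and DELETE entries carry a 32-byte key; PUT additionally carries the
# object data. A segment file starts with MAGIC, and a complete segment ends
# with a COMMIT entry.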


class Repository(object):
    """Filesystem based transactional key value store

    On disk layout:
    dir/README
    dir/config
    dir/data/<X / SEGMENTS_PER_DIR>/<X>
    dir/index.X
    dir/hints.X
    """
    DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
    DEFAULT_SEGMENTS_PER_DIR = 10000

    class DoesNotExist(Error):
        """Repository {} does not exist"""

    class AlreadyExists(Error):
        """Repository {} already exists"""

    class InvalidRepository(Error):
        """{} is not a valid repository"""

    # Raised by check() for unrecoverable structural errors
    class RepositoryCheckFailed(Error):
        """Repository check failed for {}: {}"""

    def __init__(self, path, create=False):
        self.path = path
        self.io = None
        self.lock = None
        if create:
            self.create(path)
        self.open(path)

    def __del__(self):
        self.close()

    def create(self, path):
        """Create a new empty repository at `path`
        """
        if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
            raise self.AlreadyExists(path)
        if not os.path.exists(path):
            os.mkdir(path)
        with open(os.path.join(path, 'README'), 'w') as fd:
            fd.write('This is an Attic repository\n')
        os.mkdir(os.path.join(path, 'data'))
        config = RawConfigParser()
        config.add_section('repository')
        # RawConfigParser only accepts string values on Python 3
        config.set('repository', 'version', '1')
        config.set('repository', 'segments_per_dir', str(self.DEFAULT_SEGMENTS_PER_DIR))
        config.set('repository', 'max_segment_size', str(self.DEFAULT_MAX_SEGMENT_SIZE))
        config.set('repository', 'id', hexlify(os.urandom(32)).decode('ascii'))
        with open(os.path.join(path, 'config'), 'w') as fd:
            config.write(fd)

    def open(self, path):
        self.head = None
        self.path = path
        if not os.path.isdir(path):
            raise self.DoesNotExist(path)
        self.config = RawConfigParser()
        self.config.read(os.path.join(self.path, 'config'))
        if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
            raise self.InvalidRepository(path)
        self.lock = UpgradableLock(os.path.join(path, 'config'))
        self.max_segment_size = self.config.getint('repository', 'max_segment_size')
        self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
        self.id = unhexlify(self.config.get('repository', 'id').strip())
        self.rollback()

    def close(self):
        if self.lock:
            self.rollback()
            self.lock.release()
            self.lock = None

    def commit(self, rollback=True):
        """Commit transaction
        """
        self.io.write_commit()
        self.compact_segments()
        self.write_index()
        self.rollback()

    def _available_indices(self, reverse=False):
        names = [int(name[6:]) for name in os.listdir(self.path) if re.match(r'index\.\d+', name)]
        names.sort(reverse=reverse)
        return names

    def open_index(self, head, read_only=False):
        if head is None:
            self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8'))
            self.segments = {}
            self.compact = set()
        else:
            if read_only:
                self.index = NSIndex(os.path.join(self.path, 'index.%d' % head).encode('utf-8'), readonly=True)
            else:
                shutil.copy(os.path.join(self.path, 'index.%d' % head),
                            os.path.join(self.path, 'index.tmp'))
                self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8'))
            hints = read_msgpack(os.path.join(self.path, 'hints.%d' % head))
            if hints[b'version'] != 1:
                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
            self.segments = hints[b'segments']
            self.compact = set(hints[b'compact'])

    def write_index(self):
        hints = {b'version': 1,
                 b'segments': self.segments,
                 b'compact': list(self.compact)}
        write_msgpack(os.path.join(self.path, 'hints.%d' % self.io.head), hints)
        self.index.flush()
        os.rename(os.path.join(self.path, 'index.tmp'),
                  os.path.join(self.path, 'index.%d' % self.io.head))
        # Remove old indices
        current = '.%d' % self.io.head
        for name in os.listdir(self.path):
            if not name.startswith('index.') and not name.startswith('hints.'):
                continue
            if name.endswith(current):
                continue
            os.unlink(os.path.join(self.path, name))
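
    # The os.rename() above is atomic on POSIX filesystems: a crash before the
    # rename leaves the previous index.N/hints.N pair intact, and rollback()
    # will then trigger recover() to replay the newer segments.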

    def compact_segments(self):
        """Compact sparse segments by copying data into new segments
        """
        if not self.compact:
            return

        def lookup(tag, key):
            return tag == TAG_PUT and self.index.get(key, (-1, -1))[0] == segment
        segments = self.segments
        for segment in sorted(self.compact):
            if segments[segment] > 0:
                for tag, key, data in self.io.iter_objects(segment, lookup, include_data=True):
                    new_segment, offset = self.io.write_put(key, data)
                    self.index[key] = new_segment, offset
                    segments.setdefault(new_segment, 0)
                    segments[new_segment] += 1
                    segments[segment] -= 1
                assert segments[segment] == 0
        self.io.write_commit()
        for segment in self.compact:
            assert self.segments.pop(segment) == 0
            self.io.delete_segment(segment)
        self.compact = set()
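
    # How compaction works: self.compact holds segments known to contain
    # superseded or deleted entries. lookup() is a closure over the loop
    # variable `segment` (resolved at call time), so it selects only the PUT
    # entries the index still maps to the segment being compacted; those live
    # objects are copied into the current write segment and the old segment
    # file is deleted.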

    def recover(self, path):
        """Recover missing index by replaying logs"""
        start = None
        available = self._available_indices()
        if available:
            start = available[-1]
        self.open_index(start)
        for segment, filename in self.io._segment_names():
            if start is not None and segment <= start:
                continue
            self.segments[segment] = 0
            for tag, key, offset in self.io.iter_objects(segment):
                if tag == TAG_PUT:
                    try:
                        s, _ = self.index[key]
                        self.compact.add(s)
                        self.segments[s] -= 1
                    except KeyError:
                        pass
                    self.index[key] = segment, offset
                    self.segments[segment] += 1
                elif tag == TAG_DELETE:
                    try:
                        s, _ = self.index.pop(key)
                        self.segments[s] -= 1
                        self.compact.add(s)
                        self.compact.add(segment)
                    except KeyError:
                        pass
            if self.segments[segment] == 0:
                self.compact.add(segment)
        if self.io.head is not None:
            self.write_index()

    def check(self, progress=False):
        """Check repository consistency

        This method verifies all segment checksums and makes sure
        the index is consistent with the data stored in the segments.
        """
        error_found = False

        def report_error(msg):
            nonlocal error_found
            error_found = True
            print(msg, file=sys.stderr)
        seen = set()
        for segment, filename in self.io._segment_names():
            if progress:
                print('Checking segment {}/{}'.format(segment, self.io.head))
            try:
                objects = list(self.io.iter_objects(segment))
            except (IntegrityError, struct.error):
                report_error('Error reading segment {}'.format(segment))
                objects = []
            for tag, key, offset in objects:
                if tag == TAG_PUT:
                    if key in seen:
                        report_error('Key found in more than one segment. Segment={}, key={}'.format(segment, hexlify(key)))
                    seen.add(key)
                    if self.index.get(key, (0, 0)) != (segment, offset):
                        report_error('Index vs segment header mismatch. Segment={}, key={}'.format(segment, hexlify(key)))
                elif tag == TAG_COMMIT:
                    continue
                else:
                    raise self.RepositoryCheckFailed(self.path, 'Unexpected tag {} in segment {}'.format(tag, segment))
        if len(self.index) != len(seen):
            report_error('Index object count mismatch. {} != {}'.format(len(self.index), len(seen)))
        return not error_found
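
    # Hypothetical invocation: repo.check(progress=True) prints per-segment
    # progress, reports inconsistencies on stderr, and returns False if any
    # error was found.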

    def rollback(self):
        """Discard any non-committed changes and reset to the last committed state
        """
        self._active_txn = False
        if self.io:
            self.io.close()
        self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
        if self.io.head is not None and not os.path.exists(os.path.join(self.path, 'index.%d' % self.io.head)):
            self.lock.upgrade()
            self.recover(self.path)
        self.open_index(self.io.head, read_only=True)

    def _len(self):
        return len(self.index)

    def get(self, id):
        try:
            segment, offset = self.index[id]
            return self.io.read(segment, offset, id)
        except KeyError:
            raise self.DoesNotExist(self.path)

    def get_many(self, ids, is_preloaded=False):
        for id_ in ids:
            yield self.get(id_)

    def put(self, id, data, wait=True):
        if not self._active_txn:
            self._active_txn = True
            self.lock.upgrade()
            self.open_index(self.io.head)
        try:
            segment, _ = self.index[id]
            self.segments[segment] -= 1
            self.compact.add(segment)
            segment = self.io.write_delete(id)
            self.segments.setdefault(segment, 0)
            self.compact.add(segment)
        except KeyError:
            pass
        segment, offset = self.io.write_put(id, data)
        self.segments.setdefault(segment, 0)
        self.segments[segment] += 1
        self.index[id] = segment, offset

    def delete(self, id, wait=True):
        if not self._active_txn:
            self._active_txn = True
            self.lock.upgrade()
            self.open_index(self.io.head)
        try:
            segment, offset = self.index.pop(id)
            self.segments[segment] -= 1
            self.compact.add(segment)
            segment = self.io.write_delete(id)
            self.compact.add(segment)
            self.segments.setdefault(segment, 0)
        except KeyError:
            raise self.DoesNotExist(self.path)

    def preload(self, ids):
        """Preload objects (only applies to remote repositories)
        """


class LoggedIO(object):

    header_fmt = struct.Struct('<IIB')
    assert header_fmt.size == 9
    put_header_fmt = struct.Struct('<IIB32s')
    assert put_header_fmt.size == 41
    header_no_crc_fmt = struct.Struct('<IB')
    assert header_no_crc_fmt.size == 5
    crc_fmt = struct.Struct('<I')
    assert crc_fmt.size == 4

    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
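
    # COMMIT is the full serialized 9-byte commit entry (crc32 + size + tag);
    # is_complete_segment() below compares the tail of a segment file against
    # it to decide whether the segment was completely written.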

    def __init__(self, path, limit, segments_per_dir, capacity=100):
        self.path = path
        self.fds = LRUCache(capacity)
        self.segment = None
        self.limit = limit
        self.segments_per_dir = segments_per_dir
        self.offset = 0
        self._write_fd = None
        self.head = None
        self.cleanup()

    def close(self):
        for segment in list(self.fds.keys()):
            self.fds.pop(segment).close()
        self.close_segment()
        self.fds = None  # Just to make sure we're disabled

    def _segment_names(self, reverse=False):
        for dirpath, dirs, filenames in os.walk(os.path.join(self.path, 'data')):
            dirs.sort(key=int, reverse=reverse)
            filenames = sorted((filename for filename in filenames if filename.isdigit()), key=int, reverse=reverse)
            for filename in filenames:
                yield int(filename), os.path.join(dirpath, filename)

    def cleanup(self):
        """Delete segment files left by aborted transactions
        """
        self.head = None
        self.segment = 0
        # FIXME: Only delete segments if we're sure there's at least
        # one complete segment somewhere
        for segment, filename in self._segment_names(reverse=True):
            if self.is_complete_segment(filename):
                self.head = segment
                self.segment = self.head + 1
                return
            else:
                os.unlink(filename)

    def is_complete_segment(self, filename):
        with open(filename, 'rb') as fd:
            try:
                fd.seek(-self.header_fmt.size, os.SEEK_END)
            except OSError as e:
                # Return False if the segment file is empty or too small
                if e.errno == errno.EINVAL:
                    return False
                raise
            return fd.read(self.header_fmt.size) == self.COMMIT

    def segment_filename(self, segment):
        return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
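
    # Example with the default segments_per_dir of 10000:
    # segment 12345 maps to <path>/data/1/12345.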

    def get_write_fd(self, no_new=False):
        if not no_new and self.offset and self.offset > self.limit:
            self.close_segment()
        if not self._write_fd:
            if self.segment % self.segments_per_dir == 0:
                dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
            self._write_fd = open(self.segment_filename(self.segment), 'ab')
            self._write_fd.write(MAGIC)
            self.offset = 8
        return self._write_fd

    def get_fd(self, segment):
        try:
            return self.fds[segment]
        except KeyError:
            fd = open(self.segment_filename(segment), 'rb')
            self.fds[segment] = fd
            return fd

    def delete_segment(self, segment):
        try:
            os.unlink(self.segment_filename(segment))
        except OSError:
            pass

    def iter_objects(self, segment, lookup=None, include_data=False):
        fd = self.get_fd(segment)
        fd.seek(0)
        if fd.read(8) != MAGIC:
            raise IntegrityError('Invalid segment header')
        offset = 8
        header = fd.read(self.header_fmt.size)
        while header:
            crc, size, tag = self.header_fmt.unpack(header)
            if size > MAX_OBJECT_SIZE:
                raise IntegrityError('Invalid segment object size')
            rest = fd.read(size - self.header_fmt.size)
            if crc32(rest, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
                raise IntegrityError('Segment checksum mismatch')
            if tag not in (TAG_PUT, TAG_DELETE, TAG_COMMIT):
                raise IntegrityError('Invalid segment entry header')
            key = None
            if tag in (TAG_PUT, TAG_DELETE):
                key = rest[:32]
            if not lookup or lookup(tag, key):
                if include_data:
                    yield tag, key, rest[32:]
                else:
                    yield tag, key, offset
            offset += size
            header = fd.read(self.header_fmt.size)

    def read(self, segment, offset, id):
        if segment == self.segment:
            self._write_fd.flush()
        fd = self.get_fd(segment)
        fd.seek(offset)
        header = fd.read(self.put_header_fmt.size)
        crc, size, tag, key = self.put_header_fmt.unpack(header)
        if size > MAX_OBJECT_SIZE:
            raise IntegrityError('Invalid segment object size')
        data = fd.read(size - self.put_header_fmt.size)
        if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
            raise IntegrityError('Segment checksum mismatch')
        if tag != TAG_PUT or id != key:
            raise IntegrityError('Invalid segment entry header')
        return data

    def write_put(self, id, data):
        size = len(data) + self.put_header_fmt.size
        fd = self.get_write_fd()
        offset = self.offset
        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
        fd.write(b''.join((crc, header, id, data)))
        self.offset += size
        return self.segment, offset
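
    # The CRC chains header-after-crc, then id, then data, matching the
    # read-side verification crc32(rest, crc32(header[4:])) in iter_objects()
    # and read().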

    def write_delete(self, id):
        fd = self.get_write_fd()
        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
        crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
        fd.write(b''.join((crc, header, id)))
        self.offset += self.put_header_fmt.size
        return self.segment

    def write_commit(self):
        fd = self.get_write_fd(no_new=True)
        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
        crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
        fd.write(b''.join((crc, header)))
        self.head = self.segment
        self.close_segment()

    def close_segment(self):
        if self._write_fd:
            self.segment += 1
            self.offset = 0
            # Flush Python's buffer before fsync so the COMMIT entry actually
            # reaches the kernel before we force it to disk
            self._write_fd.flush()
            os.fsync(self._write_fd.fileno())
            self._write_fd.close()
            self._write_fd = None
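

# A minimal smoke-test sketch (not part of the original module; it must run in
# package context because of the relative imports above, and the `attic`
# package name and temp path are assumptions):
#
#     import os, tempfile
#     from attic.repository import Repository
#     repo = Repository(os.path.join(tempfile.mkdtemp(), 'repo'), create=True)
#     key = os.urandom(32)               # object ids are 32-byte keys
#     repo.put(key, b'some data')
#     repo.commit()
#     assert repo.get(key) == b'some data'
#     repo.close()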