# repository.py
import os
import re
import shutil
import struct
from binascii import hexlify
from configparser import RawConfigParser
from zlib import crc32

from .hashindex import NSIndex
from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock
from .lrucache import LRUCache
  11. MAX_OBJECT_SIZE = 20 * 1024 * 1024
  12. MAGIC = b'ATTICSEG'
  13. TAG_PUT = 0
  14. TAG_DELETE = 1
  15. TAG_COMMIT = 2
  16. class Repository(object):
  17. """Filesystem based transactional key value store
  18. On disk layout:
  19. dir/README
  20. dir/config
  21. dir/data/<X / SEGMENTS_PER_DIR>/<X>
  22. dir/index.X
  23. dir/hints.X
  24. """
  25. DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
  26. DEFAULT_SEGMENTS_PER_DIR = 10000
  27. class DoesNotExist(Error):
  28. """Repository {} does not exist"""
  29. class AlreadyExists(Error):
  30. """Repository {} already exists"""
  31. class InvalidRepository(Error):
  32. """{} is not a valid repository"""
  33. def __init__(self, path, create=False):
  34. self.io = None
  35. self.lock = None
  36. if create:
  37. self.create(path)
  38. self.open(path)
  39. def __del__(self):
  40. self.close()
  41. def create(self, path):
  42. """Create a new empty repository at `path`
  43. """
  44. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  45. raise self.AlreadyExists(path)
  46. if not os.path.exists(path):
  47. os.mkdir(path)
  48. with open(os.path.join(path, 'README'), 'w') as fd:
  49. fd.write('This is an Attic repository\n')
  50. os.mkdir(os.path.join(path, 'data'))
  51. config = RawConfigParser()
  52. config.add_section('repository')
  53. config.set('repository', 'version', '1')
  54. config.set('repository', 'segments_per_dir', self.DEFAULT_SEGMENTS_PER_DIR)
  55. config.set('repository', 'max_segment_size', self.DEFAULT_MAX_SEGMENT_SIZE)
  56. config.set('repository', 'id', hexlify(os.urandom(32)).decode('ascii'))
  57. with open(os.path.join(path, 'config'), 'w') as fd:
  58. config.write(fd)
  59. def open(self, path):
  60. self.head = None
  61. self.path = path
  62. if not os.path.isdir(path):
  63. raise self.DoesNotExist(path)
  64. self.config = RawConfigParser()
  65. self.config.read(os.path.join(self.path, 'config'))
  66. if not 'repository' in self.config.sections() or self.config.getint('repository', 'version') != 1:
  67. raise self.InvalidRepository(path)
  68. self.lock = UpgradableLock(os.path.join(path, 'config'))
  69. self.max_segment_size = self.config.getint('repository', 'max_segment_size')
  70. self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
  71. self.id = unhexlify(self.config.get('repository', 'id').strip())
  72. self.rollback()
  73. def close(self):
  74. if self.lock:
  75. self.rollback()
  76. self.lock.release()
  77. self.lock = None
  78. def commit(self, rollback=True):
  79. """Commit transaction
  80. """
  81. self.io.write_commit()
  82. self.compact_segments()
  83. self.write_index()
  84. self.rollback()
  85. def _available_indices(self, reverse=False):
  86. names = [int(name[6:]) for name in os.listdir(self.path) if re.match('index\.\d+', name)]
  87. names.sort(reverse=reverse)
  88. return names
  89. def open_index(self, head, read_only=False):
  90. if head is None:
  91. self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8'))
  92. self.segments = {}
  93. self.compact = set()
  94. else:
  95. if read_only:
  96. self.index = NSIndex((os.path.join(self.path, 'index.%d') % head).encode('utf-8'), readonly=True)
  97. else:
  98. shutil.copy(os.path.join(self.path, 'index.%d' % head),
  99. os.path.join(self.path, 'index.tmp'))
  100. self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8'))
  101. hints = read_msgpack(os.path.join(self.path, 'hints.%d' % head))
  102. if hints[b'version'] != 1:
  103. raise ValueError('Unknown hints file version: %d' % hints['version'])
  104. self.segments = hints[b'segments']
  105. self.compact = set(hints[b'compact'])
  106. def write_index(self):
  107. hints = {b'version': 1,
  108. b'segments': self.segments,
  109. b'compact': list(self.compact)}
  110. write_msgpack(os.path.join(self.path, 'hints.%d' % self.io.head), hints)
  111. self.index.flush()
  112. os.rename(os.path.join(self.path, 'index.tmp'),
  113. os.path.join(self.path, 'index.%d' % self.io.head))
  114. # Remove old indices
  115. current = '.%d' % self.io.head
  116. for name in os.listdir(self.path):
  117. if not name.startswith('index.') and not name.startswith('hints.'):
  118. continue
  119. if name.endswith(current):
  120. continue
  121. os.unlink(os.path.join(self.path, name))
  122. def compact_segments(self):
  123. """Compact sparse segments by copying data into new segments
  124. """
  125. if not self.compact:
  126. return
  127. def lookup(tag, key):
  128. return tag == TAG_PUT and self.index.get(key, (-1, -1))[0] == segment
  129. segments = self.segments
  130. for segment in sorted(self.compact):
  131. if segments[segment] > 0:
  132. for tag, key, data in self.io.iter_objects(segment, lookup, include_data=True):
  133. new_segment, offset = self.io.write_put(key, data)
  134. self.index[key] = new_segment, offset
  135. segments.setdefault(new_segment, 0)
  136. segments[new_segment] += 1
  137. segments[segment] -= 1
  138. assert segments[segment] == 0
  139. self.io.write_commit()
  140. for segment in self.compact:
  141. assert self.segments.pop(segment) == 0
  142. self.io.delete_segment(segment)
  143. self.compact = set()
  144. def recover(self, path):
  145. """Recover missing index by replaying logs"""
  146. start = None
  147. available = self._available_indices()
  148. if available:
  149. start = available[-1]
  150. self.open_index(start)
  151. for segment, filename in self.io._segment_names():
  152. if start is not None and segment <= start:
  153. continue
  154. self.segments[segment] = 0
  155. for tag, key, offset in self.io.iter_objects(segment):
  156. if tag == TAG_PUT:
  157. try:
  158. s, _ = self.index[key]
  159. self.compact.add(s)
  160. self.segments[s] -= 1
  161. except KeyError:
  162. pass
  163. self.index[key] = segment, offset
  164. self.segments[segment] += 1
  165. elif tag == TAG_DELETE:
  166. try:
  167. s, _ = self.index.pop(key)
  168. self.segments[s] -= 1
  169. self.compact.add(s)
  170. self.compact.add(segment)
  171. except KeyError:
  172. pass
  173. if self.segments[segment] == 0:
  174. self.compact.add(segment)
  175. if self.io.head is not None:
  176. self.write_index()
  177. def rollback(self):
  178. """
  179. """
  180. self._active_txn = False
  181. if self.io:
  182. self.io.close()
  183. self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
  184. if self.io.head is not None and not os.path.exists(os.path.join(self.path, 'index.%d' % self.io.head)):
  185. self.lock.upgrade()
  186. self.recover(self.path)
  187. self.open_index(self.io.head, read_only=True)
  188. def _len(self):
  189. return len(self.index)
  190. def get(self, id):
  191. try:
  192. segment, offset = self.index[id]
  193. return self.io.read(segment, offset, id)
  194. except KeyError:
  195. raise self.DoesNotExist
  196. def get_many(self, ids, peek=None):
  197. for id in ids:
  198. yield self.get(id)
  199. def put(self, id, data, wait=True):
  200. if not self._active_txn:
  201. self._active_txn = True
  202. self.lock.upgrade()
  203. self.open_index(self.io.head)
  204. try:
  205. segment, _ = self.index[id]
  206. self.segments[segment] -= 1
  207. self.compact.add(segment)
  208. segment = self.io.write_delete(id)
  209. self.segments.setdefault(segment, 0)
  210. self.compact.add(segment)
  211. except KeyError:
  212. pass
  213. segment, offset = self.io.write_put(id, data)
  214. self.segments.setdefault(segment, 0)
  215. self.segments[segment] += 1
  216. self.index[id] = segment, offset
  217. def delete(self, id, wait=True):
  218. if not self._active_txn:
  219. self._active_txn = True
  220. self.lock.upgrade()
  221. self.open_index(self.io.head)
  222. try:
  223. segment, offset = self.index.pop(id)
  224. self.segments[segment] -= 1
  225. self.compact.add(segment)
  226. segment = self.io.write_delete(id)
  227. self.compact.add(segment)
  228. self.segments.setdefault(segment, 0)
  229. except KeyError:
  230. raise self.DoesNotExist
  231. def add_callback(self, cb, data):
  232. cb(None, None, data)
  233. class LoggedIO(object):
  234. header_fmt = struct.Struct('<IIB')
  235. assert header_fmt.size == 9
  236. put_header_fmt = struct.Struct('<IIB32s')
  237. assert put_header_fmt.size == 41
  238. header_no_crc_fmt = struct.Struct('<IB')
  239. assert header_no_crc_fmt.size == 5
  240. crc_fmt = struct.Struct('<I')
  241. assert crc_fmt.size == 4
  242. _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
  243. COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
  244. def __init__(self, path, limit, segments_per_dir, capacity=100):
  245. self.path = path
  246. self.fds = LRUCache(capacity)
  247. self.segment = None
  248. self.limit = limit
  249. self.segments_per_dir = segments_per_dir
  250. self.offset = 0
  251. self._write_fd = None
  252. self.head = None
  253. self.cleanup()
  254. def close(self):
  255. for segment in list(self.fds.keys()):
  256. self.fds.pop(segment).close()
  257. self.close_segment()
  258. self.fds = None # Just to make sure we're disabled
  259. def _segment_names(self, reverse=False):
  260. for dirpath, dirs, filenames in os.walk(os.path.join(self.path, 'data')):
  261. dirs.sort(key=int, reverse=reverse)
  262. filenames.sort(key=int, reverse=reverse)
  263. for filename in filenames:
  264. yield int(filename), os.path.join(dirpath, filename)
  265. def cleanup(self):
  266. """Delete segment files left by aborted transactions
  267. """
  268. self.head = None
  269. self.segment = 0
  270. for segment, filename in self._segment_names(reverse=True):
  271. if self.is_complete_segment(filename):
  272. self.head = segment
  273. self.segment = self.head + 1
  274. return
  275. else:
  276. os.unlink(filename)
  277. def is_complete_segment(self, filename):
  278. with open(filename, 'rb') as fd:
  279. fd.seek(-self.header_fmt.size, 2)
  280. return fd.read(self.header_fmt.size) == self.COMMIT
  281. def segment_filename(self, segment):
  282. return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
  283. def get_write_fd(self, no_new=False):
  284. if not no_new and self.offset and self.offset > self.limit:
  285. self.close_segment()
  286. if not self._write_fd:
  287. if self.segment % self.segments_per_dir == 0:
  288. dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
  289. if not os.path.exists(dirname):
  290. os.mkdir(dirname)
  291. self._write_fd = open(self.segment_filename(self.segment), 'ab')
  292. self._write_fd.write(MAGIC)
  293. self.offset = 8
  294. return self._write_fd
  295. def get_fd(self, segment):
  296. try:
  297. return self.fds[segment]
  298. except KeyError:
  299. fd = open(self.segment_filename(segment), 'rb')
  300. self.fds[segment] = fd
  301. return fd
  302. def delete_segment(self, segment):
  303. try:
  304. os.unlink(self.segment_filename(segment))
  305. except OSError:
  306. pass
  307. def iter_objects(self, segment, lookup=None, include_data=False):
  308. fd = self.get_fd(segment)
  309. fd.seek(0)
  310. if fd.read(8) != MAGIC:
  311. raise IntegrityError('Invalid segment header')
  312. offset = 8
  313. header = fd.read(self.header_fmt.size)
  314. while header:
  315. crc, size, tag = self.header_fmt.unpack(header)
  316. if size > MAX_OBJECT_SIZE:
  317. raise IntegrityError('Invalid segment object size')
  318. rest = fd.read(size - self.header_fmt.size)
  319. if crc32(rest, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
  320. raise IntegrityError('Segment checksum mismatch')
  321. if tag not in (TAG_PUT, TAG_DELETE, TAG_COMMIT):
  322. raise IntegrityError('Invalid segment entry header')
  323. key = None
  324. if tag in (TAG_PUT, TAG_DELETE):
  325. key = rest[:32]
  326. if not lookup or lookup(tag, key):
  327. if include_data:
  328. yield tag, key, rest[32:]
  329. else:
  330. yield tag, key, offset
  331. offset += size
  332. header = fd.read(self.header_fmt.size)
  333. def read(self, segment, offset, id):
  334. if segment == self.segment:
  335. self._write_fd.flush()
  336. fd = self.get_fd(segment)
  337. fd.seek(offset)
  338. header = fd.read(self.put_header_fmt.size)
  339. crc, size, tag, key = self.put_header_fmt.unpack(header)
  340. if size > MAX_OBJECT_SIZE:
  341. raise IntegrityError('Invalid segment object size')
  342. data = fd.read(size - self.put_header_fmt.size)
  343. if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
  344. raise IntegrityError('Segment checksum mismatch')
  345. if tag != TAG_PUT or id != key:
  346. raise IntegrityError('Invalid segment entry header')
  347. return data
  348. def write_put(self, id, data):
  349. size = len(data) + self.put_header_fmt.size
  350. fd = self.get_write_fd()
  351. offset = self.offset
  352. header = self.header_no_crc_fmt.pack(size, TAG_PUT)
  353. crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
  354. fd.write(b''.join((crc, header, id, data)))
  355. self.offset += size
  356. return self.segment, offset
  357. def write_delete(self, id):
  358. fd = self.get_write_fd()
  359. header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
  360. crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
  361. fd.write(b''.join((crc, header, id)))
  362. self.offset += self.put_header_fmt.size
  363. return self.segment
  364. def write_commit(self):
  365. fd = self.get_write_fd(no_new=True)
  366. header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
  367. crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
  368. fd.write(b''.join((crc, header)))
  369. self.head = self.segment
  370. self.close_segment()
  371. def close_segment(self):
  372. if self._write_fd:
  373. self.segment += 1
  374. self.offset = 0
  375. os.fsync(self._write_fd)
  376. self._write_fd.close()
  377. self._write_fd = None