store.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. from __future__ import with_statement
  2. from ConfigParser import RawConfigParser
  3. import errno
  4. import fcntl
  5. import os
  6. import msgpack
  7. import shutil
  8. import struct
  9. import tempfile
  10. import unittest
  11. from zlib import crc32
  12. from .hashindex import NSIndex
  13. from .helpers import IntegrityError, deferrable
  14. from .lrucache import LRUCache
  15. class Store(object):
  16. """Filesystem based transactional key value store
  17. On disk layout:
  18. dir/README
  19. dir/config
  20. dir/data/<X / SEGMENTS_PER_DIR>/<X>
  21. dir/segments
  22. dir/index
  23. """
  24. DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
  25. DEFAULT_SEGMENTS_PER_DIR = 10000
  26. class DoesNotExist(KeyError):
  27. """Requested key does not exist"""
  28. def __init__(self, path, create=False):
  29. self.txn_active = False
  30. if create:
  31. self.create(path)
  32. self.open(path)
  33. def create(self, path):
  34. """Create a new empty store at `path`
  35. """
  36. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  37. raise Exception('Path "%s" already exists' % path)
  38. if not os.path.exists(path):
  39. os.mkdir(path)
  40. with open(os.path.join(path, 'README'), 'wb') as fd:
  41. fd.write('This is a DARC store')
  42. os.mkdir(os.path.join(path, 'data'))
  43. config = RawConfigParser()
  44. config.add_section('store')
  45. config.set('store', 'version', '1')
  46. config.set('store', 'segments_per_dir', self.DEFAULT_SEGMENTS_PER_DIR)
  47. config.set('store', 'max_segment_size', self.DEFAULT_MAX_SEGMENT_SIZE)
  48. config.set('store', 'next_segment', '0')
  49. config.add_section('meta')
  50. config.set('meta', 'manifest', '')
  51. config.set('meta', 'id', os.urandom(32).encode('hex'))
  52. NSIndex.create(os.path.join(path, 'index'))
  53. self.write_dict(os.path.join(path, 'segments'), {})
  54. with open(os.path.join(path, 'config'), 'w') as fd:
  55. config.write(fd)
  56. def open(self, path):
  57. self.path = path
  58. if not os.path.isdir(path):
  59. raise Exception('%s Does not look like a darc store' % path)
  60. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  61. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  62. self.rollback()
  63. def read_dict(self, filename):
  64. with open(filename, 'rb') as fd:
  65. return msgpack.unpackb(fd.read())
  66. def write_dict(self, filename, d):
  67. with open(filename+'.tmp', 'wb') as fd:
  68. fd.write(msgpack.packb(d))
  69. os.rename(filename+'.tmp', filename)
  70. def delete_segments(self):
  71. delete_path = os.path.join(self.path, 'delete')
  72. if os.path.exists(delete_path):
  73. segments = self.read_dict(os.path.join(self.path, 'segments'))
  74. for segment in self.read_dict(delete_path):
  75. assert segments.pop(segment, 0) == 0
  76. self.io.delete_segment(segment, missing_ok=True)
  77. self.write_dict(os.path.join(self.path, 'segments'), segments)
  78. def begin_txn(self):
  79. txn_dir = os.path.join(self.path, 'txn.tmp')
  80. # Initialize transaction snapshot
  81. os.mkdir(txn_dir)
  82. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  83. shutil.copy(os.path.join(self.path, 'index'), txn_dir)
  84. shutil.copy(os.path.join(self.path, 'segments'), txn_dir)
  85. os.rename(os.path.join(self.path, 'txn.tmp'),
  86. os.path.join(self.path, 'txn.active'))
  87. self.compact = set()
  88. self.txn_active = True
  89. def close(self):
  90. self.rollback()
  91. self.lock_fd.close()
  92. def commit(self, meta=None):
  93. """Commit transaction
  94. """
  95. meta = meta or self.meta
  96. self.compact_segments()
  97. self.io.close()
  98. self.config.set('store', 'next_segment', self.io.segment + 1)
  99. self.config.remove_section('meta')
  100. self.config.add_section('meta')
  101. for k, v in meta.items():
  102. self.config.set('meta', k, v)
  103. with open(os.path.join(self.path, 'config'), 'w') as fd:
  104. self.config.write(fd)
  105. self.index.flush()
  106. self.write_dict(os.path.join(self.path, 'segments'), self.segments)
  107. # If we crash before this line, the transaction will be
  108. # rolled back by open()
  109. os.rename(os.path.join(self.path, 'txn.active'),
  110. os.path.join(self.path, 'txn.commit'))
  111. self.rollback()
  112. def compact_segments(self):
  113. """Compact sparse segments by copying data into new segments
  114. """
  115. if not self.compact:
  116. return
  117. self.io.close_segment()
  118. def lookup(key):
  119. return self.index.get(key, (-1, -1))[0] == segment
  120. segments = self.segments
  121. for segment in self.compact:
  122. if segments[segment] > 0:
  123. for key, data in self.io.iter_objects(segment, lookup):
  124. new_segment, offset = self.io.write(key, data)
  125. self.index[key] = new_segment, offset
  126. segments.setdefault(new_segment, 0)
  127. segments[new_segment] += 1
  128. segments[segment] -= 1
  129. self.write_dict(os.path.join(self.path, 'delete'), tuple(self.compact))
  130. def rollback(self):
  131. """
  132. """
  133. # Commit any half committed transaction
  134. if os.path.exists(os.path.join(self.path, 'txn.commit')):
  135. self.delete_segments()
  136. os.rename(os.path.join(self.path, 'txn.commit'),
  137. os.path.join(self.path, 'txn.tmp'))
  138. delete_path = os.path.join(self.path, 'delete')
  139. if os.path.exists(delete_path):
  140. os.unlink(delete_path)
  141. # Roll back active transaction
  142. txn_dir = os.path.join(self.path, 'txn.active')
  143. if os.path.exists(txn_dir):
  144. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  145. shutil.copy(os.path.join(txn_dir, 'index'), self.path)
  146. shutil.copy(os.path.join(txn_dir, 'segments'), self.path)
  147. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  148. # Remove partially removed transaction
  149. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  150. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  151. self.index = NSIndex(os.path.join(self.path, 'index'))
  152. self.segments = self.read_dict(os.path.join(self.path, 'segments'))
  153. self.config = RawConfigParser()
  154. self.config.read(os.path.join(self.path, 'config'))
  155. if self.config.getint('store', 'version') != 1:
  156. raise Exception('%s Does not look like a darc store')
  157. next_segment = self.config.getint('store', 'next_segment')
  158. max_segment_size = self.config.getint('store', 'max_segment_size')
  159. segments_per_dir = self.config.getint('store', 'segments_per_dir')
  160. self.meta = dict(self.config.items('meta'))
  161. self.io = SegmentIO(self.path, next_segment, max_segment_size, segments_per_dir)
  162. self.io.cleanup()
  163. self.txn_active = False
  164. @deferrable
  165. def get(self, id):
  166. try:
  167. segment, offset = self.index[id]
  168. return self.io.read(segment, offset, id)
  169. except KeyError:
  170. raise self.DoesNotExist
  171. @deferrable
  172. def put(self, id, data):
  173. if not self.txn_active:
  174. self.begin_txn()
  175. try:
  176. segment, _ = self.index[id]
  177. self.segments[segment] -= 1
  178. self.compact.add(segment)
  179. except KeyError:
  180. pass
  181. segment, offset = self.io.write(id, data)
  182. self.segments.setdefault(segment, 0)
  183. self.segments[segment] += 1
  184. self.index[id] = segment, offset
  185. @deferrable
  186. def delete(self, id):
  187. if not self.txn_active:
  188. self.begin_txn()
  189. try:
  190. segment, offset = self.index.pop(id)
  191. self.segments[segment] -= 1
  192. self.compact.add(segment)
  193. except KeyError:
  194. raise self.DoesNotExist
  195. def flush_rpc(self, *args):
  196. pass
class SegmentIO(object):
    """Low level reader/writer for numbered segment files under <path>/data.

    Each segment file begins with the 8 byte magic string 'DSEGMENT',
    followed by entries of the form:

        <uint32 size> <uint8 tag> <uint32 crc32> <32 byte key> <payload>

    where `size` counts the 41 byte header plus the payload, `tag` is
    currently always 0, and the crc32 covers the payload only.
    """
    # little-endian: uint32 size, uint8 tag, uint32 crc, 32-byte key
    header_fmt = struct.Struct('<IBI32s')
    assert header_fmt.size == 41

    def __init__(self, path, next_segment, limit, segments_per_dir, capacity=100):
        # `capacity` bounds how many segment files are kept open at once.
        self.path = path
        self.fds = LRUCache(capacity)
        self.segment = next_segment        # segment number currently being written
        self.limit = limit                 # roll to a new segment past this size
        self.segments_per_dir = segments_per_dir
        self.offset = 0                    # write offset inside the current segment

    def close(self):
        """Close every cached file descriptor and disable this instance."""
        for segment in self.fds.keys():
            self.fds.pop(segment).close()
        self.fds = None # Just to make sure we're disabled

    def cleanup(self):
        """Delete segment files left by aborted transactions

        Removes consecutively numbered files starting at self.segment;
        anything at or past that number was never committed.
        """
        segment = self.segment
        while True:
            filename = self.segment_filename(segment)
            if not os.path.exists(filename):
                break
            os.unlink(filename)
            segment += 1

    def segment_filename(self, segment):
        # Integer division (Python 2 `/`) buckets segments_per_dir files
        # into each data subdirectory.
        return os.path.join(self.path, 'data', str(segment / self.segments_per_dir), str(segment))

    def get_fd(self, segment, write=False):
        """Return a (cached) file object for `segment`.

        With write=True the file is (re)created; the bucket directory is
        made on demand for the first segment of each bucket.
        """
        try:
            return self.fds[segment]
        except KeyError:
            if write and segment % self.segments_per_dir == 0:
                dirname = os.path.join(self.path, 'data', str(segment / self.segments_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
            fd = open(self.segment_filename(segment), write and 'w+' or 'rb')
            self.fds[segment] = fd
            return fd

    def delete_segment(self, segment, missing_ok=False):
        """Unlink `segment`'s file; with missing_ok, ignore a missing file."""
        try:
            os.unlink(self.segment_filename(segment))
        except OSError, e:
            if not missing_ok or e.errno != errno.ENOENT:
                raise
    def read(self, segment, offset, id):
        """Read and verify one entry; return its payload.

        Raises IntegrityError when the stored key does not match `id`,
        the tag byte is unknown, or the payload checksum fails.
        """
        fd = self.get_fd(segment)
        fd.seek(offset)
        data = fd.read(self.header_fmt.size)
        size, magic, hash, id_ = self.header_fmt.unpack(data)
        if magic != 0 or id != id_:
            raise IntegrityError('Invalid segment entry header')
        data = fd.read(size - self.header_fmt.size)
        # crc32 can be negative in Python 2; mask to unsigned before comparing
        if crc32(data) & 0xffffffff != hash:
            raise IntegrityError('Segment checksum mismatch')
        return data

    def iter_objects(self, segment, lookup):
        """Yield (key, payload) for each entry whose key passes `lookup`.

        Entries rejected by `lookup` are skipped by seeking, so their
        payloads are never read or checksummed.
        """
        fd = self.get_fd(segment)
        fd.seek(0)
        if fd.read(8) != 'DSEGMENT':
            raise IntegrityError('Invalid segment header')
        offset = 8
        data = fd.read(self.header_fmt.size)
        while data:
            size, magic, hash, key = self.header_fmt.unpack(data)
            if magic != 0:
                raise IntegrityError('Unknown segment entry header')
            offset += size   # file offset of the *next* entry
            if lookup(key):
                data = fd.read(size - self.header_fmt.size)
                if crc32(data) & 0xffffffff != hash:
                    raise IntegrityError('Segment checksum mismatch')
                yield key, data
            else:
                fd.seek(offset)
            data = fd.read(self.header_fmt.size)

    def write(self, id, data):
        """Append an entry for key `id`; return (segment, offset).

        Rolls over to a fresh segment when the current one would exceed
        self.limit, and writes the 'DSEGMENT' magic at the start of each
        new segment file.
        """
        size = len(data) + self.header_fmt.size
        if self.offset and self.offset + size > self.limit:
            self.close_segment()
        fd = self.get_fd(self.segment, write=True)
        fd.seek(self.offset)
        if self.offset == 0:
            fd.write('DSEGMENT')
            self.offset = 8
        offset = self.offset
        hash = crc32(data) & 0xffffffff
        fd.write(self.header_fmt.pack(size, 0, hash, id))
        fd.write(data)
        self.offset += size
        return self.segment, offset

    def close_segment(self):
        """Advance to the next segment number and reset the write offset."""
        self.segment += 1
        self.offset = 0
  289. class StoreTestCase(unittest.TestCase):
  290. def setUp(self):
  291. self.tmppath = tempfile.mkdtemp()
  292. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  293. def tearDown(self):
  294. shutil.rmtree(self.tmppath)
  295. def test1(self):
  296. for x in range(100):
  297. self.store.put('%-32d' % x, 'SOMEDATA')
  298. key50 = '%-32d' % 50
  299. self.assertEqual(self.store.get(key50), 'SOMEDATA')
  300. self.store.delete(key50)
  301. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(key50))
  302. self.store.commit()
  303. self.store.close()
  304. store2 = Store(os.path.join(self.tmppath, 'store'))
  305. def test2(self):
  306. """Test multiple sequential transactions
  307. """
  308. self.store.put('00000000000000000000000000000000', 'foo')
  309. self.store.put('00000000000000000000000000000001', 'foo')
  310. self.store.commit()
  311. self.store.delete('00000000000000000000000000000000')
  312. self.store.put('00000000000000000000000000000001', 'bar')
  313. self.store.commit()
  314. self.assertEqual(self.store.get('00000000000000000000000000000001'), 'bar')
  315. def suite():
  316. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  317. if __name__ == '__main__':
  318. unittest.main()