# store.py
from __future__ import with_statement
from ConfigParser import RawConfigParser
import errno
import fcntl
import os
import shutil
import struct
import sys
import tempfile
import unittest
from zlib import crc32

from .hashindex import NSIndex, BandIndex
from .helpers import IntegrityError, read_set, write_set, deferrable
from .lrucache import LRUCache
  14. class Store(object):
  15. """Filesystem based transactional key value store
  16. On disk layout:
  17. dir/README
  18. dir/config
  19. dir/bands/<X / BANDS_PER_DIR>/<X>
  20. dir/indexes/<NS>
  21. """
  22. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  23. DEFAULT_BANDS_PER_DIR = 10000
  24. class DoesNotExist(KeyError):
  25. """Requested key does not exist"""
  26. def __init__(self, path, create=False):
  27. self.txn_active = False
  28. if create:
  29. self.create(path)
  30. self.open(path)
  31. def create(self, path):
  32. """Create a new empty store at `path`
  33. """
  34. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  35. raise Exception('Path "%s" already exists' % path)
  36. if not os.path.exists(path):
  37. os.mkdir(path)
  38. with open(os.path.join(path, 'README'), 'wb') as fd:
  39. fd.write('This is a DARC store')
  40. os.mkdir(os.path.join(path, 'bands'))
  41. os.mkdir(os.path.join(path, 'indexes'))
  42. config = RawConfigParser()
  43. config.add_section('store')
  44. config.set('store', 'version', '1')
  45. config.set('store', 'id', os.urandom(32).encode('hex'))
  46. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  47. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  48. config.add_section('state')
  49. config.set('state', 'next_band', '0')
  50. config.set('state', 'tid', '0')
  51. with open(os.path.join(path, 'config'), 'w') as fd:
  52. config.write(fd)
  53. def open(self, path):
  54. self.path = path
  55. if not os.path.isdir(path):
  56. raise Exception('%s Does not look like a darc store' % path)
  57. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  58. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  59. self.rollback()
  60. self.config = RawConfigParser()
  61. self.config.read(os.path.join(path, 'config'))
  62. if self.config.getint('store', 'version') != 1:
  63. raise Exception('%s Does not look like a darc store')
  64. self.id = self.config.get('store', 'id').decode('hex')
  65. self.tid = self.config.getint('state', 'tid')
  66. next_band = self.config.getint('state', 'next_band')
  67. max_band_size = self.config.getint('store', 'max_band_size')
  68. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  69. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  70. self.io.cleanup()
  71. def delete_bands(self):
  72. delete_path = os.path.join(self.path, 'indexes', 'delete')
  73. if os.path.exists(delete_path):
  74. bands = self.get_index('bands')
  75. for band in read_set(delete_path):
  76. assert bands.pop(band, 0) == 0
  77. self.io.delete_band(band, missing_ok=True)
  78. os.unlink(delete_path)
  79. def begin_txn(self):
  80. txn_dir = os.path.join(self.path, 'txn.tmp')
  81. # Initialize transaction snapshot
  82. os.mkdir(txn_dir)
  83. shutil.copytree(os.path.join(self.path, 'indexes'),
  84. os.path.join(txn_dir, 'indexes'))
  85. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  86. os.rename(os.path.join(self.path, 'txn.tmp'),
  87. os.path.join(self.path, 'txn.active'))
  88. self.compact = set()
  89. self.txn_active = True
  90. def close(self):
  91. self.rollback()
  92. self.lock_fd.close()
  93. def commit(self):
  94. """Commit transaction, `tid` will be increased by 1
  95. """
  96. self.compact_bands()
  97. self.io.close()
  98. self.tid += 1
  99. self.config.set('state', 'tid', self.tid)
  100. self.config.set('state', 'next_band', self.io.band + 1)
  101. with open(os.path.join(self.path, 'config'), 'w') as fd:
  102. self.config.write(fd)
  103. for i in self.indexes.values():
  104. i.flush()
  105. # If we crash before this line, the transaction will be
  106. # rolled back by open()
  107. os.rename(os.path.join(self.path, 'txn.active'),
  108. os.path.join(self.path, 'txn.commit'))
  109. self.rollback()
  110. def compact_bands(self):
  111. """Compact sparse bands by copying data into new bands
  112. """
  113. if not self.compact:
  114. return
  115. self.io.close_band()
  116. def lookup(ns, key):
  117. return key in self.get_index(ns)
  118. bands = self.get_index('bands')
  119. for band in self.compact:
  120. if bands[band] > 0:
  121. for ns, key, data in self.io.iter_objects(band, lookup):
  122. new_band, offset = self.io.write(ns, key, data)
  123. self.indexes[ns][key] = new_band, offset
  124. bands[band] -= 1
  125. bands.setdefault(new_band, 0)
  126. bands[new_band] += 1
  127. write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
  128. def rollback(self):
  129. """
  130. """
  131. # Commit any half committed transaction
  132. if os.path.exists(os.path.join(self.path, 'txn.commit')):
  133. self.delete_bands()
  134. os.rename(os.path.join(self.path, 'txn.commit'),
  135. os.path.join(self.path, 'txn.tmp'))
  136. # Roll back active transaction
  137. txn_dir = os.path.join(self.path, 'txn.active')
  138. if os.path.exists(txn_dir):
  139. shutil.rmtree(os.path.join(self.path, 'indexes'))
  140. shutil.copytree(os.path.join(txn_dir, 'indexes'),
  141. os.path.join(self.path, 'indexes'))
  142. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  143. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  144. # Remove partially removed transaction
  145. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  146. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  147. self.indexes = {}
  148. self.txn_active = False
  149. def get_index(self, ns):
  150. try:
  151. return self.indexes[ns]
  152. except KeyError:
  153. if ns == 'bands':
  154. filename = os.path.join(self.path, 'indexes', 'bands')
  155. cls = BandIndex
  156. else:
  157. filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
  158. cls = NSIndex
  159. if os.path.exists(filename):
  160. self.indexes[ns] = cls(filename)
  161. else:
  162. self.indexes[ns] = cls.create(filename)
  163. return self.indexes[ns]
  164. @deferrable
  165. def get(self, ns, id):
  166. try:
  167. band, offset = self.get_index(ns)[id]
  168. return self.io.read(band, offset, ns, id)
  169. except KeyError:
  170. raise self.DoesNotExist
  171. @deferrable
  172. def put(self, ns, id, data):
  173. if not self.txn_active:
  174. self.begin_txn()
  175. band, offset = self.io.write(ns, id, data)
  176. bands = self.get_index('bands')
  177. bands.setdefault(band, 0)
  178. bands[band] += 1
  179. self.get_index(ns)[id] = band, offset
  180. @deferrable
  181. def delete(self, ns, id):
  182. if not self.txn_active:
  183. self.begin_txn()
  184. try:
  185. band, offset = self.get_index(ns).pop(id)
  186. self.get_index('bands')[band] -= 1
  187. self.compact.add(band)
  188. except KeyError:
  189. raise self.DoesNotExist
  190. @deferrable
  191. def list(self, ns, marker=None, limit=1000000):
  192. return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
  193. def flush_rpc(self, *args):
  194. pass
  195. class BandIO(object):
  196. header_fmt = struct.Struct('<IBIB32s')
  197. assert header_fmt.size == 42
  198. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  199. self.path = path
  200. self.fds = LRUCache(capacity)
  201. self.band = nextband
  202. self.limit = limit
  203. self.bands_per_dir = bands_per_dir
  204. self.offset = 0
  205. def close(self):
  206. for band in self.fds.keys():
  207. self.fds.pop(band).close()
  208. def cleanup(self):
  209. """Delete band files left by aborted transactions
  210. """
  211. band = self.band
  212. while True:
  213. filename = self.band_filename(band)
  214. if not os.path.exists(filename):
  215. break
  216. os.unlink(filename)
  217. band += 1
  218. def band_filename(self, band):
  219. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  220. def get_fd(self, band, write=False):
  221. try:
  222. return self.fds[band]
  223. except KeyError:
  224. if write and band % 1000 == 0:
  225. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  226. if not os.path.exists(dirname):
  227. os.mkdir(dirname)
  228. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  229. self.fds[band] = fd
  230. return fd
  231. def delete_band(self, band, missing_ok=False):
  232. try:
  233. os.unlink(self.band_filename(band))
  234. except OSError, e:
  235. if not missing_ok or e.errno != errno.ENOENT:
  236. raise
  237. def read(self, band, offset, ns, id):
  238. fd = self.get_fd(band)
  239. fd.seek(offset)
  240. data = fd.read(self.header_fmt.size)
  241. size, magic, hash, ns_, id_ = self.header_fmt.unpack(data)
  242. if magic != 0 or ns != ns_ or id != id_:
  243. raise IntegrityError('Invalid band entry header')
  244. data = fd.read(size - self.header_fmt.size)
  245. if crc32(data) & 0xffffffff != hash:
  246. raise IntegrityError('Band checksum mismatch')
  247. return data
  248. def iter_objects(self, band, lookup):
  249. fd = self.get_fd(band)
  250. fd.seek(0)
  251. if fd.read(8) != 'DARCBAND':
  252. raise IntegrityError('Invalid band header')
  253. offset = 8
  254. data = fd.read(self.header_fmt.size)
  255. while data:
  256. size, magic, hash, ns, key = self.header_fmt.unpack(data)
  257. if magic != 0:
  258. raise IntegrityError('Unknown band entry header')
  259. offset += size
  260. if lookup(ns, key):
  261. data = fd.read(size - self.header_fmt.size)
  262. if crc32(data) & 0xffffffff != hash:
  263. raise IntegrityError('Band checksum mismatch')
  264. yield ns, key, data
  265. else:
  266. fd.seek(offset)
  267. data = fd.read(self.header_fmt.size)
  268. def write(self, ns, id, data):
  269. size = len(data) + self.header_fmt.size
  270. if self.offset and self.offset + size > self.limit:
  271. self.close_band()
  272. fd = self.get_fd(self.band, write=True)
  273. fd.seek(self.offset)
  274. if self.offset == 0:
  275. fd.write('DARCBAND')
  276. self.offset = 8
  277. offset = self.offset
  278. hash = crc32(data) & 0xffffffff
  279. fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
  280. fd.write(data)
  281. self.offset += size
  282. return self.band, offset
  283. def close_band(self):
  284. self.band += 1
  285. self.offset = 0
  286. class StoreTestCase(unittest.TestCase):
  287. def setUp(self):
  288. self.tmppath = tempfile.mkdtemp()
  289. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  290. def tearDown(self):
  291. shutil.rmtree(self.tmppath)
  292. def test1(self):
  293. self.assertEqual(self.store.tid, 0)
  294. for x in range(100):
  295. self.store.put(0, '%-32d' % x, 'SOMEDATA')
  296. key50 = '%-32d' % 50
  297. self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
  298. self.store.delete(0, key50)
  299. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
  300. self.store.commit()
  301. self.assertEqual(self.store.tid, 1)
  302. self.store.close()
  303. store2 = Store(os.path.join(self.tmppath, 'store'))
  304. self.assertEqual(store2.tid, 1)
  305. keys = list(store2.list(0))
  306. for x in range(50):
  307. key = '%-32d' % x
  308. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  309. self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
  310. assert key50 not in keys
  311. for x in range(51, 100):
  312. key = '%-32d' % x
  313. assert key in keys
  314. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  315. self.assertEqual(len(keys), 99)
  316. for x in range(50):
  317. key = '%-32d' % x
  318. store2.delete(0, key)
  319. self.assertEqual(len(list(store2.list(0))), 49)
  320. for x in range(51, 100):
  321. key = '%-32d' % x
  322. store2.delete(0, key)
  323. self.assertEqual(len(list(store2.list(0))), 0)
  324. def suite():
  325. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  326. if __name__ == '__main__':
  327. unittest.main()