2
0

store.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. from __future__ import with_statement
  2. from ConfigParser import RawConfigParser
  3. import errno
  4. import fcntl
  5. import os
  6. import shutil
  7. import struct
  8. import tempfile
  9. import unittest
  10. from zlib import crc32
  11. from .hashindex import NSIndex, BandIndex
  12. from .helpers import IntegrityError, read_set, write_set, deferrable
  13. from .lrucache import LRUCache
  14. class Store(object):
  15. """Filesystem based transactional key value store
  16. On disk layout:
  17. dir/README
  18. dir/config
  19. dir/bands/<X / BANDS_PER_DIR>/<X>
  20. dir/indexes/<NS>
  21. """
  22. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  23. DEFAULT_BANDS_PER_DIR = 10000
  24. class DoesNotExist(KeyError):
  25. """Requested key does not exist"""
  26. def __init__(self, path, create=False):
  27. self.txn_active = False
  28. if create:
  29. self.create(path)
  30. self.open(path)
  31. def create(self, path):
  32. """Create a new empty store at `path`
  33. """
  34. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  35. raise Exception('Path "%s" already exists' % path)
  36. if not os.path.exists(path):
  37. os.mkdir(path)
  38. with open(os.path.join(path, 'README'), 'wb') as fd:
  39. fd.write('This is a DARC store')
  40. os.mkdir(os.path.join(path, 'bands'))
  41. os.mkdir(os.path.join(path, 'indexes'))
  42. config = RawConfigParser()
  43. config.add_section('store')
  44. config.set('store', 'version', '1')
  45. config.set('store', 'id', os.urandom(32).encode('hex'))
  46. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  47. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  48. config.add_section('state')
  49. config.set('state', 'next_band', '0')
  50. config.set('state', 'tid', '0')
  51. with open(os.path.join(path, 'config'), 'w') as fd:
  52. config.write(fd)
  53. def open(self, path):
  54. self.path = path
  55. if not os.path.isdir(path):
  56. raise Exception('%s Does not look like a darc store' % path)
  57. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  58. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  59. self.rollback()
  60. self.config = RawConfigParser()
  61. self.config.read(os.path.join(path, 'config'))
  62. if self.config.getint('store', 'version') != 1:
  63. raise Exception('%s Does not look like a darc store')
  64. self.id = self.config.get('store', 'id').decode('hex')
  65. self.tid = self.config.getint('state', 'tid')
  66. next_band = self.config.getint('state', 'next_band')
  67. max_band_size = self.config.getint('store', 'max_band_size')
  68. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  69. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  70. self.io.cleanup()
  71. def delete_bands(self):
  72. delete_path = os.path.join(self.path, 'indexes', 'delete')
  73. if os.path.exists(delete_path):
  74. bands = self.get_index('bands')
  75. for band in read_set(delete_path):
  76. assert bands.pop(band, 0) == 0
  77. self.io.delete_band(band, missing_ok=True)
  78. os.unlink(delete_path)
  79. def begin_txn(self):
  80. txn_dir = os.path.join(self.path, 'txn.tmp')
  81. # Initialize transaction snapshot
  82. os.mkdir(txn_dir)
  83. shutil.copytree(os.path.join(self.path, 'indexes'),
  84. os.path.join(txn_dir, 'indexes'))
  85. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  86. os.rename(os.path.join(self.path, 'txn.tmp'),
  87. os.path.join(self.path, 'txn.active'))
  88. self.compact = set()
  89. self.txn_active = True
  90. def close(self):
  91. self.rollback()
  92. self.lock_fd.close()
  93. def commit(self):
  94. """Commit transaction, `tid` will be increased by 1
  95. """
  96. self.compact_bands()
  97. self.io.close()
  98. self.tid += 1
  99. self.config.set('state', 'tid', self.tid)
  100. self.config.set('state', 'next_band', self.io.band + 1)
  101. with open(os.path.join(self.path, 'config'), 'w') as fd:
  102. self.config.write(fd)
  103. for i in self.indexes.values():
  104. i.flush()
  105. # If we crash before this line, the transaction will be
  106. # rolled back by open()
  107. os.rename(os.path.join(self.path, 'txn.active'),
  108. os.path.join(self.path, 'txn.commit'))
  109. self.rollback()
  110. def compact_bands(self):
  111. """Compact sparse bands by copying data into new bands
  112. """
  113. if not self.compact:
  114. return
  115. self.io.close_band()
  116. def lookup(ns, key):
  117. return key in self.get_index(ns)
  118. bands = self.get_index('bands')
  119. for band in self.compact:
  120. if bands[band] > 0:
  121. for ns, key, data in self.io.iter_objects(band, lookup):
  122. new_band, offset = self.io.write(ns, key, data)
  123. self.indexes[ns][key] = new_band, offset
  124. bands[band] -= 1
  125. bands.setdefault(new_band, 0)
  126. bands[new_band] += 1
  127. write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
  128. def rollback(self):
  129. """
  130. """
  131. # Commit any half committed transaction
  132. if os.path.exists(os.path.join(self.path, 'txn.commit')):
  133. self.delete_bands()
  134. os.rename(os.path.join(self.path, 'txn.commit'),
  135. os.path.join(self.path, 'txn.tmp'))
  136. # Roll back active transaction
  137. txn_dir = os.path.join(self.path, 'txn.active')
  138. if os.path.exists(txn_dir):
  139. shutil.rmtree(os.path.join(self.path, 'indexes'))
  140. shutil.copytree(os.path.join(txn_dir, 'indexes'),
  141. os.path.join(self.path, 'indexes'))
  142. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  143. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  144. # Remove partially removed transaction
  145. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  146. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  147. self.indexes = {}
  148. self.txn_active = False
  149. def get_index(self, ns):
  150. try:
  151. return self.indexes[ns]
  152. except KeyError:
  153. if ns == 'bands':
  154. filename = os.path.join(self.path, 'indexes', 'bands')
  155. cls = BandIndex
  156. else:
  157. filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
  158. cls = NSIndex
  159. if os.path.exists(filename):
  160. self.indexes[ns] = cls(filename)
  161. else:
  162. self.indexes[ns] = cls.create(filename)
  163. return self.indexes[ns]
  164. @deferrable
  165. def get(self, ns, id):
  166. try:
  167. band, offset = self.get_index(ns)[id]
  168. return self.io.read(band, offset, ns, id)
  169. except KeyError:
  170. raise self.DoesNotExist
  171. @deferrable
  172. def put(self, ns, id, data):
  173. if not self.txn_active:
  174. self.begin_txn()
  175. band, offset = self.io.write(ns, id, data)
  176. bands = self.get_index('bands')
  177. bands.setdefault(band, 0)
  178. bands[band] += 1
  179. self.get_index(ns)[id] = band, offset
  180. @deferrable
  181. def delete(self, ns, id):
  182. if not self.txn_active:
  183. self.begin_txn()
  184. try:
  185. band, offset = self.get_index(ns).pop(id)
  186. self.get_index('bands')[band] -= 1
  187. self.compact.add(band)
  188. except KeyError:
  189. raise self.DoesNotExist
  190. @deferrable
  191. def list(self, ns, marker=None, limit=1000000):
  192. return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
  193. def flush_rpc(self, *args):
  194. pass
  195. class BandIO(object):
  196. header_fmt = struct.Struct('<IBIB32s')
  197. assert header_fmt.size == 42
  198. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  199. self.path = path
  200. self.fds = LRUCache(capacity)
  201. self.band = nextband
  202. self.limit = limit
  203. self.bands_per_dir = bands_per_dir
  204. self.offset = 0
  205. def close(self):
  206. for band in self.fds.keys():
  207. self.fds.pop(band).close()
  208. def cleanup(self):
  209. """Delete band files left by aborted transactions
  210. """
  211. band = self.band
  212. while True:
  213. filename = self.band_filename(band)
  214. if not os.path.exists(filename):
  215. break
  216. os.unlink(filename)
  217. band += 1
  218. def band_filename(self, band):
  219. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  220. def get_fd(self, band, write=False):
  221. try:
  222. return self.fds[band]
  223. except KeyError:
  224. if write and band % 1000 == 0:
  225. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  226. if not os.path.exists(dirname):
  227. os.mkdir(dirname)
  228. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  229. self.fds[band] = fd
  230. return fd
  231. def delete_band(self, band, missing_ok=False):
  232. try:
  233. os.unlink(self.band_filename(band))
  234. except OSError, e:
  235. if not missing_ok or e.errno != errno.ENOENT:
  236. raise
  237. def read(self, band, offset, ns, id):
  238. fd = self.get_fd(band)
  239. fd.seek(offset)
  240. data = fd.read(self.header_fmt.size)
  241. size, magic, hash, ns_, id_ = self.header_fmt.unpack(data)
  242. assert magic == 0
  243. assert ns == ns_
  244. assert id == id_
  245. data = fd.read(size - self.header_fmt.size)
  246. if crc32(data) & 0xffffffff != hash:
  247. raise IntegrityError('Band checksum mismatch')
  248. return data
  249. def iter_objects(self, band, lookup):
  250. fd = self.get_fd(band)
  251. fd.seek(0)
  252. assert fd.read(8) == 'DARCBAND'
  253. offset = 8
  254. data = fd.read(self.header_fmt.size)
  255. while data:
  256. size, magic, hash, ns, key = self.header_fmt.unpack(data)
  257. assert magic == 0
  258. offset += size
  259. if lookup(ns, key):
  260. data = fd.read(size - self.header_fmt.size)
  261. if crc32(data) & 0xffffffff != hash:
  262. raise IntegrityError('Band checksum mismatch')
  263. yield ns, key, data
  264. else:
  265. fd.seek(offset)
  266. data = fd.read(self.header_fmt.size)
  267. def write(self, ns, id, data):
  268. size = len(data) + self.header_fmt.size
  269. if self.offset and self.offset + size > self.limit:
  270. self.close_band()
  271. fd = self.get_fd(self.band, write=True)
  272. fd.seek(self.offset)
  273. if self.offset == 0:
  274. fd.write('DARCBAND')
  275. self.offset = 8
  276. offset = self.offset
  277. hash = crc32(data) & 0xffffffff
  278. fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
  279. fd.write(data)
  280. self.offset += size
  281. return self.band, offset
  282. def close_band(self):
  283. self.band += 1
  284. self.offset = 0
  285. class StoreTestCase(unittest.TestCase):
  286. def setUp(self):
  287. self.tmppath = tempfile.mkdtemp()
  288. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  289. def tearDown(self):
  290. shutil.rmtree(self.tmppath)
  291. def test1(self):
  292. self.assertEqual(self.store.tid, 0)
  293. for x in range(100):
  294. self.store.put(0, '%-32d' % x, 'SOMEDATA')
  295. key50 = '%-32d' % 50
  296. self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
  297. self.store.delete(0, key50)
  298. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
  299. self.store.commit()
  300. self.assertEqual(self.store.tid, 1)
  301. self.store.close()
  302. store2 = Store(os.path.join(self.tmppath, 'store'))
  303. self.assertEqual(store2.tid, 1)
  304. keys = list(store2.list(0))
  305. for x in range(50):
  306. key = '%-32d' % x
  307. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  308. self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
  309. assert key50 not in keys
  310. for x in range(51, 100):
  311. key = '%-32d' % x
  312. assert key in keys
  313. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  314. self.assertEqual(len(keys), 99)
  315. for x in range(50):
  316. key = '%-32d' % x
  317. store2.delete(0, key)
  318. self.assertEqual(len(list(store2.list(0))), 49)
  319. for x in range(51, 100):
  320. key = '%-32d' % x
  321. store2.delete(0, key)
  322. self.assertEqual(len(list(store2.list(0))), 0)
  323. def suite():
  324. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
# Run the tests of this module when it is executed as a script
if __name__ == '__main__':
    unittest.main()