store.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. from __future__ import with_statement
  2. from ConfigParser import RawConfigParser
  3. import errno
  4. import fcntl
  5. import os
  6. import shutil
  7. import struct
  8. import tempfile
  9. import unittest
  10. from zlib import crc32
  11. from .hashindex import NSIndex, BandIndex
  12. from .helpers import IntegrityError, read_set, write_set
  13. from .lrucache import LRUCache
  14. class Store(object):
  15. """Filesystem based transactional key value store
  16. On disk layout:
  17. dir/README
  18. dir/config
  19. dir/bands/<X / BANDS_PER_DIR>/<X>
  20. dir/indexes/<NS>
  21. """
  22. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  23. DEFAULT_BANDS_PER_DIR = 10000
  24. class DoesNotExist(KeyError):
  25. """Requested key does not exist"""
  26. def __init__(self, path, create=False):
  27. self.txn_active = False
  28. if not os.path.exists(path) and create:
  29. self.create(path)
  30. self.open(path)
  31. def create(self, path):
  32. """Create a new empty store at `path`
  33. """
  34. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  35. raise Exception('Path "%s" already exists' % path)
  36. if not os.path.exists(path):
  37. os.mkdir(path)
  38. with open(os.path.join(path, 'README'), 'wb') as fd:
  39. fd.write('This is a DARC store')
  40. os.mkdir(os.path.join(path, 'bands'))
  41. os.mkdir(os.path.join(path, 'indexes'))
  42. config = RawConfigParser()
  43. config.add_section('store')
  44. config.set('store', 'version', '1')
  45. config.set('store', 'id', os.urandom(32).encode('hex'))
  46. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  47. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  48. config.add_section('state')
  49. config.set('state', 'next_band', '0')
  50. config.set('state', 'tid', '0')
  51. with open(os.path.join(path, 'config'), 'w') as fd:
  52. config.write(fd)
  53. def open(self, path):
  54. self.path = path
  55. if not os.path.isdir(path):
  56. raise Exception('%s Does not look like a darc store' % path)
  57. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  58. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  59. self.rollback()
  60. self.config = RawConfigParser()
  61. self.config.read(os.path.join(path, 'config'))
  62. if self.config.getint('store', 'version') != 1:
  63. raise Exception('%s Does not look like a darc store')
  64. self.id = self.config.get('store', 'id').decode('hex')
  65. self.tid = self.config.getint('state', 'tid')
  66. next_band = self.config.getint('state', 'next_band')
  67. max_band_size = self.config.getint('store', 'max_band_size')
  68. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  69. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  70. self.io.cleanup()
  71. def delete_bands(self):
  72. delete_path = os.path.join(self.path, 'indexes', 'delete')
  73. if os.path.exists(delete_path):
  74. bands = self.get_index('bands')
  75. for band in read_set(delete_path):
  76. assert bands.pop(band, 0) == 0
  77. self.io.delete_band(band, missing_ok=True)
  78. os.unlink(delete_path)
  79. def begin_txn(self):
  80. txn_dir = os.path.join(self.path, 'txn.tmp')
  81. # Initialize transaction snapshot
  82. os.mkdir(txn_dir)
  83. shutil.copytree(os.path.join(self.path, 'indexes'),
  84. os.path.join(txn_dir, 'indexes'))
  85. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  86. os.rename(os.path.join(self.path, 'txn.tmp'),
  87. os.path.join(self.path, 'txn.active'))
  88. self.compact = set()
  89. self.txn_active = True
  90. def close(self):
  91. self.rollback()
  92. self.lock_fd.close()
  93. def commit(self):
  94. """Commit transaction, `tid` will be increased by 1
  95. """
  96. self.compact_bands()
  97. self.io.close()
  98. self.tid += 1
  99. self.config.set('state', 'tid', self.tid)
  100. self.config.set('state', 'next_band', self.io.band + 1)
  101. with open(os.path.join(self.path, 'config'), 'w') as fd:
  102. self.config.write(fd)
  103. for i in self.indexes.values():
  104. i.flush()
  105. # If we crash before this line, the transaction will be
  106. # rolled back by open()
  107. os.rename(os.path.join(self.path, 'txn.active'),
  108. os.path.join(self.path, 'txn.commit'))
  109. self.rollback()
  110. def compact_bands(self):
  111. """Compact sparse bands by copying data into new bands
  112. """
  113. if not self.compact:
  114. return
  115. self.io.close_band()
  116. def lookup(ns, key):
  117. return key in self.get_index(ns)
  118. bands = self.get_index('bands')
  119. for band in self.compact:
  120. if bands[band] > 0:
  121. for ns, key, data in self.io.iter_objects(band, lookup):
  122. new_band, offset = self.io.write(ns, key, data)
  123. self.indexes[ns][key] = new_band, offset
  124. bands[band] -= 1
  125. bands.setdefault(new_band, 0)
  126. bands[new_band] += 1
  127. write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
  128. def rollback(self):
  129. """
  130. """
  131. # Commit any half committed transaction
  132. if os.path.exists(os.path.join(self.path, 'txn.commit')):
  133. self.delete_bands()
  134. os.rename(os.path.join(self.path, 'txn.commit'),
  135. os.path.join(self.path, 'txn.tmp'))
  136. # Roll back active transaction
  137. txn_dir = os.path.join(self.path, 'txn.active')
  138. if os.path.exists(txn_dir):
  139. shutil.rmtree(os.path.join(self.path, 'indexes'))
  140. shutil.copytree(os.path.join(txn_dir, 'indexes'),
  141. os.path.join(self.path, 'indexes'))
  142. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  143. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  144. # Remove partially removed transaction
  145. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  146. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  147. self.indexes = {}
  148. self.txn_active = False
  149. def get_index(self, ns):
  150. try:
  151. return self.indexes[ns]
  152. except KeyError:
  153. if ns == 'bands':
  154. filename = os.path.join(self.path, 'indexes', 'bands')
  155. cls = BandIndex
  156. else:
  157. filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
  158. cls = NSIndex
  159. if os.path.exists(filename):
  160. self.indexes[ns] = cls(filename)
  161. else:
  162. self.indexes[ns] = cls.create(filename)
  163. return self.indexes[ns]
  164. def get(self, ns, id):
  165. try:
  166. band, offset = self.get_index(ns)[id]
  167. return self.io.read(band, offset, ns, id)
  168. except KeyError:
  169. raise self.DoesNotExist
  170. def put(self, ns, id, data):
  171. if not self.txn_active:
  172. self.begin_txn()
  173. band, offset = self.io.write(ns, id, data)
  174. bands = self.get_index('bands')
  175. bands.setdefault(band, 0)
  176. bands[band] += 1
  177. self.get_index(ns)[id] = band, offset
  178. def delete(self, ns, id):
  179. if not self.txn_active:
  180. self.begin_txn()
  181. try:
  182. band, offset = self.get_index(ns).pop(id)
  183. self.get_index('bands')[band] -= 1
  184. self.compact.add(band)
  185. except KeyError:
  186. raise self.DoesNotExist
  187. def list(self, ns, marker=None, limit=1000000):
  188. return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
  189. class BandIO(object):
  190. header_fmt = struct.Struct('<IBIB32s')
  191. assert header_fmt.size == 42
  192. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  193. self.path = path
  194. self.fds = LRUCache(capacity)
  195. self.band = nextband
  196. self.limit = limit
  197. self.bands_per_dir = bands_per_dir
  198. self.offset = 0
  199. def close(self):
  200. for band in self.fds.keys():
  201. self.fds.pop(band).close()
  202. def cleanup(self):
  203. """Delete band files left by aborted transactions
  204. """
  205. band = self.band
  206. while True:
  207. filename = self.band_filename(band)
  208. if not os.path.exists(filename):
  209. break
  210. os.unlink(filename)
  211. band += 1
  212. def band_filename(self, band):
  213. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  214. def get_fd(self, band, write=False):
  215. try:
  216. return self.fds[band]
  217. except KeyError:
  218. if write and band % 1000 == 0:
  219. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  220. if not os.path.exists(dirname):
  221. os.mkdir(dirname)
  222. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  223. self.fds[band] = fd
  224. return fd
  225. def delete_band(self, band, missing_ok=False):
  226. try:
  227. os.unlink(self.band_filename(band))
  228. except OSError, e:
  229. if not missing_ok or e.errno != errno.ENOENT:
  230. raise
  231. def read(self, band, offset, ns, id):
  232. fd = self.get_fd(band)
  233. fd.seek(offset)
  234. data = fd.read(self.header_fmt.size)
  235. size, magic, hash, ns_, id_ = self.header_fmt.unpack(data)
  236. assert magic == 0
  237. assert ns == ns_
  238. assert id == id_
  239. data = fd.read(size - self.header_fmt.size)
  240. if crc32(data) & 0xffffffff != hash:
  241. raise IntegrityError('Band checksum mismatch')
  242. return data
  243. def iter_objects(self, band, lookup):
  244. fd = self.get_fd(band)
  245. fd.seek(0)
  246. assert fd.read(8) == 'DARCBAND'
  247. offset = 8
  248. data = fd.read(self.header_fmt.size)
  249. while data:
  250. size, magic, hash, ns, key = self.header_fmt.unpack(data)
  251. assert magic == 0
  252. offset += size
  253. if lookup(ns, key):
  254. data = fd.read(size - self.header_fmt.size)
  255. if crc32(data) & 0xffffffff != hash:
  256. raise IntegrityError('Band checksum mismatch')
  257. yield ns, key, data
  258. else:
  259. fd.seek(offset)
  260. data = fd.read(self.header_fmt.size)
  261. def write(self, ns, id, data):
  262. size = len(data) + self.header_fmt.size
  263. if self.offset and self.offset + size > self.limit:
  264. self.close_band()
  265. fd = self.get_fd(self.band, write=True)
  266. fd.seek(self.offset)
  267. if self.offset == 0:
  268. fd.write('DARCBAND')
  269. self.offset = 8
  270. offset = self.offset
  271. hash = crc32(data) & 0xffffffff
  272. fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
  273. fd.write(data)
  274. self.offset += size
  275. return self.band, offset
  276. def close_band(self):
  277. self.band += 1
  278. self.offset = 0
  279. class StoreTestCase(unittest.TestCase):
  280. def setUp(self):
  281. self.tmppath = tempfile.mkdtemp()
  282. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  283. def tearDown(self):
  284. shutil.rmtree(self.tmppath)
  285. def test1(self):
  286. self.assertEqual(self.store.tid, 0)
  287. for x in range(100):
  288. self.store.put(0, '%-32d' % x, 'SOMEDATA')
  289. key50 = '%-32d' % 50
  290. self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
  291. self.store.delete(0, key50)
  292. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
  293. self.store.commit()
  294. self.assertEqual(self.store.tid, 1)
  295. self.store.close()
  296. store2 = Store(os.path.join(self.tmppath, 'store'))
  297. self.assertEqual(store2.tid, 1)
  298. keys = list(store2.list(0))
  299. for x in range(50):
  300. key = '%-32d' % x
  301. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  302. self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
  303. assert key50 not in keys
  304. for x in range(51, 100):
  305. key = '%-32d' % x
  306. assert key in keys
  307. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  308. self.assertEqual(len(keys), 99)
  309. for x in range(50):
  310. key = '%-32d' % x
  311. store2.delete(0, key)
  312. self.assertEqual(len(list(store2.list(0))), 49)
  313. for x in range(51, 100):
  314. key = '%-32d' % x
  315. store2.delete(0, key)
  316. self.assertEqual(len(list(store2.list(0))), 0)
  317. def suite():
  318. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  319. if __name__ == '__main__':
  320. unittest.main()