store.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. from ConfigParser import RawConfigParser
  2. import fcntl
  3. import os
  4. import shutil
  5. import struct
  6. import tempfile
  7. import unittest
  8. from zlib import crc32
  9. from .hashindex import NSIndex, BandIndex
  10. from .helpers import IntegrityError, read_set, write_set
  11. from .lrucache import LRUCache
  12. class Store(object):
  13. """Filesystem based transactional key value store
  14. On disk layout:
  15. dir/README
  16. dir/config
  17. dir/bands/<X / BANDS_PER_DIR>/<X>
  18. dir/indexes/<NS>
  19. """
  20. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  21. DEFAULT_BANDS_PER_DIR = 10000
  22. class DoesNotExist(KeyError):
  23. """Requested key does not exist"""
  24. def __init__(self, path, create=False):
  25. self.txn_active = False
  26. if create:
  27. self.create(path)
  28. self.open(path)
  29. def create(self, path):
  30. """Create a new empty store at `path`
  31. """
  32. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  33. raise Exception('Path "%s" already exists' % path)
  34. if not os.path.exists(path):
  35. os.mkdir(path)
  36. with open(os.path.join(path, 'README'), 'wb') as fd:
  37. fd.write('This is a DARC store')
  38. os.mkdir(os.path.join(path, 'bands'))
  39. os.mkdir(os.path.join(path, 'indexes'))
  40. BandIndex.create(os.path.join(path, 'indexes', 'bands'))
  41. config = RawConfigParser()
  42. config.add_section('store')
  43. config.set('store', 'version', '1')
  44. config.set('store', 'id', os.urandom(32).encode('hex'))
  45. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  46. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  47. config.add_section('state')
  48. config.set('state', 'next_band', '0')
  49. config.set('state', 'tid', '0')
  50. with open(os.path.join(path, 'config'), 'w') as fd:
  51. config.write(fd)
  52. def open(self, path):
  53. self.path = path
  54. if not os.path.isdir(path):
  55. raise Exception('%s Does not look like a darc store' % path)
  56. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  57. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  58. self.rollback()
  59. self.config = RawConfigParser()
  60. self.config.read(os.path.join(path, 'config'))
  61. if self.config.getint('store', 'version') != 1:
  62. raise Exception('%s Does not look like a darc store')
  63. self.id = self.config.get('store', 'id').decode('hex')
  64. self.tid = self.config.getint('state', 'tid')
  65. next_band = self.config.getint('state', 'next_band')
  66. max_band_size = self.config.getint('store', 'max_band_size')
  67. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  68. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  69. def delete_bands(self):
  70. delete_path = os.path.join(self.path, 'indexes', 'delete')
  71. if os.path.exists(delete_path):
  72. for band in read_set(delete_path):
  73. assert self.bands.pop(band) == 0
  74. self.io.delete_band(band)
  75. os.unlink(delete_path)
  76. def begin_txn(self):
  77. self.io.cleanup()
  78. self.delete_bands()
  79. txn_dir = os.path.join(self.path, 'txn.tmp')
  80. # Initialize transaction snapshot
  81. os.mkdir(txn_dir)
  82. shutil.copytree(os.path.join(self.path, 'indexes'),
  83. os.path.join(txn_dir, 'indexes'))
  84. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  85. os.rename(os.path.join(self.path, 'txn.tmp'),
  86. os.path.join(self.path, 'txn.active'))
  87. self.compact = set()
  88. self.txn_active = True
  89. self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))
  90. def close(self):
  91. self.rollback()
  92. self.lock_fd.close()
  93. def commit(self):
  94. """Commit transaction, `tid` will be increased by 1
  95. """
  96. self.compact_bands()
  97. self.io.close()
  98. self.tid += 1
  99. self.config.set('state', 'tid', self.tid)
  100. self.config.set('state', 'next_band', self.io.band + 1)
  101. with open(os.path.join(self.path, 'config'), 'w') as fd:
  102. self.config.write(fd)
  103. for i in self.indexes.values():
  104. i.flush()
  105. self.bands.flush()
  106. os.rename(os.path.join(self.path, 'txn.active'),
  107. os.path.join(self.path, 'txn.tmp'))
  108. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  109. self.indexes = {}
  110. self.txn_active = False
  111. self.delete_bands()
  112. def compact_bands(self):
  113. """Compact sparse bands by copying data into new bands
  114. """
  115. if not self.compact:
  116. return
  117. self.io.close_band()
  118. def lookup(ns, key):
  119. return key in self.get_index(ns)
  120. for band in self.compact:
  121. if self.bands[band] > 0:
  122. for ns, key, data in self.io.iter_objects(band, lookup):
  123. new_band, offset = self.io.write(ns, key, data)
  124. self.indexes[ns][key] = new_band, offset
  125. self.bands[band] -= 1
  126. self.bands.setdefault(new_band, 0)
  127. self.bands[new_band] += 1
  128. write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
  129. def rollback(self):
  130. """
  131. """
  132. # Remove partial transaction
  133. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  134. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  135. # Roll back active transaction
  136. txn_dir = os.path.join(self.path, 'txn.active')
  137. if os.path.exists(txn_dir):
  138. shutil.rmtree(os.path.join(self.path, 'indexes'))
  139. shutil.copytree(os.path.join(txn_dir, 'indexes'),
  140. os.path.join(self.path, 'indexes'))
  141. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  142. shutil.rmtree(txn_dir)
  143. self.indexes = {}
  144. self.txn_active = False
  145. def get_index(self, ns):
  146. try:
  147. return self.indexes[ns]
  148. except KeyError:
  149. filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
  150. if os.path.exists(filename):
  151. self.indexes[ns] = NSIndex(filename)
  152. else:
  153. self.indexes[ns] = NSIndex.create(filename)
  154. return self.indexes[ns]
  155. def get(self, ns, id):
  156. try:
  157. band, offset = self.get_index(ns)[id]
  158. return self.io.read(band, offset)
  159. except KeyError:
  160. raise self.DoesNotExist
  161. def put(self, ns, id, data):
  162. if not self.txn_active:
  163. self.begin_txn()
  164. band, offset = self.io.write(ns, id, data)
  165. self.bands.setdefault(band, 0)
  166. self.bands[band] += 1
  167. self.get_index(ns)[id] = band, offset
  168. def delete(self, ns, id):
  169. if not self.txn_active:
  170. self.begin_txn()
  171. try:
  172. band, offset = self.get_index(ns).pop(id)
  173. self.bands[band] -= 1
  174. self.compact.add(band)
  175. except KeyError:
  176. raise self.DoesNotExist
  177. def list(self, ns, marker=None, limit=1000000):
  178. return [key for (key, value) in
  179. self.get_index(ns).iteritems(marker=marker, limit=limit)]
  180. class BandIO(object):
  181. header_fmt = struct.Struct('<iBiB32s')
  182. assert header_fmt.size == 42
  183. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  184. self.path = path
  185. self.fds = LRUCache(capacity)
  186. self.band = nextband
  187. self.limit = limit
  188. self.bands_per_dir = bands_per_dir
  189. self.offset = 0
  190. def close(self):
  191. for band in self.fds.keys():
  192. self.fds.pop(band).close()
  193. def cleanup(self):
  194. """Delete band files left by aborted transactions
  195. """
  196. band = self.band
  197. while True:
  198. filename = self.band_filename(band)
  199. if not os.path.exists(filename):
  200. break
  201. os.unlink(filename)
  202. band += 1
  203. def band_filename(self, band):
  204. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  205. def get_fd(self, band, write=False):
  206. try:
  207. return self.fds[band]
  208. except KeyError:
  209. if write and band % 1000 == 0:
  210. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  211. if not os.path.exists(dirname):
  212. os.mkdir(dirname)
  213. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  214. self.fds[band] = fd
  215. return fd
  216. def delete_band(self, band):
  217. os.unlink(self.band_filename(band))
  218. def read(self, band, offset):
  219. fd = self.get_fd(band)
  220. fd.seek(offset)
  221. data = fd.read(self.header_fmt.size)
  222. size, magic, hash, ns, id = self.header_fmt.unpack(data)
  223. assert magic == 0
  224. data = fd.read(size - self.header_fmt.size)
  225. if crc32(data) != hash:
  226. raise IntegrityError('Band checksum mismatch')
  227. return data
  228. def iter_objects(self, band, lookup):
  229. fd = self.get_fd(band)
  230. fd.seek(0)
  231. assert fd.read(8) == 'DARCBAND'
  232. offset = 8
  233. data = fd.read(self.header_fmt.size)
  234. while data:
  235. size, magic, hash, ns, key = self.header_fmt.unpack(data)
  236. assert magic == 0
  237. offset += size
  238. if lookup(ns, key):
  239. data = fd.read(size - self.header_fmt.size)
  240. if crc32(data) != hash:
  241. raise IntegrityError('Band checksum mismatch')
  242. yield ns, key, data
  243. else:
  244. fd.seek(offset)
  245. data = fd.read(self.header_fmt.size)
  246. def write(self, ns, id, data):
  247. size = len(data) + self.header_fmt.size
  248. if self.offset and self.offset + size > self.limit:
  249. self.close_band()
  250. fd = self.get_fd(self.band, write=True)
  251. fd.seek(self.offset)
  252. if self.offset == 0:
  253. fd.write('DARCBAND')
  254. self.offset = 8
  255. offset = self.offset
  256. hash = crc32(data)
  257. fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
  258. fd.write(data)
  259. self.offset += size
  260. return self.band, offset
  261. def close_band(self):
  262. self.band += 1
  263. self.offset = 0
  264. class StoreTestCase(unittest.TestCase):
  265. def setUp(self):
  266. self.tmppath = tempfile.mkdtemp()
  267. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  268. def tearDown(self):
  269. shutil.rmtree(self.tmppath)
  270. def test1(self):
  271. self.assertEqual(self.store.tid, 0)
  272. for x in range(100):
  273. self.store.put(0, '%-32d' % x, 'SOMEDATA')
  274. key50 = '%-32d' % 50
  275. self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
  276. self.store.delete(0, key50)
  277. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
  278. self.store.commit()
  279. self.assertEqual(self.store.tid, 1)
  280. self.store.close()
  281. store2 = Store(os.path.join(self.tmppath, 'store'))
  282. self.assertEqual(store2.tid, 1)
  283. keys = store2.list(0)
  284. for x in range(50):
  285. key = '%-32d' % x
  286. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  287. self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
  288. assert key50 not in keys
  289. for x in range(51, 100):
  290. key = '%-32d' % x
  291. assert key in keys
  292. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  293. self.assertEqual(len(keys), 99)
  294. for x in range(50):
  295. key = '%-32d' % x
  296. store2.delete(0, key)
  297. self.assertEqual(len(store2.list(0)), 49)
  298. for x in range(51, 100):
  299. key = '%-32d' % x
  300. store2.delete(0, key)
  301. self.assertEqual(len(store2.list(0)), 0)
  302. def suite():
  303. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  304. if __name__ == '__main__':
  305. unittest.main()