store.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. from ConfigParser import RawConfigParser
  2. import errno
  3. import fcntl
  4. import os
  5. import shutil
  6. import struct
  7. import tempfile
  8. import unittest
  9. from zlib import crc32
  10. from .hashindex import NSIndex, BandIndex
  11. from .helpers import IntegrityError, read_set, write_set
  12. from .lrucache import LRUCache
  13. class Store(object):
  14. """Filesystem based transactional key value store
  15. On disk layout:
  16. dir/README
  17. dir/config
  18. dir/bands/<X / BANDS_PER_DIR>/<X>
  19. dir/indexes/<NS>
  20. """
  21. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  22. DEFAULT_BANDS_PER_DIR = 10000
  23. class DoesNotExist(KeyError):
  24. """Requested key does not exist"""
  25. def __init__(self, path, create=False):
  26. self.txn_active = False
  27. if create:
  28. self.create(path)
  29. self.open(path)
  30. def create(self, path):
  31. """Create a new empty store at `path`
  32. """
  33. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  34. raise Exception('Path "%s" already exists' % path)
  35. if not os.path.exists(path):
  36. os.mkdir(path)
  37. with open(os.path.join(path, 'README'), 'wb') as fd:
  38. fd.write('This is a DARC store')
  39. os.mkdir(os.path.join(path, 'bands'))
  40. os.mkdir(os.path.join(path, 'indexes'))
  41. config = RawConfigParser()
  42. config.add_section('store')
  43. config.set('store', 'version', '1')
  44. config.set('store', 'id', os.urandom(32).encode('hex'))
  45. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  46. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  47. config.add_section('state')
  48. config.set('state', 'next_band', '0')
  49. config.set('state', 'tid', '0')
  50. with open(os.path.join(path, 'config'), 'w') as fd:
  51. config.write(fd)
  52. def open(self, path):
  53. self.path = path
  54. if not os.path.isdir(path):
  55. raise Exception('%s Does not look like a darc store' % path)
  56. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  57. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  58. self.rollback()
  59. self.config = RawConfigParser()
  60. self.config.read(os.path.join(path, 'config'))
  61. if self.config.getint('store', 'version') != 1:
  62. raise Exception('%s Does not look like a darc store')
  63. self.id = self.config.get('store', 'id').decode('hex')
  64. self.tid = self.config.getint('state', 'tid')
  65. next_band = self.config.getint('state', 'next_band')
  66. max_band_size = self.config.getint('store', 'max_band_size')
  67. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  68. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  69. self.io.cleanup()
  70. def delete_bands(self):
  71. delete_path = os.path.join(self.path, 'indexes', 'delete')
  72. if os.path.exists(delete_path):
  73. bands = self.get_index('bands')
  74. for band in read_set(delete_path):
  75. assert bands.pop(band, 0) == 0
  76. self.io.delete_band(band, missing_ok=True)
  77. os.unlink(delete_path)
  78. def begin_txn(self):
  79. txn_dir = os.path.join(self.path, 'txn.tmp')
  80. # Initialize transaction snapshot
  81. os.mkdir(txn_dir)
  82. shutil.copytree(os.path.join(self.path, 'indexes'),
  83. os.path.join(txn_dir, 'indexes'))
  84. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  85. os.rename(os.path.join(self.path, 'txn.tmp'),
  86. os.path.join(self.path, 'txn.active'))
  87. self.compact = set()
  88. self.txn_active = True
  89. def close(self):
  90. self.rollback()
  91. self.lock_fd.close()
  92. def commit(self):
  93. """Commit transaction, `tid` will be increased by 1
  94. """
  95. self.compact_bands()
  96. self.io.close()
  97. self.tid += 1
  98. self.config.set('state', 'tid', self.tid)
  99. self.config.set('state', 'next_band', self.io.band + 1)
  100. with open(os.path.join(self.path, 'config'), 'w') as fd:
  101. self.config.write(fd)
  102. for i in self.indexes.values():
  103. i.flush()
  104. # If we crash before this line, the transaction will be
  105. # rolled back by open()
  106. os.rename(os.path.join(self.path, 'txn.active'),
  107. os.path.join(self.path, 'txn.commit'))
  108. self.rollback()
  109. def compact_bands(self):
  110. """Compact sparse bands by copying data into new bands
  111. """
  112. if not self.compact:
  113. return
  114. self.io.close_band()
  115. def lookup(ns, key):
  116. return key in self.get_index(ns)
  117. bands = self.get_index('bands')
  118. for band in self.compact:
  119. if bands[band] > 0:
  120. for ns, key, data in self.io.iter_objects(band, lookup):
  121. new_band, offset = self.io.write(ns, key, data)
  122. self.indexes[ns][key] = new_band, offset
  123. bands[band] -= 1
  124. bands.setdefault(new_band, 0)
  125. bands[new_band] += 1
  126. write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
  127. def rollback(self):
  128. """
  129. """
  130. # Commit any half committed transaction
  131. if os.path.exists(os.path.join(self.path, 'txn.commit')):
  132. self.delete_bands()
  133. os.rename(os.path.join(self.path, 'txn.commit'),
  134. os.path.join(self.path, 'txn.tmp'))
  135. # Roll back active transaction
  136. txn_dir = os.path.join(self.path, 'txn.active')
  137. if os.path.exists(txn_dir):
  138. shutil.rmtree(os.path.join(self.path, 'indexes'))
  139. shutil.copytree(os.path.join(txn_dir, 'indexes'),
  140. os.path.join(self.path, 'indexes'))
  141. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  142. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  143. # Remove partially removed transaction
  144. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  145. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  146. self.indexes = {}
  147. self.txn_active = False
  148. def get_index(self, ns):
  149. try:
  150. return self.indexes[ns]
  151. except KeyError:
  152. if ns == 'bands':
  153. filename = os.path.join(self.path, 'indexes', 'bands')
  154. cls = BandIndex
  155. else:
  156. filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
  157. cls = NSIndex
  158. if os.path.exists(filename):
  159. self.indexes[ns] = cls(filename)
  160. else:
  161. self.indexes[ns] = cls.create(filename)
  162. return self.indexes[ns]
  163. def get(self, ns, id):
  164. try:
  165. band, offset = self.get_index(ns)[id]
  166. return self.io.read(band, offset)
  167. except KeyError:
  168. raise self.DoesNotExist
  169. def put(self, ns, id, data):
  170. if not self.txn_active:
  171. self.begin_txn()
  172. band, offset = self.io.write(ns, id, data)
  173. bands = self.get_index('bands')
  174. bands.setdefault(band, 0)
  175. bands[band] += 1
  176. self.get_index(ns)[id] = band, offset
  177. def delete(self, ns, id):
  178. if not self.txn_active:
  179. self.begin_txn()
  180. try:
  181. band, offset = self.get_index(ns).pop(id)
  182. self.get_index('bands')[band] -= 1
  183. self.compact.add(band)
  184. except KeyError:
  185. raise self.DoesNotExist
  186. def list(self, ns, marker=None, limit=1000000):
  187. return [key for (key, value) in
  188. self.get_index(ns).iteritems(marker=marker, limit=limit)]
  189. class BandIO(object):
  190. header_fmt = struct.Struct('<iBiB32s')
  191. assert header_fmt.size == 42
  192. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  193. self.path = path
  194. self.fds = LRUCache(capacity)
  195. self.band = nextband
  196. self.limit = limit
  197. self.bands_per_dir = bands_per_dir
  198. self.offset = 0
  199. def close(self):
  200. for band in self.fds.keys():
  201. self.fds.pop(band).close()
  202. def cleanup(self):
  203. """Delete band files left by aborted transactions
  204. """
  205. band = self.band
  206. while True:
  207. filename = self.band_filename(band)
  208. if not os.path.exists(filename):
  209. break
  210. os.unlink(filename)
  211. band += 1
  212. def band_filename(self, band):
  213. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  214. def get_fd(self, band, write=False):
  215. try:
  216. return self.fds[band]
  217. except KeyError:
  218. if write and band % 1000 == 0:
  219. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  220. if not os.path.exists(dirname):
  221. os.mkdir(dirname)
  222. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  223. self.fds[band] = fd
  224. return fd
  225. def delete_band(self, band, missing_ok=False):
  226. try:
  227. os.unlink(self.band_filename(band))
  228. except OSError, e:
  229. if not missing_ok or e.errno != errno.ENOENT:
  230. raise
  231. def read(self, band, offset):
  232. fd = self.get_fd(band)
  233. fd.seek(offset)
  234. data = fd.read(self.header_fmt.size)
  235. size, magic, hash, ns, id = self.header_fmt.unpack(data)
  236. assert magic == 0
  237. data = fd.read(size - self.header_fmt.size)
  238. if crc32(data) != hash:
  239. raise IntegrityError('Band checksum mismatch')
  240. return data
  241. def iter_objects(self, band, lookup):
  242. fd = self.get_fd(band)
  243. fd.seek(0)
  244. assert fd.read(8) == 'DARCBAND'
  245. offset = 8
  246. data = fd.read(self.header_fmt.size)
  247. while data:
  248. size, magic, hash, ns, key = self.header_fmt.unpack(data)
  249. assert magic == 0
  250. offset += size
  251. if lookup(ns, key):
  252. data = fd.read(size - self.header_fmt.size)
  253. if crc32(data) != hash:
  254. raise IntegrityError('Band checksum mismatch')
  255. yield ns, key, data
  256. else:
  257. fd.seek(offset)
  258. data = fd.read(self.header_fmt.size)
  259. def write(self, ns, id, data):
  260. size = len(data) + self.header_fmt.size
  261. if self.offset and self.offset + size > self.limit:
  262. self.close_band()
  263. fd = self.get_fd(self.band, write=True)
  264. fd.seek(self.offset)
  265. if self.offset == 0:
  266. fd.write('DARCBAND')
  267. self.offset = 8
  268. offset = self.offset
  269. hash = crc32(data)
  270. fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
  271. fd.write(data)
  272. self.offset += size
  273. return self.band, offset
  274. def close_band(self):
  275. self.band += 1
  276. self.offset = 0
  277. class StoreTestCase(unittest.TestCase):
  278. def setUp(self):
  279. self.tmppath = tempfile.mkdtemp()
  280. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  281. def tearDown(self):
  282. shutil.rmtree(self.tmppath)
  283. def test1(self):
  284. self.assertEqual(self.store.tid, 0)
  285. for x in range(100):
  286. self.store.put(0, '%-32d' % x, 'SOMEDATA')
  287. key50 = '%-32d' % 50
  288. self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
  289. self.store.delete(0, key50)
  290. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
  291. self.store.commit()
  292. self.assertEqual(self.store.tid, 1)
  293. self.store.close()
  294. store2 = Store(os.path.join(self.tmppath, 'store'))
  295. self.assertEqual(store2.tid, 1)
  296. keys = store2.list(0)
  297. for x in range(50):
  298. key = '%-32d' % x
  299. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  300. self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
  301. assert key50 not in keys
  302. for x in range(51, 100):
  303. key = '%-32d' % x
  304. assert key in keys
  305. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  306. self.assertEqual(len(keys), 99)
  307. for x in range(50):
  308. key = '%-32d' % x
  309. store2.delete(0, key)
  310. self.assertEqual(len(store2.list(0)), 49)
  311. for x in range(51, 100):
  312. key = '%-32d' % x
  313. store2.delete(0, key)
  314. self.assertEqual(len(store2.list(0)), 0)
  315. def suite():
  316. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  317. if __name__ == '__main__':
  318. unittest.main()