store.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. from ConfigParser import RawConfigParser
  2. import fcntl
  3. import os
  4. import shutil
  5. import struct
  6. import tempfile
  7. import unittest
  8. from .lrucache import LRUCache
  9. from .hashindex import NSIndex, BandIndex
  class Store(object):
      """Filesystem based transactional key value store

      On disk layout:
      dir/README
      dir/config
      dir/bands/<X / BANDS_PER_DIR>/<X>
      dir/indexes/<NS>

      Objects are addressed by a namespace number and an id.  Object data is
      appended to "band" files through a BandIO instance; per-namespace
      indexes map an id to the (band, offset) of its data, and the band index
      counts how many objects each band holds.

      Modifications (`put` / `delete`) implicitly start a transaction which is
      made permanent by `commit` or discarded by `rollback` / `close`.
      """
      DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
      DEFAULT_BANDS_PER_DIR = 10000

      class DoesNotExist(KeyError):
          """Requested key does not exist"""

      def __init__(self, path, create=False):
          """Open the store at `path`, creating it first if `create` is true
          """
          self.txn_active = False
          if create:
              self.create(path)
          self.open(path)

      def create(self, path):
          """Create a new empty store at `path`

          `path` must either not exist or be an empty directory, otherwise an
          Exception is raised.
          """
          if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
              raise Exception('Path "%s" already exists' % path)
          if not os.path.exists(path):
              os.mkdir(path)
          with open(os.path.join(path, 'README'), 'wb') as fd:
              fd.write('This is a DARC store')
          os.mkdir(os.path.join(path, 'bands'))
          os.mkdir(os.path.join(path, 'indexes'))
          BandIndex.create(os.path.join(path, 'indexes', 'bands'))
          config = RawConfigParser()
          config.add_section('store')
          config.set('store', 'version', '1')
          config.set('store', 'id', os.urandom(32).encode('hex'))
          config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
          config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
          config.add_section('state')
          config.set('state', 'next_band', '0')
          config.set('state', 'tid', '0')
          with open(os.path.join(path, 'config'), 'w') as fd:
              config.write(fd)

      def open(self, path):
          """Open the existing store at `path`

          Takes an exclusive flock on the README file, loads the config, rolls
          back any transaction left behind on disk and prepares band I/O
          starting at the configured `next_band`.

          Raises Exception if `path` is not a directory or the config does not
          declare store version 1.  The lock file is closed again when opening
          fails after the lock was taken.
          """
          self.path = path
          if not os.path.isdir(path):
              raise Exception('%s Does not look like a darc store' % path)
          self.lock_fd = open(os.path.join(path, 'README'), 'r+')
          try:
              fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
              self.config = RawConfigParser()
              self.config.read(os.path.join(path, 'config'))
              if self.config.getint('store', 'version') != 1:
                  raise Exception('%s Does not look like a darc store' % path)
              self.id = self.config.get('store', 'id').decode('hex')
              self.tid = self.config.getint('state', 'tid')
              next_band = self.config.getint('state', 'next_band')
              max_band_size = self.config.getint('store', 'max_band_size')
              bands_per_dir = self.config.getint('store', 'bands_per_dir')
              self.rollback()
              self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
          except Exception:
              # Do not keep the store locked by a half-opened instance
              self.lock_fd.close()
              raise

      def begin_txn(self):
          """Start a transaction by snapshotting the indexes and the config

          The snapshot is assembled in `txn.tmp` and only then renamed to
          `txn.active`; `rollback` restores from `txn.active` and simply
          discards a leftover `txn.tmp`.
          """
          txn_dir = os.path.join(self.path, 'txn.tmp')
          # Initialize transaction snapshot
          os.mkdir(txn_dir)
          shutil.copytree(os.path.join(self.path, 'indexes'),
                          os.path.join(txn_dir, 'indexes'))
          shutil.copy(os.path.join(self.path, 'config'), txn_dir)
          os.rename(os.path.join(self.path, 'txn.tmp'),
                    os.path.join(self.path, 'txn.active'))
          self.compact = set()
          self.txn_active = True
          self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))

      def close(self):
          """Roll back any uncommitted changes and release all open files
          """
          try:
              self.rollback()
              self.io.close()
          finally:
              # Closing the README handle drops the exclusive lock
              self.lock_fd.close()

      def commit(self):
          """Commit transaction, `tid` will be increased by 1

          Does nothing (and leaves `tid` unchanged) when no transaction is
          active.
          """
          if not self.txn_active:
              return
          self.compact_bands()
          # Never append to a committed band: the band handles are closed
          # below, so continuing in the same band would reopen (and possibly
          # truncate) data that has just been committed.
          self.io.close_band()
          self.io.close()
          self.tid += 1
          self.config.set('state', 'tid', self.tid)
          self.config.set('state', 'next_band', self.io.band)
          with open(os.path.join(self.path, 'config'), 'w') as fd:
              self.config.write(fd)
          for i in self.indexes.values():
              i.flush()
          self.bands.flush()
          # Rename before deleting: a leftover `txn.tmp` is discarded by
          # rollback() without restoring the snapshot.
          os.rename(os.path.join(self.path, 'txn.active'),
                    os.path.join(self.path, 'txn.tmp'))
          shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
          self.indexes = {}
          self.txn_active = False

      def compact_bands(self):
          """Rewrite live objects out of bands touched by `delete`, then
          remove those bands
          """
          if not self.compact:
              return
          self.io.close_band()

          def lookup(ns, key):
              # get_index() loads the namespace index on demand; a band may
              # hold objects of namespaces not touched in this transaction.
              return key in self.get_index(ns)

          for band in self.compact:
              if self.bands[band] > 0:
                  for ns, key, data in self.io.iter_objects(band, lookup):
                      new_band, offset = self.io.write(ns, key, data)
                      self.get_index(ns)[key] = new_band, offset
                      self.bands[band] -= 1
                      self.bands.setdefault(new_band, 0)
                      self.bands[new_band] += 1
          for band in self.compact:
              # Keep the pop() outside of the assert so that it still happens
              # when assertions are disabled (python -O).
              remaining = self.bands.pop(band)
              assert remaining == 0
              self.io.delete_band(band)
          self.compact = set()

      def rollback(self):
          """Discard uncommitted changes

          Removes a partially created snapshot (`txn.tmp`) and, if a
          transaction snapshot (`txn.active`) exists, restores the indexes and
          the config from it.  Cached index objects are dropped.
          """
          # Remove partial transaction
          if os.path.exists(os.path.join(self.path, 'txn.tmp')):
              shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
          # Roll back active transaction
          txn_dir = os.path.join(self.path, 'txn.active')
          if os.path.exists(txn_dir):
              shutil.rmtree(os.path.join(self.path, 'indexes'))
              shutil.copytree(os.path.join(txn_dir, 'indexes'),
                              os.path.join(self.path, 'indexes'))
              shutil.copy(os.path.join(txn_dir, 'config'), self.path)
              shutil.rmtree(txn_dir)
          self.indexes = {}
          self.txn_active = False

      def get_index(self, ns):
          """Return the index of namespace `ns`, opening or creating its
          index file on first use
          """
          try:
              return self.indexes[ns]
          except KeyError:
              filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
              if os.path.exists(filename):
                  self.indexes[ns] = NSIndex(filename)
              else:
                  self.indexes[ns] = NSIndex.create(filename)
              return self.indexes[ns]

      def get(self, ns, id):
          """Return the data stored under `id` in namespace `ns`

          Raises DoesNotExist if the id is not present in the index.
          """
          try:
              band, offset = self.get_index(ns)[id]
          except KeyError:
              raise self.DoesNotExist
          return self.io.read(band, offset)

      def put(self, ns, id, data):
          """Store `data` under `id` in namespace `ns`

          Starts a transaction if none is active.
          """
          if not self.txn_active:
              self.begin_txn()
          # NOTE(review): an already existing `id` is simply re-pointed; the
          # band reference of the old copy is not released here -- confirm
          # that callers never overwrite an id.
          band, offset = self.io.write(ns, id, data)
          self.bands.setdefault(band, 0)
          self.bands[band] += 1
          self.get_index(ns)[id] = band, offset

      def delete(self, ns, id):
          """Remove `id` from namespace `ns`

          Starts a transaction if none is active.  The band that held the
          object is scheduled for compaction at commit time.

          Raises DoesNotExist if the id is not present in the index.
          """
          if not self.txn_active:
              self.begin_txn()
          try:
              band, offset = self.get_index(ns).pop(id)
          except KeyError:
              raise self.DoesNotExist
          self.bands[band] -= 1
          self.compact.add(band)

      def list(self, ns, marker=None, limit=1000000):
          """Return the ids of namespace `ns` as a list

          `marker` and `limit` are passed through to the index' iteritems().
          """
          return [key for (key, value) in
                  self.get_index(ns).iteritems(marker=marker, limit=limit)]
  class BandIO(object):
      """Append-only object storage in numbered band files

      A band file starts with the 8 byte marker `DARCBAND`, followed by
      records consisting of a 38 byte header (total record size, a magic
      byte, the namespace number and a 32 byte id) and the object data.
      Open file handles are kept in an LRUCache keyed by band number.
      """
      header_fmt = struct.Struct('<iBB32s')
      assert header_fmt.size == 38
      # Marker written at offset 0 of every band file
      BAND_MAGIC = b'DARCBAND'

      def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
          """
          `path` is the store directory, `nextband` the number of the band
          the next write goes to, `limit` the size in bytes at which a new
          band is started, `bands_per_dir` the number of bands grouped into
          one sub directory and `capacity` the size of the file handle cache.
          """
          self.path = path
          self.fds = LRUCache(capacity)
          self.band = nextband
          self.limit = limit
          self.bands_per_dir = bands_per_dir
          self.offset = 0

      def close(self):
          """Close and forget all cached file handles
          """
          # Iterate over a copy, entries are removed while looping
          for band in list(self.fds.keys()):
              self.fds.pop(band).close()

      def band_filename(self, band):
          """Return the path of the file holding `band`
          """
          return os.path.join(self.path, 'bands', str(band // self.bands_per_dir), str(band))

      def get_fd(self, band, write=False):
          """Return an open file object for `band`, using the handle cache

          With `write` set, the band's directory is created if needed and an
          existing band file is opened for update instead of being truncated.
          """
          try:
              return self.fds[band]
          except KeyError:
              pass
          filename = self.band_filename(band)
          if write:
              dirname = os.path.dirname(filename)
              if not os.path.isdir(dirname):
                  os.makedirs(dirname)
              # Reopening a band that already has data (handle dropped from
              # the cache or closed) must not wipe what was written before.
              mode = 'r+b' if os.path.exists(filename) else 'w+b'
          else:
              mode = 'rb'
          fd = open(filename, mode)
          self.fds[band] = fd
          return fd

      def delete_band(self, band):
          """Remove the file of `band` and drop its cached handle, if any
          """
          try:
              self.fds.pop(band).close()
          except KeyError:
              pass
          os.unlink(self.band_filename(band))

      def read(self, band, offset):
          """Return the data of the record starting at `offset` in `band`
          """
          fd = self.get_fd(band)
          fd.seek(offset)
          data = fd.read(self.header_fmt.size)
          size, magic, ns, id = self.header_fmt.unpack(data)
          assert magic == 0
          return fd.read(size - self.header_fmt.size)

      def iter_objects(self, band, lookup):
          """Iterate over the records of `band`

          Yields `(ns, key, data)` for every record for which
          `lookup(ns, key)` is true; the data of other records is skipped
          without being read.
          """
          fd = self.get_fd(band)
          fd.seek(0)
          # Read outside of the assert so the marker is consumed even when
          # assertions are disabled (python -O)
          marker = fd.read(len(self.BAND_MAGIC))
          assert marker == self.BAND_MAGIC
          offset = len(self.BAND_MAGIC)
          data = fd.read(self.header_fmt.size)
          while data:
              size, magic, ns, key = self.header_fmt.unpack(data)
              offset += size
              if lookup(ns, key):
                  yield ns, key, fd.read(size - self.header_fmt.size)
              # Always reposition: the record may have been skipped, and the
              # handle may have been used while the generator was suspended
              fd.seek(offset)
              data = fd.read(self.header_fmt.size)

      def write(self, ns, id, data):
          """Append a record to the current band

          A new band is started first if the record would grow a non-empty
          band beyond the size limit.  Returns `(band, offset)` of the record.
          """
          size = len(data) + self.header_fmt.size
          if self.offset and self.offset + size > self.limit:
              self.close_band()
          fd = self.get_fd(self.band, write=True)
          fd.seek(self.offset)
          if self.offset == 0:
              # Starting the band from scratch: discard whatever an earlier,
              # abandoned use of this band number may have left in the file
              fd.truncate()
              fd.write(self.BAND_MAGIC)
              self.offset = len(self.BAND_MAGIC)
          offset = self.offset
          fd.write(self.header_fmt.pack(size, 0, ns, id))
          fd.write(data)
          self.offset += size
          return self.band, offset

      def close_band(self):
          """Make the next write start a new band
          """
          self.band += 1
          self.offset = 0
  class StoreTestCase(unittest.TestCase):
      """Exercises put/get/delete/commit and reopening of a Store that lives
      in a temporary directory
      """

      def setUp(self):
          self.tmppath = tempfile.mkdtemp()
          self.store = Store(os.path.join(self.tmppath, 'store'), create=True)

      def tearDown(self):
          shutil.rmtree(self.tmppath)

      def test1(self):
          self.assertEqual(self.store.tid, 0)
          for x in range(100):
              self.store.put(0, '%-32d' % x, 'SOMEDATA')
          key50 = '%-32d' % 50
          self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
          self.store.delete(0, key50)
          self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
          self.store.commit()
          self.assertEqual(self.store.tid, 1)
          self.store.close()
          # Reopen and check that exactly the committed state is visible
          store2 = Store(os.path.join(self.tmppath, 'store'))
          try:
              self.assertEqual(store2.tid, 1)
              keys = store2.list(0)
              for x in range(50):
                  key = '%-32d' % x
                  self.assertEqual(store2.get(0, key), 'SOMEDATA')
              self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
              self.assertTrue(key50 not in keys)
              for x in range(51, 100):
                  key = '%-32d' % x
                  self.assertTrue(key in keys)
                  self.assertEqual(store2.get(0, key), 'SOMEDATA')
              self.assertEqual(len(keys), 99)
              for x in range(50):
                  key = '%-32d' % x
                  store2.delete(0, key)
              self.assertEqual(len(store2.list(0)), 49)
              for x in range(51, 100):
                  key = '%-32d' % x
                  store2.delete(0, key)
              self.assertEqual(len(store2.list(0)), 0)
          finally:
              # Release the store lock and open files before tearDown removes
              # the directory; the uncommitted deletes are rolled back.
              store2.close()
  def suite():
      """Return a test suite containing all tests of StoreTestCase
      """
      loader = unittest.TestLoader()
      return loader.loadTestsFromTestCase(StoreTestCase)
  if __name__ == '__main__':
      # Run the module's tests when it is executed as a script
      unittest.main()