# store.py
from ConfigParser import RawConfigParser
import fcntl
import os
import shutil
import struct
import tempfile
import unittest
from zlib import crc32

from .hashindex import NSIndex, BandIndex
from .helpers import IntegrityError
from .lrucache import LRUCache


  12. class Store(object):
  13. """Filesystem based transactional key value store
  14. On disk layout:
  15. dir/README
  16. dir/config
  17. dir/bands/<X / BANDS_PER_DIR>/<X>
  18. dir/indexes/<NS>
  19. """
  20. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  21. DEFAULT_BANDS_PER_DIR = 10000
  22. class DoesNotExist(KeyError):
  23. """Requested key does not exist"""
  24. def __init__(self, path, create=False):
  25. self.txn_active = False
  26. if create:
  27. self.create(path)
  28. self.open(path)
  29. def create(self, path):
  30. """Create a new empty store at `path`
  31. """
  32. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  33. raise Exception('Path "%s" already exists' % path)
  34. if not os.path.exists(path):
  35. os.mkdir(path)
  36. with open(os.path.join(path, 'README'), 'wb') as fd:
  37. fd.write('This is a DARC store')
  38. os.mkdir(os.path.join(path, 'bands'))
  39. os.mkdir(os.path.join(path, 'indexes'))
  40. BandIndex.create(os.path.join(path, 'indexes', 'bands'))
  41. config = RawConfigParser()
  42. config.add_section('store')
  43. config.set('store', 'version', '1')
  44. config.set('store', 'id', os.urandom(32).encode('hex'))
  45. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  46. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  47. config.add_section('state')
  48. config.set('state', 'next_band', '0')
  49. config.set('state', 'tid', '0')
  50. with open(os.path.join(path, 'config'), 'w') as fd:
  51. config.write(fd)
  52. def open(self, path):
  53. self.path = path
  54. if not os.path.isdir(path):
  55. raise Exception('%s Does not look like a darc store' % path)
  56. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  57. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  58. self.config = RawConfigParser()
  59. self.config.read(os.path.join(path, 'config'))
  60. if self.config.getint('store', 'version') != 1:
  61. raise Exception('%s Does not look like a darc store')
  62. self.id = self.config.get('store', 'id').decode('hex')
  63. self.tid = self.config.getint('state', 'tid')
  64. next_band = self.config.getint('state', 'next_band')
  65. max_band_size = self.config.getint('store', 'max_band_size')
  66. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  67. self.rollback()
  68. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  69. def begin_txn(self):
  70. txn_dir = os.path.join(self.path, 'txn.tmp')
  71. # Initialize transaction snapshot
  72. os.mkdir(txn_dir)
  73. shutil.copytree(os.path.join(self.path, 'indexes'),
  74. os.path.join(txn_dir, 'indexes'))
  75. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  76. os.rename(os.path.join(self.path, 'txn.tmp'),
  77. os.path.join(self.path, 'txn.active'))
  78. self.compact = set()
  79. self.txn_active = True
  80. self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))
  81. def close(self):
  82. self.rollback()
  83. self.lock_fd.close()
  84. def commit(self):
  85. """Commit transaction, `tid` will be increased by 1
  86. """
  87. self.compact_bands()
  88. self.io.close()
  89. self.tid += 1
  90. self.config.set('state', 'tid', self.tid)
  91. self.config.set('state', 'next_band', self.io.band + 1)
  92. with open(os.path.join(self.path, 'config'), 'w') as fd:
  93. self.config.write(fd)
  94. for i in self.indexes.values():
  95. i.flush()
  96. self.bands.flush()
  97. os.rename(os.path.join(self.path, 'txn.active'),
  98. os.path.join(self.path, 'txn.tmp'))
  99. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  100. self.indexes = {}
  101. self.txn_active = False
  102. def compact_bands(self):
  103. if not self.compact:
  104. return
  105. self.io.close_band()
  106. def lookup(ns, key):
  107. return key in self.indexes[ns]
  108. for band in self.compact:
  109. if self.bands[band] > 0:
  110. for ns, key, data in self.io.iter_objects(band, lookup):
  111. new_band, offset = self.io.write(ns, key, data)
  112. self.indexes[ns][key] = new_band, offset
  113. self.bands[band] -= 1
  114. self.bands.setdefault(new_band, 0)
  115. self.bands[new_band] += 1
  116. for band in self.compact:
  117. assert self.bands.pop(band) == 0
  118. self.io.delete_band(band)
  119. def rollback(self):
  120. """
  121. """
  122. # Remove partial transaction
  123. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  124. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  125. # Roll back active transaction
  126. txn_dir = os.path.join(self.path, 'txn.active')
  127. if os.path.exists(txn_dir):
  128. shutil.rmtree(os.path.join(self.path, 'indexes'))
  129. shutil.copytree(os.path.join(txn_dir, 'indexes'),
  130. os.path.join(self.path, 'indexes'))
  131. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  132. shutil.rmtree(txn_dir)
  133. self.indexes = {}
  134. self.txn_active = False
  135. def get_index(self, ns):
  136. try:
  137. return self.indexes[ns]
  138. except KeyError:
  139. filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
  140. if os.path.exists(filename):
  141. self.indexes[ns] = NSIndex(filename)
  142. else:
  143. self.indexes[ns] = NSIndex.create(filename)
  144. return self.indexes[ns]
  145. def get(self, ns, id):
  146. try:
  147. band, offset = self.get_index(ns)[id]
  148. return self.io.read(band, offset)
  149. except KeyError:
  150. raise self.DoesNotExist
  151. def put(self, ns, id, data):
  152. if not self.txn_active:
  153. self.begin_txn()
  154. band, offset = self.io.write(ns, id, data)
  155. self.bands.setdefault(band, 0)
  156. self.bands[band] += 1
  157. self.get_index(ns)[id] = band, offset
  158. def delete(self, ns, id):
  159. if not self.txn_active:
  160. self.begin_txn()
  161. try:
  162. band, offset = self.get_index(ns).pop(id)
  163. self.bands[band] -= 1
  164. self.compact.add(band)
  165. except KeyError:
  166. raise self.DoesNotExist
  167. def list(self, ns, marker=None, limit=1000000):
  168. return [key for (key, value) in
  169. self.get_index(ns).iteritems(marker=marker, limit=limit)]
  170. class BandIO(object):
  171. header_fmt = struct.Struct('<iBiB32s')
  172. assert header_fmt.size == 42
  173. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  174. self.path = path
  175. self.fds = LRUCache(capacity)
  176. self.band = nextband
  177. self.limit = limit
  178. self.bands_per_dir = bands_per_dir
  179. self.offset = 0
  180. def close(self):
  181. for band in self.fds.keys():
  182. self.fds.pop(band).close()
  183. def band_filename(self, band):
  184. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  185. def get_fd(self, band, write=False):
  186. try:
  187. return self.fds[band]
  188. except KeyError:
  189. if write and band % 1000 == 0:
  190. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  191. if not os.path.exists(dirname):
  192. os.mkdir(dirname)
  193. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  194. self.fds[band] = fd
  195. return fd
  196. def delete_band(self, band):
  197. os.unlink(self.band_filename(band))
  198. def read(self, band, offset):
  199. fd = self.get_fd(band)
  200. fd.seek(offset)
  201. data = fd.read(self.header_fmt.size)
  202. size, magic, hash, ns, id = self.header_fmt.unpack(data)
  203. assert magic == 0
  204. data = fd.read(size - self.header_fmt.size)
  205. if crc32(data) != hash:
  206. raise IntegrityError('Band checksum mismatch')
  207. return data
  208. def iter_objects(self, band, lookup):
  209. fd = self.get_fd(band)
  210. fd.seek(0)
  211. assert fd.read(8) == 'DARCBAND'
  212. offset = 8
  213. data = fd.read(self.header_fmt.size)
  214. while data:
  215. size, magic, hash, ns, key = self.header_fmt.unpack(data)
  216. assert magic == 0
  217. offset += size
  218. if lookup(ns, key):
  219. data = fd.read(size - self.header_fmt.size)
  220. if crc32(data) != hash:
  221. raise IntegrityError('Band checksum mismatch')
  222. yield ns, key, data
  223. else:
  224. fd.seek(offset)
  225. data = fd.read(self.header_fmt.size)
  226. def write(self, ns, id, data):
  227. size = len(data) + self.header_fmt.size
  228. if self.offset and self.offset + size > self.limit:
  229. self.close_band()
  230. fd = self.get_fd(self.band, write=True)
  231. fd.seek(self.offset)
  232. if self.offset == 0:
  233. fd.write('DARCBAND')
  234. self.offset = 8
  235. offset = self.offset
  236. hash = crc32(data)
  237. fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
  238. fd.write(data)
  239. self.offset += size
  240. return self.band, offset
  241. def close_band(self):
  242. self.band += 1
  243. self.offset = 0
  244. class StoreTestCase(unittest.TestCase):
  245. def setUp(self):
  246. self.tmppath = tempfile.mkdtemp()
  247. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  248. def tearDown(self):
  249. shutil.rmtree(self.tmppath)
  250. def test1(self):
  251. self.assertEqual(self.store.tid, 0)
  252. for x in range(100):
  253. self.store.put(0, '%-32d' % x, 'SOMEDATA')
  254. key50 = '%-32d' % 50
  255. self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
  256. self.store.delete(0, key50)
  257. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
  258. self.store.commit()
  259. self.assertEqual(self.store.tid, 1)
  260. self.store.close()
  261. store2 = Store(os.path.join(self.tmppath, 'store'))
  262. self.assertEqual(store2.tid, 1)
  263. keys = store2.list(0)
  264. for x in range(50):
  265. key = '%-32d' % x
  266. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  267. self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
  268. assert key50 not in keys
  269. for x in range(51, 100):
  270. key = '%-32d' % x
  271. assert key in keys
  272. self.assertEqual(store2.get(0, key), 'SOMEDATA')
  273. self.assertEqual(len(keys), 99)
  274. for x in range(50):
  275. key = '%-32d' % x
  276. store2.delete(0, key)
  277. self.assertEqual(len(store2.list(0)), 49)
  278. for x in range(51, 100):
  279. key = '%-32d' % x
  280. store2.delete(0, key)
  281. self.assertEqual(len(store2.list(0)), 0)
  282. def suite():
  283. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
# Run the tests of this module when it is executed as a script
if __name__ == '__main__':
    unittest.main()