store.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. from __future__ import with_statement
  2. from ConfigParser import RawConfigParser
  3. import errno
  4. import fcntl
  5. import os
  6. import msgpack
  7. import shutil
  8. import struct
  9. import tempfile
  10. import unittest
  11. from zlib import crc32
  12. from .hashindex import NSIndex
  13. from .helpers import IntegrityError, deferrable
  14. from .lrucache import LRUCache
  15. class Store(object):
  16. """Filesystem based transactional key value store
  17. On disk layout:
  18. dir/README
  19. dir/config
  20. dir/bands/<X / BANDS_PER_DIR>/<X>
  21. dir/band
  22. dir/index
  23. """
  24. DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
  25. DEFAULT_BANDS_PER_DIR = 10000
  26. class DoesNotExist(KeyError):
  27. """Requested key does not exist"""
  28. def __init__(self, path, create=False):
  29. self.txn_active = False
  30. if create:
  31. self.create(path)
  32. self.open(path)
  33. def create(self, path):
  34. """Create a new empty store at `path`
  35. """
  36. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  37. raise Exception('Path "%s" already exists' % path)
  38. if not os.path.exists(path):
  39. os.mkdir(path)
  40. with open(os.path.join(path, 'README'), 'wb') as fd:
  41. fd.write('This is a DARC store')
  42. os.mkdir(os.path.join(path, 'bands'))
  43. config = RawConfigParser()
  44. config.add_section('store')
  45. config.set('store', 'version', '1')
  46. config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
  47. config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
  48. config.set('store', 'next_band', '0')
  49. config.add_section('meta')
  50. config.set('meta', 'manifest', '')
  51. config.set('meta', 'id', os.urandom(32).encode('hex'))
  52. NSIndex.create(os.path.join(path, 'index'))
  53. self.write_dict(os.path.join(path, 'band'), {})
  54. with open(os.path.join(path, 'config'), 'w') as fd:
  55. config.write(fd)
  56. def open(self, path):
  57. self.path = path
  58. if not os.path.isdir(path):
  59. raise Exception('%s Does not look like a darc store' % path)
  60. self.lock_fd = open(os.path.join(path, 'README'), 'r+')
  61. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  62. self.rollback()
  63. self.config = RawConfigParser()
  64. self.config.read(os.path.join(path, 'config'))
  65. if self.config.getint('store', 'version') != 1:
  66. raise Exception('%s Does not look like a darc store')
  67. next_band = self.config.getint('store', 'next_band')
  68. max_band_size = self.config.getint('store', 'max_band_size')
  69. bands_per_dir = self.config.getint('store', 'bands_per_dir')
  70. self.meta = dict(self.config.items('meta'))
  71. self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
  72. self.io.cleanup()
  73. def read_dict(self, filename):
  74. with open(filename, 'rb') as fd:
  75. return msgpack.unpackb(fd.read())
  76. def write_dict(self, filename, d):
  77. with open(filename+'.tmp', 'wb') as fd:
  78. fd.write(msgpack.packb(d))
  79. os.rename(filename+'.tmp', filename)
  80. def delete_bands(self):
  81. delete_path = os.path.join(self.path, 'delete')
  82. if os.path.exists(delete_path):
  83. bands = self.read_dict(os.path.join(self.path, 'band'))
  84. for band in self.read_dict(delete_path):
  85. assert bands.pop(band, 0) == 0
  86. self.io.delete_band(band, missing_ok=True)
  87. self.write_dict(os.path.join(self.path, 'band'), bands)
  88. def begin_txn(self):
  89. txn_dir = os.path.join(self.path, 'txn.tmp')
  90. # Initialize transaction snapshot
  91. os.mkdir(txn_dir)
  92. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  93. shutil.copy(os.path.join(self.path, 'index'), txn_dir)
  94. shutil.copy(os.path.join(self.path, 'band'), txn_dir)
  95. os.rename(os.path.join(self.path, 'txn.tmp'),
  96. os.path.join(self.path, 'txn.active'))
  97. self.compact = set()
  98. self.txn_active = True
  99. def close(self):
  100. self.rollback()
  101. self.lock_fd.close()
  102. def commit(self, meta=None):
  103. """Commit transaction
  104. """
  105. meta = meta or self.meta
  106. self.compact_bands()
  107. self.io.close()
  108. self.config.set('store', 'next_band', self.io.band + 1)
  109. self.config.remove_section('meta')
  110. self.config.add_section('meta')
  111. for k, v in meta.items():
  112. self.config.set('meta', k, v)
  113. with open(os.path.join(self.path, 'config'), 'w') as fd:
  114. self.config.write(fd)
  115. self.index.flush()
  116. self.write_dict(os.path.join(self.path, 'band'), self.bands)
  117. # If we crash before this line, the transaction will be
  118. # rolled back by open()
  119. os.rename(os.path.join(self.path, 'txn.active'),
  120. os.path.join(self.path, 'txn.commit'))
  121. self.rollback()
  122. def compact_bands(self):
  123. """Compact sparse bands by copying data into new bands
  124. """
  125. if not self.compact:
  126. return
  127. self.io.close_band()
  128. def lookup(key):
  129. return key in self.index
  130. bands = self.bands
  131. for band in self.compact:
  132. if bands[band] > 0:
  133. for key, data in self.io.iter_objects(band, lookup):
  134. new_band, offset = self.io.write(key, data)
  135. self.index[key] = new_band, offset
  136. bands[band] -= 1
  137. bands.setdefault(new_band, 0)
  138. bands[new_band] += 1
  139. self.write_dict(os.path.join(self.path, 'delete'), tuple(self.compact))
  140. def rollback(self):
  141. """
  142. """
  143. # Commit any half committed transaction
  144. if os.path.exists(os.path.join(self.path, 'txn.commit')):
  145. self.delete_bands()
  146. os.rename(os.path.join(self.path, 'txn.commit'),
  147. os.path.join(self.path, 'txn.tmp'))
  148. delete_path = os.path.join(self.path, 'delete')
  149. if os.path.exists(delete_path):
  150. os.unlink(delete_path)
  151. # Roll back active transaction
  152. txn_dir = os.path.join(self.path, 'txn.active')
  153. if os.path.exists(txn_dir):
  154. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  155. shutil.copy(os.path.join(txn_dir, 'index'), self.path)
  156. shutil.copy(os.path.join(txn_dir, 'band'), self.path)
  157. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  158. # Remove partially removed transaction
  159. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  160. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  161. self.index = NSIndex(os.path.join(self.path, 'index'))
  162. self.bands = self.read_dict(os.path.join(self.path, 'band'))
  163. self.txn_active = False
  164. @deferrable
  165. def get(self, id):
  166. try:
  167. band, offset = self.index[id]
  168. return self.io.read(band, offset, id)
  169. except KeyError:
  170. raise self.DoesNotExist
  171. @deferrable
  172. def put(self, id, data):
  173. if not self.txn_active:
  174. self.begin_txn()
  175. band, offset = self.io.write(id, data)
  176. self.bands.setdefault(band, 0)
  177. self.bands[band] += 1
  178. self.index[id] = band, offset
  179. @deferrable
  180. def delete(self, id):
  181. if not self.txn_active:
  182. self.begin_txn()
  183. try:
  184. band, offset = self.index.pop(id)
  185. self.bands[band] -= 1
  186. self.compact.add(band)
  187. except KeyError:
  188. raise self.DoesNotExist
  189. def flush_rpc(self, *args):
  190. pass
  191. class BandIO(object):
  192. header_fmt = struct.Struct('<IBI32s')
  193. assert header_fmt.size == 41
  194. def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
  195. self.path = path
  196. self.fds = LRUCache(capacity)
  197. self.band = nextband
  198. self.limit = limit
  199. self.bands_per_dir = bands_per_dir
  200. self.offset = 0
  201. def close(self):
  202. for band in self.fds.keys():
  203. self.fds.pop(band).close()
  204. def cleanup(self):
  205. """Delete band files left by aborted transactions
  206. """
  207. band = self.band
  208. while True:
  209. filename = self.band_filename(band)
  210. if not os.path.exists(filename):
  211. break
  212. os.unlink(filename)
  213. band += 1
  214. def band_filename(self, band):
  215. return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
  216. def get_fd(self, band, write=False):
  217. try:
  218. return self.fds[band]
  219. except KeyError:
  220. if write and band % 1000 == 0:
  221. dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
  222. if not os.path.exists(dirname):
  223. os.mkdir(dirname)
  224. fd = open(self.band_filename(band), write and 'w+' or 'rb')
  225. self.fds[band] = fd
  226. return fd
  227. def delete_band(self, band, missing_ok=False):
  228. try:
  229. os.unlink(self.band_filename(band))
  230. except OSError, e:
  231. if not missing_ok or e.errno != errno.ENOENT:
  232. raise
  233. def read(self, band, offset, id):
  234. fd = self.get_fd(band)
  235. fd.seek(offset)
  236. data = fd.read(self.header_fmt.size)
  237. size, magic, hash, id_ = self.header_fmt.unpack(data)
  238. if magic != 0 or id != id_:
  239. raise IntegrityError('Invalid band entry header')
  240. data = fd.read(size - self.header_fmt.size)
  241. if crc32(data) & 0xffffffff != hash:
  242. raise IntegrityError('Band checksum mismatch')
  243. return data
  244. def iter_objects(self, band, lookup):
  245. fd = self.get_fd(band)
  246. fd.seek(0)
  247. if fd.read(8) != 'DARCBAND':
  248. raise IntegrityError('Invalid band header')
  249. offset = 8
  250. data = fd.read(self.header_fmt.size)
  251. while data:
  252. size, magic, hash, key = self.header_fmt.unpack(data)
  253. if magic != 0:
  254. raise IntegrityError('Unknown band entry header')
  255. offset += size
  256. if lookup(key):
  257. data = fd.read(size - self.header_fmt.size)
  258. if crc32(data) & 0xffffffff != hash:
  259. raise IntegrityError('Band checksum mismatch')
  260. yield key, data
  261. else:
  262. fd.seek(offset)
  263. data = fd.read(self.header_fmt.size)
  264. def write(self, id, data):
  265. size = len(data) + self.header_fmt.size
  266. if self.offset and self.offset + size > self.limit:
  267. self.close_band()
  268. fd = self.get_fd(self.band, write=True)
  269. fd.seek(self.offset)
  270. if self.offset == 0:
  271. fd.write('DARCBAND')
  272. self.offset = 8
  273. offset = self.offset
  274. hash = crc32(data) & 0xffffffff
  275. fd.write(self.header_fmt.pack(size, 0, hash, id))
  276. fd.write(data)
  277. self.offset += size
  278. return self.band, offset
  279. def close_band(self):
  280. self.band += 1
  281. self.offset = 0
  282. class StoreTestCase(unittest.TestCase):
  283. def setUp(self):
  284. self.tmppath = tempfile.mkdtemp()
  285. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  286. def tearDown(self):
  287. shutil.rmtree(self.tmppath)
  288. def test1(self):
  289. for x in range(100):
  290. self.store.put('%-32d' % x, 'SOMEDATA')
  291. key50 = '%-32d' % 50
  292. self.assertEqual(self.store.get(key50), 'SOMEDATA')
  293. self.store.delete(key50)
  294. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(key50))
  295. self.store.commit()
  296. self.store.close()
  297. store2 = Store(os.path.join(self.tmppath, 'store'))
  298. def suite():
  299. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  300. if __name__ == '__main__':
  301. unittest.main()