from ConfigParser import RawConfigParser
import fcntl
import numpy
import os
import shutil
import struct
import tempfile
import unittest
from UserDict import DictMixin
from .lrucache import LRUCache


class Store(object):
    """Filesystem based transactional key value store

    On disk layout:

    dir/README
    dir/config
    dir/bands/<X / BANDS_PER_DIR>/<X>
    dir/indexes/<NS>
    """
    DEFAULT_MAX_BAND_SIZE = 10 * 1024 * 1024
    DEFAULT_BANDS_PER_DIR = 10000

    class DoesNotExist(KeyError):
        """Requested key does not exist"""

    def __init__(self, path, create=False):
        self.txn_active = False
        if create:
            self.create(path)
        self.open(path)

    def create(self, path):
        """Create a new empty store at `path`
        """
        if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
            raise Exception('Path "%s" already exists' % path)
        if not os.path.exists(path):
            os.mkdir(path)
        with open(os.path.join(path, 'README'), 'wb') as fd:
            fd.write('This is a DARC store')
        os.mkdir(os.path.join(path, 'bands'))
        os.mkdir(os.path.join(path, 'indexes'))
        config = RawConfigParser()
        config.add_section('store')
        config.set('store', 'version', '1')
        config.set('store', 'id', os.urandom(32).encode('hex'))
        config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
        config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
        config.add_section('state')
        config.set('state', 'next_band', '0')
        config.set('state', 'tid', '0')
        with open(os.path.join(path, 'config'), 'w') as fd:
            config.write(fd)

    def open(self, path):
        self.path = path
        if not os.path.isdir(path):
            raise Exception('%s does not look like a darc store' % path)
        self.lock_fd = open(os.path.join(path, 'README'), 'r+')
        fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
        self.config = RawConfigParser()
        self.config.read(os.path.join(path, 'config'))
        if self.config.getint('store', 'version') != 1:
            raise Exception('%s does not look like a darc store' % path)
        self.id = self.config.get('store', 'id')
        self.tid = self.config.getint('state', 'tid')
        next_band = self.config.getint('state', 'next_band')
        max_band_size = self.config.getint('store', 'max_band_size')
        bands_per_dir = self.config.getint('store', 'bands_per_dir')
        self.rollback()
        self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
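
    # Transaction protocol: the first put() or delete() calls begin_txn(), which
    # snapshots the indexes directory and the config file into 'txn.tmp' and
    # atomically renames it to 'txn.active'.  commit() persists the new state
    # and removes the snapshot; rollback() restores it.  open() above calls
    # rollback(), so a store that was interrupted mid-transaction is reset to
    # its last committed state the next time it is opened.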
    def begin_txn(self):
        txn_dir = os.path.join(self.path, 'txn.tmp')
        # Initialize transaction snapshot
        os.mkdir(txn_dir)
        shutil.copytree(os.path.join(self.path, 'indexes'),
                        os.path.join(txn_dir, 'indexes'))
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.compact = set()
        self.txn_active = True

    def close(self):
        self.rollback()
        self.lock_fd.close()

    def commit(self):
        """Commit transaction, `tid` will be increased by 1
        """
        self.compact_bands()
        self.io.close()
        self.tid += 1
        self.config.set('state', 'tid', self.tid)
        self.config.set('state', 'next_band', self.io.band + 1)
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        for i in self.indexes.values():
            i.flush()
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.indexes = {}
        self.txn_active = False
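
    # Note on crash safety: the updated config and index files are written
    # while the 'txn.active' snapshot still exists; only the final rename to
    # 'txn.tmp' (followed by its removal) marks the transaction as committed.
    # If the process dies before that rename, the next open() rolls everything
    # back from the snapshot.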
    def compact_bands(self):
        if not self.compact:
            return
        self.io.close_band()
        for band in self.compact:
            for ns, key, offset, size in self.io.iter_objects(band):
                if key in self.indexes[ns]:
                    del self.indexes[ns][key]
                    data = self.io.read(band, offset)
                    self.indexes[ns][key] = self.io.write(ns, key, data)
        for band in self.compact:
            self.io.delete_band(band)
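
    # Bands that had objects deleted from them are collected in `self.compact`
    # (see delete() below).  compact_bands() re-writes the still-referenced
    # objects from those bands to the current band, updates the index entries,
    # and finally removes the old band files, reclaiming the space used by
    # deleted objects.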
    def rollback(self):
        """Roll back the active transaction and discard any uncommitted changes
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.rmtree(os.path.join(self.path, 'indexes'))
            shutil.copytree(os.path.join(txn_dir, 'indexes'),
                            os.path.join(self.path, 'indexes'))
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.rmtree(txn_dir)
        self.indexes = {}
        self.txn_active = False

    def get_index(self, ns):
        try:
            return self.indexes[ns]
        except KeyError:
            filename = os.path.join(self.path, 'indexes', str(ns))
            if os.path.exists(filename):
                self.indexes[ns] = HashIndex(filename)
            else:
                self.indexes[ns] = HashIndex.create(filename)
            return self.indexes[ns]

    def get(self, ns, id):
        try:
            band, offset = self.get_index(ns)[id]
            return self.io.read(band, offset)
        except KeyError:
            raise self.DoesNotExist

    def put(self, ns, id, data):
        if not self.txn_active:
            self.begin_txn()
        band, offset = self.io.write(ns, id, data)
        self.get_index(ns)[id] = band, offset

    def delete(self, ns, id):
        if not self.txn_active:
            self.begin_txn()
        try:
            band, offset = self.get_index(ns).pop(id)
            self.compact.add(band)
        except KeyError:
            raise self.DoesNotExist

    def list(self, ns, marker=None, limit=1000000):
        return [key for (key, value) in
                self.get_index(ns).iteritems(marker=marker, limit=limit)]
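
# A minimal usage sketch (illustrative only; '/tmp/demo-store' is an arbitrary
# scratch path).  Keys are fixed 32-byte strings, matching the 32-byte key
# field used by HashIndex below:
#
#   store = Store('/tmp/demo-store', create=True)
#   store.put(0, 'x' * 32, 'some data')
#   print store.get(0, 'x' * 32)          # -> 'some data'
#   store.commit()                        # tid goes from 0 to 1
#   store.delete(0, 'x' * 32)
#   store.rollback()                      # the uncommitted delete is undone
#   store.close()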


class HashIndex(DictMixin):
    """Hash Table with open addressing and lazy deletes
    """
    EMPTY, DELETED = -1, -2
    FREE = (EMPTY, DELETED)
    i_fmt = struct.Struct('<i')
    assert i_fmt.size == 4
    idx_type = numpy.dtype('<i,<i,V32')
    assert idx_type.itemsize == 40
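
    # On-disk layout: the 8-byte magic string 'DARCHASH', a 4-byte little-endian
    # entry count, then a flat array of 40-byte buckets starting at offset 12.
    # Each bucket holds two little-endian int32 values (the Store keeps the band
    # number and offset there) followed by a fixed 32-byte key.  A first value
    # of EMPTY (-1) or DELETED (-2) marks a free bucket.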

    def __init__(self, path):
        self.path = path
        self.fd = open(path, 'r+')
        assert self.fd.read(8) == 'DARCHASH'
        self.num_entries = self.i_fmt.unpack(self.fd.read(4))[0]
        self.buckets = numpy.memmap(self.fd, self.idx_type, offset=12)
        self.limit = 3 * self.buckets.size / 4  # 75% fill rate

    def flush(self):
        self.fd.seek(8)
        self.fd.write(self.i_fmt.pack(self.num_entries))
        self.fd.flush()
        self.buckets.flush()

    @classmethod
    def create(cls, path, capacity=1024):
        with open(path, 'wb') as fd:
            fd.write('DARCHASH\0\0\0\0')
            a = numpy.zeros(capacity, cls.idx_type)
            for i in xrange(capacity):
                a[i][0] = cls.EMPTY
            a.tofile(fd)
        return cls(path)

    def index(self, key):
        hash = self.i_fmt.unpack(key[:4])[0]
        return hash % self.buckets.size

    def lookup(self, key):
        didx = -1
        idx = self.index(key)
        while True:
            while self.buckets[idx][0] == self.DELETED:
                if didx == -1:
                    didx = idx
                idx = (idx + 1) % self.buckets.size
            if self.buckets[idx][0] == self.EMPTY:
                raise KeyError
            if str(self.buckets[idx][2]) == key:
                if didx != -1:
                    self.buckets[didx] = self.buckets[idx]
                    self.buckets[idx][0] = self.DELETED
                    idx = didx
                return idx
            idx = (idx + 1) % self.buckets.size
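
    # lookup() uses linear probing: starting from the key's home bucket it skips
    # DELETED buckets (remembering the first one seen in `didx`), raises
    # KeyError when it reaches an EMPTY bucket, and on a match opportunistically
    # moves the entry forward into that first deleted slot so future probes for
    # this key are shorter.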

    def __contains__(self, key):
        try:
            self[key]
            return True
        except KeyError:
            return False

    def pop(self, key):
        idx = self.lookup(key)
        band = self.buckets[idx][0]
        self.buckets[idx][0] = self.DELETED
        self.num_entries -= 1
        return band, self.buckets[idx][1]

    def __getitem__(self, key):
        idx = self.lookup(key)
        return self.buckets[idx][0], self.buckets[idx][1]

    def __delitem__(self, key):
        self.buckets[self.lookup(key)][0] = self.DELETED
        self.num_entries -= 1

    def __setitem__(self, key, value):
        if self.num_entries >= self.limit:
            self.resize()
        try:
            idx = self.lookup(key)
            self.buckets[idx][0], self.buckets[idx][1] = value
            return
        except KeyError:
            idx = self.index(key)
            while self.buckets[idx][0] not in self.FREE:
                idx = (idx + 1) % self.buckets.size
            self.buckets[idx][0], self.buckets[idx][1] = value
            self.buckets[idx][2] = key
            self.num_entries += 1

    def iteritems(self, limit=0, marker=None):
        n = 0
        for idx in xrange(self.buckets.size):
            if self.buckets[idx][0] in self.FREE:
                continue
            key = str(self.buckets[idx][2])
            if marker and key != marker:
                continue
            elif marker:
                marker = None
            yield key, (self.buckets[idx][0], self.buckets[idx][1])
            n += 1
            if n == limit:
                return

    def resize(self, capacity=0):
        capacity = capacity or self.buckets.size * 2
        print 'resizing to', capacity
        if capacity < self.num_entries:
            raise ValueError('HashIndex full')
        new = HashIndex.create(self.path + '.tmp', capacity)
        for key, value in self.iteritems():
            new[key] = value
        new.flush()
        os.unlink(self.path)
        os.rename(self.path + '.tmp', self.path)
        self.fd = new.fd
        self.buckets = new.buckets
        self.limit = 3 * self.buckets.size / 4
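
    # resize() rebuilds the table into '<path>.tmp' (by default at double the
    # current capacity), re-inserts every live entry, then renames the new file
    # over the old one.  Combined with the 75% fill limit enforced in
    # __setitem__ this keeps probe chains short as the index grows.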


class BandIO(object):
    header_fmt = struct.Struct('<iBB32s')
    assert header_fmt.size == 38
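
    # Band file format: each band file starts with the 8-byte magic 'DARCBAND'
    # followed by a sequence of records.  A record is a 38-byte header (total
    # record size as int32, a magic/reserved byte written as 0, the namespace
    # byte, and the 32-byte key) followed by the object data.  Band files are
    # grouped into directories of `bands_per_dir` bands each.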

    def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
        self.path = path
        self.fds = LRUCache(capacity)
        self.band = nextband
        self.limit = limit
        self.bands_per_dir = bands_per_dir
        self.offset = 0

    def close(self):
        for band in self.fds.keys():
            self.fds.pop(band).close()

    def band_filename(self, band):
        return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))

    def get_fd(self, band, write=False):
        try:
            return self.fds[band]
        except KeyError:
            # Create the bands/<n> subdirectory when the band number crosses
            # into a new bands_per_dir bucket
            if write and band % self.bands_per_dir == 0:
                dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
            fd = open(self.band_filename(band), write and 'w+' or 'rb')
            self.fds[band] = fd
            return fd

    def delete_band(self, band):
        os.unlink(self.band_filename(band))

    def read(self, band, offset):
        fd = self.get_fd(band)
        fd.seek(offset)
        data = fd.read(self.header_fmt.size)
        size, magic, ns, id = self.header_fmt.unpack(data)
        return fd.read(size - self.header_fmt.size)

    def iter_objects(self, band):
        fd = self.get_fd(band)
        fd.seek(0)
        assert fd.read(8) == 'DARCBAND'
        offset = 8
        data = fd.read(self.header_fmt.size)
        while data:
            size, magic, ns, key = self.header_fmt.unpack(data)
            size -= self.header_fmt.size
            yield ns, key, offset, size
            offset += size + self.header_fmt.size
            fd.seek(offset)
            data = fd.read(self.header_fmt.size)

    def write(self, ns, id, data):
        size = len(data) + self.header_fmt.size
        if self.offset and self.offset + size > self.limit:
            self.close_band()
        fd = self.get_fd(self.band, write=True)
        fd.seek(self.offset)
        if self.offset == 0:
            fd.write('DARCBAND')
            self.offset = 8
        offset = self.offset
        fd.write(self.header_fmt.pack(size, 0, ns, id))
        fd.write(data)
        self.offset += size
        return self.band, offset
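
    # write() appends a header plus the data to the current band, rolling over
    # to a new band first whenever the record would push the band past the
    # configured size limit, and returns the (band, offset) pair that the Store
    # records in its index for the key.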

    def close_band(self):
        self.band += 1
        self.offset = 0


class StoreTestCase(unittest.TestCase):

    def setUp(self):
        self.tmppath = tempfile.mkdtemp()
        self.store = Store(os.path.join(self.tmppath, 'store'), create=True)

    def tearDown(self):
        shutil.rmtree(self.tmppath)

    def test1(self):
        self.assertEqual(self.store.tid, 0)
        for x in range(100):
            self.store.put(0, '%-32d' % x, 'SOMEDATA')
        key50 = '%-32d' % 50
        self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
        self.store.delete(0, key50)
        self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
        self.store.commit()
        self.assertEqual(self.store.tid, 1)
        self.store.close()
        store2 = Store(os.path.join(self.tmppath, 'store'))
        self.assertEqual(store2.tid, 1)
        keys = store2.list(0)
        for x in range(50):
            key = '%-32d' % x
            self.assertEqual(store2.get(0, key), 'SOMEDATA')
        self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
        assert key50 not in keys
        for x in range(51, 100):
            key = '%-32d' % x
            assert key in keys
            self.assertEqual(store2.get(0, key), 'SOMEDATA')
        self.assertEqual(len(keys), 99)
        for x in range(50):
            key = '%-32d' % x
            store2.delete(0, key)
        self.assertEqual(len(store2.list(0)), 49)
        for x in range(51, 100):
            key = '%-32d' % x
            store2.delete(0, key)
        self.assertEqual(len(store2.list(0)), 0)


def suite():
    return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)


if __name__ == '__main__':
    unittest.main()