from ConfigParser import RawConfigParser
import fcntl
import os
import shutil
import struct
import tempfile
import unittest
from zlib import crc32

from .hashindex import NSIndex, BandIndex
from .helpers import IntegrityError
from .lrucache import LRUCache


class Store(object):
    """Filesystem based transactional key value store

    On disk layout:
    dir/README          -- marker file; also the file that gets locked
    dir/config          -- [store] settings and [state] (tid, next_band)
    dir/bands/          -- band files (exact sub-layout is decided by BandIO)
    dir/indexes/bands   -- BandIndex: number of live objects per band
    dir/indexes/ns<N>   -- NSIndex for namespace N: key -> (band, offset)

    While a transaction is running, a snapshot of ``indexes`` and ``config``
    is kept in ``dir/txn.active`` so that `rollback` can restore them.
    """
    DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
    DEFAULT_BANDS_PER_DIR = 10000

    class DoesNotExist(KeyError):
        """Requested key does not exist"""

    def __init__(self, path, create=False):
        """Open the store at `path`, creating it first if `create` is true."""
        self.txn_active = False
        if create:
            self.create(path)
        self.open(path)

    def create(self, path):
        """Create a new empty store at `path`

        `path` may be missing or an empty directory; anything else raises
        ``Exception``.
        """
        if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
            raise Exception('Path "%s" already exists' % path)
        if not os.path.exists(path):
            os.mkdir(path)
        with open(os.path.join(path, 'README'), 'wb') as fd:
            fd.write('This is a DARC store')
        os.mkdir(os.path.join(path, 'bands'))
        os.mkdir(os.path.join(path, 'indexes'))
        BandIndex.create(os.path.join(path, 'indexes', 'bands'))
        config = RawConfigParser()
        config.add_section('store')
        config.set('store', 'version', '1')
        config.set('store', 'id', os.urandom(32).encode('hex'))
        config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
        config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
        config.add_section('state')
        config.set('state', 'next_band', '0')
        config.set('state', 'tid', '0')
        with open(os.path.join(path, 'config'), 'w') as fd:
            config.write(fd)

    def open(self, path):
        """Lock and open the existing store at `path`.

        Takes an exclusive ``flock`` on ``README``, rolls back any
        transaction left behind by a previous process and loads the
        configuration.  Raises ``Exception`` if `path` is not a directory
        or the config version is not 1.
        """
        self.path = path
        if not os.path.isdir(path):
            raise Exception('%s Does not look like a darc store' % path)
        self.lock_fd = open(os.path.join(path, 'README'), 'r+')
        fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
        try:
            config_path = os.path.join(path, 'config')
            self.config = RawConfigParser()
            self.config.read(config_path)
            if self.config.getint('store', 'version') != 1:
                raise Exception('%s Does not look like a darc store' % path)
            # rollback() may replace `config` with the snapshot taken when an
            # interrupted transaction began, so the state values have to be
            # read from a config that is loaded *after* the rollback.
            self.rollback()
            self.config = RawConfigParser()
            self.config.read(config_path)
            self.id = self.config.get('store', 'id').decode('hex')
            self.tid = self.config.getint('state', 'tid')
            next_band = self.config.getint('state', 'next_band')
            max_band_size = self.config.getint('store', 'max_band_size')
            bands_per_dir = self.config.getint('store', 'bands_per_dir')
            self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
        except Exception:
            # Do not keep the store locked when opening failed.
            self.lock_fd.close()
            raise

    def begin_txn(self):
        """Start a transaction by snapshotting ``indexes`` and ``config``.

        The snapshot is built in ``txn.tmp`` and only then renamed to
        ``txn.active``, so a half-written snapshot is never mistaken for a
        usable one.
        """
        txn_tmp = os.path.join(self.path, 'txn.tmp')
        # Initialize transaction snapshot
        os.mkdir(txn_tmp)
        shutil.copytree(os.path.join(self.path, 'indexes'),
                        os.path.join(txn_tmp, 'indexes'))
        shutil.copy(os.path.join(self.path, 'config'), txn_tmp)
        os.rename(txn_tmp, os.path.join(self.path, 'txn.active'))
        self.compact = set()
        self.txn_active = True
        self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))

    def close(self):
        """Roll back any uncommitted changes and release the store lock."""
        try:
            self.rollback()
            # Same call commit() makes; presumably releases the band file
            # handles held by BandIO -- TODO confirm against BandIO.close.
            self.io.close()
        finally:
            self.lock_fd.close()

    def commit(self):
        """Commit transaction, `tid` will be increased by 1

        Does nothing when no transaction is active: there is no snapshot
        to discard, and bumping `tid` without one would leave ``config``
        modified with nothing to roll back to.
        """
        if not self.txn_active:
            return
        self.compact_bands()
        self.io.close()
        self.tid += 1
        self.config.set('state', 'tid', self.tid)
        self.config.set('state', 'next_band', self.io.band + 1)
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        for index in self.indexes.values():
            index.flush()
        self.bands.flush()
        # Rename first so an interrupted removal leaves `txn.tmp` (which
        # rollback() simply deletes) instead of a partial `txn.active`.
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.indexes = {}
        self.txn_active = False

    def compact_bands(self):
        """Rewrite the live objects of bands touched by `delete`, then
        delete those bands.
        """
        if not self.compact:
            return
        # Move on to a fresh band so rewritten objects never land in a band
        # that is about to be deleted.
        self.io.close_band()

        def lookup(ns, key):
            # get_index() loads the namespace index on demand; a band may
            # hold objects of namespaces not yet touched in this transaction.
            return key in self.get_index(ns)

        for band in self.compact:
            if self.bands[band] > 0:
                for ns, key, data in self.io.iter_objects(band, lookup):
                    new_band, offset = self.io.write(ns, key, data)
                    self.get_index(ns)[key] = new_band, offset
                    self.bands[band] -= 1
                    self.bands.setdefault(new_band, 0)
                    self.bands[new_band] += 1
        for band in self.compact:
            # Pop outside the assert: under `python -O` asserts are removed
            # and the band would otherwise stay in the band index.
            remaining = self.bands.pop(band)
            assert remaining == 0
            self.io.delete_band(band)

    def rollback(self):
        """Discard uncommitted changes.

        Removes a leftover ``txn.tmp`` and, if ``txn.active`` exists,
        restores ``indexes`` and ``config`` from it.  Cached indexes are
        dropped in either case.
        """
        # Remove partial transaction
        txn_tmp = os.path.join(self.path, 'txn.tmp')
        if os.path.exists(txn_tmp):
            shutil.rmtree(txn_tmp)
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.rmtree(os.path.join(self.path, 'indexes'))
            shutil.copytree(os.path.join(txn_dir, 'indexes'),
                            os.path.join(self.path, 'indexes'))
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.rmtree(txn_dir)
        self.indexes = {}
        self.txn_active = False

    def get_index(self, ns):
        """Return the (cached) index of namespace `ns`, creating the index
        file if it does not exist yet.
        """
        try:
            return self.indexes[ns]
        except KeyError:
            filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
            if os.path.exists(filename):
                self.indexes[ns] = NSIndex(filename)
            else:
                self.indexes[ns] = NSIndex.create(filename)
            return self.indexes[ns]

    def get(self, ns, id):
        """Return the data stored under `id` in namespace `ns`.

        Raises `Store.DoesNotExist` if the key is unknown.
        """
        try:
            band, offset = self.get_index(ns)[id]
            return self.io.read(band, offset)
        except KeyError:
            raise self.DoesNotExist

    def put(self, ns, id, data):
        """Store `data` under `id` in namespace `ns`, starting a
        transaction if none is active.
        """
        if not self.txn_active:
            self.begin_txn()
        band, offset = self.io.write(ns, id, data)
        self.bands.setdefault(band, 0)
        self.bands[band] += 1
        self.get_index(ns)[id] = band, offset

    def delete(self, ns, id):
        """Remove `id` from namespace `ns`, starting a transaction if none
        is active.  The band that held the object is queued for compaction.

        Raises `Store.DoesNotExist` if the key is unknown.
        """
        if not self.txn_active:
            self.begin_txn()
        try:
            band, offset = self.get_index(ns).pop(id)
            self.bands[band] -= 1
            self.compact.add(band)
        except KeyError:
            raise self.DoesNotExist

    def list(self, ns, marker=None, limit=1000000):
        """Return up to `limit` keys of namespace `ns`; `marker` is passed
        through to the index iterator.
        """
        return [key for (key, value) in
                self.get_index(ns).iteritems(marker=marker, limit=limit)]


class BandIO(object):
    """Low-level band file access, used by `Store` as ``self.io``.

    NOTE(review): this class is INCOMPLETE.  The source text was damaged
    between the opening quote of the ``header_fmt`` struct format and the
    size check inside ``write``.  The format string, the constructor and
    every method defined in between are missing and must be restored from
    version control -- they are deliberately not reconstructed here.
    `Store` relies on ``BandIO(path, next_band, max_band_size,
    bands_per_dir)`` and on the methods ``read``, ``write``,
    ``iter_objects``, ``delete_band``, ``close`` and ``close_band``; only
    ``close_band`` survived intact.
    """

    # Surviving fragment, kept verbatim (line breaks inferred):
    #
    #   header_fmt = struct.Struct(' self.limit: self.close_band()
    #   fd = self.get_fd(self.band, write=True)
    #   fd.seek(self.offset)
    #   if self.offset == 0:
    #       fd.write('DARCBAND')
    #       self.offset = 8
    #   offset = self.offset
    #   hash = crc32(data)
    #   fd.write(self.header_fmt.pack(size, 0, hash, ns, id))
    #   fd.write(data)
    #   self.offset += size
    #   return self.band, offset

    def close_band(self):
        """Switch to the next band number and reset the write offset."""
        self.band += 1
        self.offset = 0


class StoreTestCase(unittest.TestCase):
    """Round-trip test: put, delete, commit, reopen, list."""

    def setUp(self):
        self.tmppath = tempfile.mkdtemp()
        self.store = Store(os.path.join(self.tmppath, 'store'), create=True)

    def tearDown(self):
        shutil.rmtree(self.tmppath)

    def test1(self):
        self.assertEqual(self.store.tid, 0)
        for x in range(100):
            self.store.put(0, '%-32d' % x, 'SOMEDATA')
        key50 = '%-32d' % 50
        self.assertEqual(self.store.get(0, key50), 'SOMEDATA')
        self.store.delete(0, key50)
        self.assertRaises(self.store.DoesNotExist, lambda: self.store.get(0, key50))
        self.store.commit()
        self.assertEqual(self.store.tid, 1)
        self.store.close()
        store2 = Store(os.path.join(self.tmppath, 'store'))
        self.assertEqual(store2.tid, 1)
        keys = store2.list(0)
        for x in range(50):
            key = '%-32d' % x
            self.assertEqual(store2.get(0, key), 'SOMEDATA')
        self.assertRaises(store2.DoesNotExist, lambda: store2.get(0, key50))
        assert key50 not in keys
        for x in range(51, 100):
            key = '%-32d' % x
            assert key in keys
            self.assertEqual(store2.get(0, key), 'SOMEDATA')
        self.assertEqual(len(keys), 99)
        for x in range(50):
            key = '%-32d' % x
            store2.delete(0, key)
        self.assertEqual(len(store2.list(0)), 49)
        for x in range(51, 100):
            key = '%-32d' % x
            store2.delete(0, key)
        self.assertEqual(len(store2.list(0)), 0)


def suite():
    """Return the test suite for this module."""
    return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)


if __name__ == '__main__':
    unittest.main()