store.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
import binascii
import fcntl
import os
import shutil
import sqlite3
import tempfile
import unittest
  7. Binary = sqlite3.Binary
  8. class Store(object):
  9. """
  10. """
  11. class DoesNotExist(KeyError):
  12. """"""
  13. class AlreadyExists(KeyError):
  14. """"""
  15. IDLE = 'Idle'
  16. OPEN = 'Open'
  17. ACTIVE = 'Active'
  18. BAND_LIMIT = 1 * 1024 * 1024
  19. def __init__(self, path, create=False):
  20. self.read_fd = None
  21. self.write_fd = None
  22. if create:
  23. self.create(path)
  24. self.open(path)
  25. def get_option(self, key):
  26. return self.cursor.execute('SELECT value FROM system WHERE key=?', (key,)) \
  27. .fetchone()[0]
  28. def set_option(self, key, value):
  29. return self.cursor.execute('UPDATE system SET value=? WHERE key=?',
  30. (value, key))
  31. def open(self, path):
  32. if not os.path.isdir(path):
  33. raise Exception('%s Does not look like a darc store' % path)
  34. db_path = os.path.join(path, 'darcstore.db')
  35. if not os.path.exists(db_path):
  36. raise Exception('%s Does not look like a darc store')
  37. self.lock_fd = open(os.path.join(path, 'lock'), 'w')
  38. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  39. self.path = path
  40. self.cnx = sqlite3.connect(db_path)
  41. self.cnx.text_factory = str
  42. self.cursor = self.cnx.cursor()
  43. self._begin()
  44. def _begin(self):
  45. if self.read_fd:
  46. self.read_fd.close()
  47. self.read_fd = None
  48. if self.write_fd:
  49. self.write_fd.close()
  50. self.write_fd = None
  51. self.version = self.get_option('version')
  52. self.id = self.get_option('id').decode('hex')
  53. self.tid = self.get_option('tid')
  54. self.nextband = self.get_option('nextband')
  55. self.bandlimit = self.get_option('bandlimit')
  56. assert self.version == 1
  57. self.state = self.OPEN
  58. self.read_band = None
  59. self.write_band = None
  60. self.to_delete = set()
  61. band = self.nextband
  62. while os.path.exists(self.band_filename(band)):
  63. os.unlink(self.band_filename(band))
  64. band += 1
  65. def create(self, path):
  66. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  67. raise Exception('Path "%s" already exists' % path)
  68. if not os.path.exists(path):
  69. os.mkdir(path)
  70. os.mkdir(os.path.join(path, 'bands'))
  71. cnx = sqlite3.connect(os.path.join(path, 'darcstore.db'))
  72. cnx.execute('CREATE TABLE objects(ns BINARY NOT NULL, id BINARY NOT NULL, '
  73. 'band NOT NULL, offset NOT NULL, size NOT NULL)')
  74. cnx.execute('CREATE UNIQUE INDEX objects_pk ON objects(ns, id)')
  75. cnx.execute('CREATE TABLE system(key UNIQUE NOT NULL, value)')
  76. cnx.executemany('INSERT INTO system VALUES(?, ?)',
  77. (('id', os.urandom(32).encode('hex')),
  78. ('version', 1),
  79. ('tid', 0),
  80. ('nextband', 0),
  81. ('bandlimit', self.BAND_LIMIT)))
  82. cnx.commit()
  83. def close(self):
  84. self.rollback()
  85. self.cnx.close()
  86. self.lock_fd.close()
  87. os.unlink(os.path.join(self.path, 'lock'))
  88. def commit(self):
  89. """
  90. """
  91. self.band = None
  92. self.delete_bands()
  93. self.tid += 1
  94. self._begin()
  95. def delete_bands(self):
  96. for b in self.to_delete:
  97. objects = self.cursor.execute('SELECT ns, id, offset, size '
  98. 'FROM objects WHERE band=? ORDER BY offset',
  99. (b,)).fetchall()
  100. for o in objects:
  101. band, offset, size = self.store_data(self.retrieve_data(b, *o[2:]))
  102. self.cursor.execute('UPDATE objects SET band=?, offset=?, size=? '
  103. 'WHERE ns=? AND id=?', (band, offset, size,
  104. Binary(o[0]), Binary(o[1])))
  105. self.set_option('tid', self.tid + 1)
  106. self.set_option('nextband', self.nextband)
  107. self.cnx.commit()
  108. for b in self.to_delete:
  109. os.unlink(self.band_filename(b))
  110. def rollback(self):
  111. """
  112. """
  113. self.cnx.rollback()
  114. self._begin()
  115. def get(self, ns, id):
  116. """
  117. """
  118. self.cursor.execute('SELECT band, offset, size FROM objects WHERE ns=? and id=?',
  119. (Binary(ns), Binary(id)))
  120. row = self.cursor.fetchone()
  121. if row:
  122. return self.retrieve_data(*row)
  123. else:
  124. raise self.DoesNotExist
  125. def band_filename(self, band):
  126. return os.path.join(self.path, 'bands', str(band / 1000), str(band))
  127. def retrieve_data(self, band, offset, size):
  128. if self.write_band == band:
  129. self.write_fd.flush()
  130. if self.read_band != band:
  131. self.read_band = band
  132. if self.read_fd:
  133. self.read_fd.close()
  134. self.read_fd = open(self.band_filename(band), 'rb')
  135. self.read_fd.seek(offset)
  136. return self.read_fd.read(size)
  137. def store_data(self, data):
  138. if self.write_band is None:
  139. self.write_band = self.nextband
  140. self.nextband += 1
  141. if self.write_band % 1000 == 0:
  142. path = os.path.join(self.path, 'bands', str(self.write_band / 1000))
  143. if not os.path.exists(path):
  144. os.mkdir(path)
  145. assert not os.path.exists(self.band_filename(self.write_band))
  146. self.write_fd = open(self.band_filename(self.write_band), 'ab')
  147. band = self.write_band
  148. offset = self.write_fd.tell()
  149. self.write_fd.write(data)
  150. if offset + len(data) > self.bandlimit:
  151. self.write_band = None
  152. return band, offset, len(data)
  153. def put(self, ns, id, data):
  154. """
  155. """
  156. try:
  157. band, offset, size = self.store_data(data)
  158. self.cursor.execute('INSERT INTO objects (ns, id, band, offset, size) '
  159. 'VALUES(?, ?, ?, ?, ?)',
  160. (Binary(ns), Binary(id), band, offset, size))
  161. except sqlite3.IntegrityError:
  162. raise self.AlreadyExists
  163. def delete(self, ns, id):
  164. """
  165. """
  166. self.cursor.execute('SELECT band FROM objects WHERE ns=? and id=?',
  167. (Binary(ns), Binary(id)))
  168. row = self.cursor.fetchone()
  169. if not row:
  170. raise self.DoesNotExist
  171. self.cursor.execute('DELETE FROM objects WHERE ns=? AND id=?',
  172. (Binary(ns), Binary(id)))
  173. self.to_delete.add(row[0])
  174. def list(self, ns, prefix='', marker=None, max_keys=1000000):
  175. """
  176. """
  177. sql = 'SELECT id FROM objects WHERE ns=:ns'
  178. args = dict(ns=Binary(ns))
  179. if prefix:
  180. args['prefix'] = Binary(prefix)
  181. args['end'] = Binary(prefix + chr(255))
  182. sql += ' AND id >= :prefix AND id < :end'
  183. if marker:
  184. sql += ' AND id >= :marker'
  185. args['marker'] = Binary(marker)
  186. for row in self.cursor.execute(sql + ' LIMIT ' + str(max_keys), args):
  187. yield str(row[0])
  188. class StoreTestCase(unittest.TestCase):
  189. def setUp(self):
  190. self.tmppath = tempfile.mkdtemp()
  191. self.store = Store(os.path.join(self.tmppath, 'store'), create=True)
  192. def tearDown(self):
  193. shutil.rmtree(self.tmppath)
  194. def test1(self):
  195. self.assertEqual(self.store.tid, 0)
  196. self.assertEqual(self.store.state, self.store.OPEN)
  197. self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
  198. self.assertRaises(self.store.AlreadyExists, lambda: self.store.put('SOMENS', 'SOMEID', 'SOMEDATA'))
  199. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  200. self.store.rollback()
  201. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  202. self.assertEqual(self.store.tid, 0)
  203. def test2(self):
  204. self.assertEqual(self.store.tid, 0)
  205. self.assertEqual(self.store.state, self.store.OPEN)
  206. self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
  207. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  208. self.store.commit()
  209. self.assertEqual(self.store.tid, 1)
  210. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  211. self.store.delete('SOMENS', 'SOMEID')
  212. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  213. self.store.rollback()
  214. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  215. self.store.delete('SOMENS', 'SOMEID')
  216. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  217. self.store.commit()
  218. self.assertEqual(self.store.tid, 2)
  219. self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  220. def test_list(self):
  221. self.store.put('SOMENS', 'SOMEID12', 'SOMEDATA')
  222. self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
  223. self.store.put('SOMENS', 'SOMEID1', 'SOMEDATA')
  224. self.store.put('SOMENS', 'SOMEID123', 'SOMEDATA')
  225. self.store.commit()
  226. self.assertEqual(list(self.store.list('SOMENS', max_keys=3)),
  227. ['SOMEID', 'SOMEID1', 'SOMEID12'])
  228. self.assertEqual(list(self.store.list('SOMENS', marker='SOMEID12')),
  229. ['SOMEID12', 'SOMEID123'])
  230. self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', max_keys=2)),
  231. ['SOMEID1', 'SOMEID12'])
  232. self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', marker='SOMEID12')),
  233. ['SOMEID12', 'SOMEID123'])
  234. def suite():
  235. return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
  236. if __name__ == '__main__':
  237. unittest.main()