fsstore.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. #!/usr/bin/env python
import binascii
import fcntl
import os
import shutil
import tempfile
import unittest
import uuid
  8. class Store(object):
  9. """
  10. """
  11. class DoesNotExist(KeyError):
  12. """"""
  13. class AlreadyExists(KeyError):
  14. """"""
  15. IDLE = 'Idle'
  16. OPEN = 'Open'
  17. ACTIVE = 'Active'
  18. VERSION = 'DEDUPESTORE VERSION 1'
  19. def __init__(self, path):
  20. self.tid = '-1'
  21. self.state = Store.IDLE
  22. if not os.path.exists(path):
  23. self.create(path)
  24. self.open(path)
  25. def create(self, path):
  26. os.mkdir(path)
  27. open(os.path.join(path, 'version'), 'wb').write(self.VERSION)
  28. open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4()))
  29. open(os.path.join(path, 'tid'), 'wb').write('0')
  30. os.mkdir(os.path.join(path, 'data'))
  31. def open(self, path):
  32. self.path = path
  33. if not os.path.isdir(path):
  34. raise Exception('%s Does not look like a store')
  35. version_path = os.path.join(path, 'version')
  36. if not os.path.exists(version_path) or open(version_path, 'rb').read() != self.VERSION:
  37. raise Exception('%s Does not look like a store2')
  38. self.uuid = open(os.path.join(path, 'uuid'), 'rb').read()
  39. self.lock_fd = open(os.path.join(path, 'lock'), 'w')
  40. fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
  41. self.tid = int(open(os.path.join(path, 'tid'), 'r').read())
  42. self.recover()
  43. def recover(self):
  44. if os.path.exists(os.path.join(self.path, 'txn-active')):
  45. shutil.rmtree(os.path.join(self.path, 'txn-active'))
  46. if os.path.exists(os.path.join(self.path, 'txn-commit')):
  47. self.apply_txn()
  48. if os.path.exists(os.path.join(self.path, 'txn-applied')):
  49. shutil.rmtree(os.path.join(self.path, 'txn-applied'))
  50. self.state = Store.OPEN
  51. self.txn_delete = []
  52. self.txn_write = []
  53. def close(self):
  54. self.recover()
  55. self.lock_fd.close()
  56. self.state = Store.IDLE
  57. def commit(self):
  58. """
  59. """
  60. if self.state == Store.OPEN:
  61. return
  62. assert self.state == Store.ACTIVE
  63. with open(os.path.join(self.path, 'txn-active', 'delete_index'), 'wb') as fd:
  64. fd.write('\n'.join(self.txn_delete))
  65. with open(os.path.join(self.path, 'txn-active', 'write_index'), 'wb') as fd:
  66. fd.write('\n'.join(self.txn_write))
  67. with open(os.path.join(self.path, 'txn-active', 'tid'), 'wb') as fd:
  68. fd.write(str(self.tid + 1))
  69. os.rename(os.path.join(self.path, 'txn-active'),
  70. os.path.join(self.path, 'txn-commit'))
  71. self.recover()
  72. def apply_txn(self):
  73. assert os.path.isdir(os.path.join(self.path, 'txn-commit'))
  74. tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read())
  75. assert tid == self.tid + 1
  76. delete_list = [line.strip() for line in
  77. open(os.path.join(self.path, 'txn-commit', 'delete_index'), 'rb').readlines()]
  78. for name in delete_list:
  79. path = os.path.join(self.path, 'data', name)
  80. os.unlink(path)
  81. write_list = [line.strip() for line in
  82. open(os.path.join(self.path, 'txn-commit', 'write_index'), 'rb').readlines()]
  83. for name in write_list:
  84. destname = os.path.join(self.path, 'data', name)
  85. if not os.path.exists(os.path.dirname(destname)):
  86. os.makedirs(os.path.dirname(destname))
  87. os.rename(os.path.join(self.path, 'txn-commit', 'write', name), destname)
  88. with open(os.path.join(self.path, 'tid'), 'wb') as fd:
  89. fd.write(str(tid))
  90. os.rename(os.path.join(self.path, 'txn-commit'),
  91. os.path.join(self.path, 'txn-applied'))
  92. shutil.rmtree(os.path.join(self.path, 'txn-applied'))
  93. self.tid = tid
  94. def rollback(self):
  95. """
  96. """
  97. self.recover()
  98. def prepare_txn(self):
  99. if self.state == Store.ACTIVE:
  100. return os.path.join(self.path, 'txn-active')
  101. elif self.state == Store.OPEN:
  102. os.mkdir(os.path.join(self.path, 'txn-active'))
  103. os.mkdir(os.path.join(self.path, 'txn-active', 'write'))
  104. self.state = Store.ACTIVE
  105. def _filename(self, ns, id, base=''):
  106. ns = ns.encode('hex')
  107. id = id.encode('hex')
  108. return os.path.join(base, ns, id[:2], id[2:4], id[4:])
  109. def get(self, ns, id):
  110. """
  111. """
  112. path = self._filename(ns, id)
  113. if path in self.txn_write:
  114. filename = os.path.join(self.path, 'txn-active', 'write', path)
  115. return open(filename, 'rb').read()
  116. if path in self.txn_delete:
  117. raise Store.DoesNotExist('Object %s:%s does not exist' % (ns.encode('hex'), id.encode('hex')))
  118. filename = self._filename(ns, id, os.path.join(self.path, 'data'))
  119. if os.path.exists(filename):
  120. return open(filename, 'rb').read()
  121. else:
  122. raise Store.DoesNotExist('Object %s:%s does not exist' % (ns.encode('hex'), id.encode('hex')))
  123. def put(self, ns, id, data):
  124. """
  125. """
  126. self.prepare_txn()
  127. path = self._filename(ns, id)
  128. filename = self._filename(ns, id, os.path.join(self.path, 'data'))
  129. if (path in self.txn_write or
  130. (path not in self.txn_delete and os.path.exists(filename))):
  131. raise Store.AlreadyExists('Object already exists: %s:%s' % (ns.encode('hex'), id.encode('hex')))
  132. if path in self.txn_delete:
  133. self.txn_delete.remove(path)
  134. if path not in self.txn_write:
  135. self.txn_write.append(path)
  136. filename = self._filename(ns, id, os.path.join(self.path, 'txn-active', 'write'))
  137. if not os.path.exists(os.path.dirname(filename)):
  138. os.makedirs(os.path.dirname(filename))
  139. with open(filename, 'wb') as fd:
  140. fd.write(data)
  141. def delete(self, ns, id):
  142. """
  143. """
  144. self.prepare_txn()
  145. path = self._filename(ns, id)
  146. if path in self.txn_write:
  147. filename = self._filename(ns, id, os.path.join(self.path, 'txn-active', 'write'))
  148. self.txn_write.remove(path)
  149. os.unlink(filename)
  150. else:
  151. filename = os.path.join(self.path, 'data', path)
  152. if os.path.exists(filename):
  153. self.txn_delete.append(path)
  154. else:
  155. raise Store.DoesNotExist('Object does not exist: %s' % hash.encode('hex'))
  156. def list(self, ns, prefix='', marker=None, max_keys=1000000):
  157. for x in self._walker(os.path.join(self.path, 'data', ns.encode('hex')),
  158. prefix, marker, '', max_keys):
  159. yield x
  160. def _walker(self, path, prefix, marker, base, max_keys):
  161. n = 0
  162. for name in sorted(os.listdir(path)):
  163. if n >= max_keys:
  164. return
  165. id = name.decode('hex')
  166. if os.path.isdir(os.path.join(path, name)):
  167. if prefix and not id.startswith(prefix[:len(id)]):
  168. continue
  169. for x in self.foo(os.path.join(path, name),
  170. prefix[len(id):], marker,
  171. base + id, max_keys - n):
  172. yield x
  173. n += 1
  174. else:
  175. if prefix and not id.startswith(prefix):
  176. continue
  177. if not marker or base + id >= marker:
  178. yield base + id
  179. n += 1
  180. class StoreTestCase(unittest.TestCase):
  181. def setUp(self):
  182. self.tmppath = tempfile.mkdtemp()
  183. self.store = Store(os.path.join(self.tmppath, 'store'))
  184. def tearDown(self):
  185. shutil.rmtree(self.tmppath)
  186. def test1(self):
  187. self.assertEqual(self.store.tid, 0)
  188. self.assertEqual(self.store.state, Store.OPEN)
  189. self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
  190. self.assertRaises(Store.AlreadyExists, lambda: self.store.put('SOMENS', 'SOMEID', 'SOMEDATA'))
  191. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  192. self.store.rollback()
  193. self.assertRaises(Store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  194. self.assertEqual(self.store.tid, 0)
  195. def test2(self):
  196. self.assertEqual(self.store.tid, 0)
  197. self.assertEqual(self.store.state, Store.OPEN)
  198. self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
  199. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  200. self.store.commit()
  201. self.assertEqual(self.store.tid, 1)
  202. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  203. self.store.delete('SOMENS', 'SOMEID')
  204. self.assertRaises(Store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  205. self.store.rollback()
  206. self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
  207. self.store.delete('SOMENS', 'SOMEID')
  208. self.assertRaises(Store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  209. self.store.commit()
  210. self.assertEqual(self.store.tid, 2)
  211. self.assertRaises(Store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
  212. def test_list(self):
  213. self.store.put('SOMENS', 'SOMEID12', 'SOMEDATA')
  214. self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
  215. self.store.put('SOMENS', 'SOMEID1', 'SOMEDATA')
  216. self.store.put('SOMENS', 'SOMEID123', 'SOMEDATA')
  217. self.store.commit()
  218. self.assertEqual(list(self.store.list('SOMENS', max_keys=3)),
  219. ['SOMEID', 'SOMEID1', 'SOMEID12'])
  220. self.assertEqual(list(self.store.list('SOMENS', marker='SOMEID12')),
  221. ['SOMEID12', 'SOMEID123'])
  222. self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', max_keys=2)),
  223. ['SOMEID1', 'SOMEID12'])
  224. self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', marker='SOMEID12')),
  225. ['SOMEID12', 'SOMEID123'])
# Run the unit tests above when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()