remote.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from __future__ import with_statement
  2. import fcntl
  3. import msgpack
  4. import os
  5. import select
  6. from subprocess import Popen, PIPE
  7. import sys
  8. import getpass
  9. from .store import Store
  10. from .lrucache import LRUCache
  11. BUFSIZE = 10 * 1024 * 1024
  12. class StoreServer(object):
  13. def __init__(self):
  14. self.store = None
  15. def serve(self):
  16. # Make stdin non-blocking
  17. fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
  18. fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
  19. # Make stdout blocking
  20. fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
  21. fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  22. unpacker = msgpack.Unpacker()
  23. while True:
  24. r, w, es = select.select([sys.stdin], [], [], 10)
  25. if r:
  26. data = os.read(sys.stdin.fileno(), BUFSIZE)
  27. if not data:
  28. return
  29. unpacker.feed(data)
  30. for type, msgid, method, args in unpacker:
  31. try:
  32. try:
  33. f = getattr(self, method)
  34. except AttributeError:
  35. f = getattr(self.store, method)
  36. res = f(*args)
  37. except Exception, e:
  38. sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
  39. else:
  40. sys.stdout.write(msgpack.packb((1, msgid, None, res)))
  41. sys.stdout.flush()
  42. if es:
  43. return
  44. def negotiate(self, versions):
  45. return 1
  46. def open(self, path, create=False):
  47. if path.startswith('/~'):
  48. path = path[1:]
  49. self.store = Store(os.path.expanduser(path), create)
  50. return self.store.id
  51. class RemoteStore(object):
  52. class DoesNotExist(Exception):
  53. pass
  54. class AlreadyExists(Exception):
  55. pass
  56. class RPCError(Exception):
  57. def __init__(self, name):
  58. self.name = name
  59. def __init__(self, location, create=False):
  60. self.cache = LRUCache(200)
  61. self.to_send = ''
  62. self.extra = {}
  63. self.pending_cache = {}
  64. self.unpacker = msgpack.Unpacker()
  65. self.msgid = 0
  66. self.received_msgid = 0
  67. args = ['ssh', '-p', str(location.port), '%s@%s' % (location.user or getpass.getuser(), location.host), 'darc', 'serve']
  68. self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE)
  69. self.stdin_fd = self.p.stdin.fileno()
  70. self.stdout_fd = self.p.stdout.fileno()
  71. self.r_fds = [self.stdout_fd]
  72. self.x_fds = [self.stdin_fd, self.stdout_fd]
  73. version = self.call('negotiate', (1,))
  74. if version != 1:
  75. raise Exception('Server insisted on using unsupported protocol version %d' % version)
  76. self.id = self.call('open', (location.path, create))
  77. def __del__(self):
  78. self.p.stdin.close()
  79. self.p.stdout.close()
  80. self.p.wait()
  81. def _read(self):
  82. data = os.read(self.stdout_fd, BUFSIZE)
  83. if not data:
  84. raise Exception('EOF')
  85. self.unpacker.feed(data)
  86. to_yield = []
  87. for type, msgid, error, res in self.unpacker:
  88. self.received_msgid = msgid
  89. if error:
  90. raise self.RPCError(error)
  91. if msgid in self.pending_cache:
  92. args = self.pending_cache.pop(msgid)
  93. self.cache[args] = msgid, res
  94. else:
  95. print 'unknown response'
  96. for args in self.extra.pop(msgid, []):
  97. to_yield.append(self.cache[args][1])
  98. for res in to_yield:
  99. yield res
  100. def call(self, cmd, args, wait=True):
  101. for res in self.call_multi(cmd, [args], wait=wait):
  102. return res
  103. def gen_request(self, cmd, argsv):
  104. data = []
  105. m = self.received_msgid
  106. for args in argsv:
  107. if not args in self.cache:
  108. self.msgid += 1
  109. msgid = self.msgid
  110. self.pending_cache[msgid] = args
  111. self.cache[args] = msgid, None
  112. data.append(msgpack.packb((1, msgid, cmd, args)))
  113. msgid, resp = self.cache[args]
  114. m = max(m, msgid)
  115. self.extra.setdefault(m, []).append(args)
  116. return ''.join(data)
  117. def gen_cache_requests(self, cmd, peek):
  118. data = []
  119. while True:
  120. try:
  121. args = (peek()[0],)
  122. except StopIteration:
  123. break
  124. if args in self.cache:
  125. continue
  126. self.msgid += 1
  127. msgid = self.msgid
  128. self.pending_cache[msgid] = args
  129. self.cache[args] = msgid, None
  130. data.append(msgpack.packb((1, msgid, cmd, args)))
  131. return ''.join(data)
  132. def call_multi(self, cmd, argsv, wait=True, peek=None):
  133. w_fds = [self.stdin_fd]
  134. left = len(argsv)
  135. data = self.gen_request(cmd, argsv)
  136. self.to_send += data
  137. for args in self.extra.pop(self.received_msgid, []):
  138. left -= 1
  139. yield self.cache[args][1]
  140. while left:
  141. r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
  142. if x:
  143. raise Exception('FD exception occured')
  144. if r:
  145. for res in self._read():
  146. left -= 1
  147. yield res
  148. if w:
  149. if not self.to_send and peek:
  150. self.to_send = self.gen_cache_requests(cmd, peek)
  151. if self.to_send:
  152. n = os.write(self.stdin_fd, self.to_send)
  153. assert n > 0
  154. self.to_send = self.to_send[n:]
  155. else:
  156. w_fds = []
  157. def commit(self, *args):
  158. self.call('commit', args)
  159. def rollback(self, *args):
  160. return self.call('rollback', args)
  161. def get(self, id):
  162. try:
  163. return self.call('get', (id, ))
  164. except self.RPCError, e:
  165. if e.name == 'DoesNotExist':
  166. raise self.DoesNotExist
  167. raise
  168. def get_many(self, ids, peek=None):
  169. return self.call_multi('get', [(id, ) for id in ids], peek=peek)
  170. def put(self, id, data, wait=True):
  171. try:
  172. return self.call('put', (id, data), wait=wait)
  173. except self.RPCError, e:
  174. if e.name == 'AlreadyExists':
  175. raise self.AlreadyExists
  176. def delete(self, id, wait=True):
  177. return self.call('delete', (id, ), wait=wait)