remote.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from __future__ import with_statement
  2. import fcntl
  3. import msgpack
  4. import os
  5. import select
  6. from subprocess import Popen, PIPE
  7. import sys
  8. import getpass
  9. from .store import Store
  10. from .lrucache import LRUCache
# Maximum number of bytes requested from a pipe in a single os.read() call
# (10 MiB); used by both the server and the client side in this module.
BUFSIZE = 10 * 1024 * 1024
  12. class StoreServer(object):
  13. def __init__(self):
  14. self.store = None
  15. def serve(self):
  16. # Make stdin non-blocking
  17. fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
  18. fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
  19. # Make stdout blocking
  20. fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
  21. fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  22. unpacker = msgpack.Unpacker()
  23. while True:
  24. r, w, es = select.select([sys.stdin], [], [], 10)
  25. if r:
  26. data = os.read(sys.stdin.fileno(), BUFSIZE)
  27. if not data:
  28. return
  29. unpacker.feed(data)
  30. for type, msgid, method, args in unpacker:
  31. try:
  32. try:
  33. f = getattr(self, method)
  34. except AttributeError:
  35. f = getattr(self.store, method)
  36. res = f(*args)
  37. except Exception, e:
  38. sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
  39. else:
  40. sys.stdout.write(msgpack.packb((1, msgid, None, res)))
  41. sys.stdout.flush()
  42. if es:
  43. return
  44. def negotiate(self, versions):
  45. return 1
  46. def open(self, path, create=False):
  47. if path.startswith('/~'):
  48. path = path[1:]
  49. self.store = Store(os.path.expanduser(path), create)
  50. return self.store.id
  51. class RemoteStore(object):
  52. class DoesNotExist(Exception):
  53. pass
  54. class AlreadyExists(Exception):
  55. pass
  56. class RPCError(Exception):
  57. def __init__(self, name):
  58. self.name = name
  59. def __init__(self, location, create=False):
  60. self.cache = LRUCache(200)
  61. self.to_send = ''
  62. self.extra = {}
  63. self.pending = {}
  64. self.unpacker = msgpack.Unpacker()
  65. self.msgid = 0
  66. self.received_msgid = 0
  67. args = ['ssh', '-p', str(location.port), '%s@%s' % (location.user or getpass.getuser(), location.host), 'darc', 'serve']
  68. self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE)
  69. self.stdin_fd = self.p.stdin.fileno()
  70. self.stdout_fd = self.p.stdout.fileno()
  71. self.r_fds = [self.stdout_fd]
  72. self.x_fds = [self.stdin_fd, self.stdout_fd]
  73. version = self.call('negotiate', (1,))
  74. if version != 1:
  75. raise Exception('Server insisted on using unsupported protocol version %d' % version)
  76. self.id = self.call('open', (location.path, create))
  77. def __del__(self):
  78. self.p.stdin.close()
  79. self.p.stdout.close()
  80. self.p.wait()
  81. def _read(self):
  82. data = os.read(self.stdout_fd, BUFSIZE)
  83. if not data:
  84. raise Exception('EOF')
  85. self.unpacker.feed(data)
  86. to_yield = []
  87. for type, msgid, error, res in self.unpacker:
  88. self.received_msgid = msgid
  89. if error:
  90. raise self.RPCError(error)
  91. args = self.pending.pop(msgid)
  92. self.cache[args] = msgid, res
  93. for args, resp in self.extra.pop(msgid, []):
  94. to_yield.append(resp or self.cache[args][1])
  95. for res in to_yield:
  96. yield res
  97. def call(self, cmd, args, wait=True):
  98. for res in self.call_multi(cmd, [args], wait=wait):
  99. return res
  100. def gen_request(self, cmd, argsv):
  101. data = []
  102. m = self.received_msgid
  103. for args in argsv:
  104. if not args in self.cache:
  105. self.msgid += 1
  106. msgid = self.msgid
  107. self.pending[msgid] = args
  108. self.cache[args] = msgid, None
  109. data.append(msgpack.packb((1, msgid, cmd, args)))
  110. msgid, resp = self.cache[args]
  111. m = max(m, msgid)
  112. self.extra.setdefault(m, []).append((args, resp))
  113. return ''.join(data)
  114. def gen_cache_requests(self, cmd, peek):
  115. data = []
  116. while True:
  117. try:
  118. args = (peek()[0],)
  119. except StopIteration:
  120. break
  121. if args in self.cache:
  122. continue
  123. self.msgid += 1
  124. msgid = self.msgid
  125. self.pending[msgid] = args
  126. self.cache[args] = msgid, None
  127. data.append(msgpack.packb((1, msgid, cmd, args)))
  128. return ''.join(data)
  129. def call_multi(self, cmd, argsv, wait=True, peek=None):
  130. w_fds = [self.stdin_fd]
  131. left = len(argsv)
  132. data = self.gen_request(cmd, argsv)
  133. self.to_send += data
  134. for args, resp in self.extra.pop(self.received_msgid, []):
  135. left -= 1
  136. yield resp or self.cache[args][1]
  137. while left:
  138. r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
  139. if x:
  140. raise Exception('FD exception occured')
  141. if r:
  142. for res in self._read():
  143. left -= 1
  144. yield res
  145. if w:
  146. if not self.to_send and peek:
  147. self.to_send = self.gen_cache_requests(cmd, peek)
  148. if self.to_send:
  149. n = os.write(self.stdin_fd, self.to_send)
  150. assert n > 0
  151. self.to_send = self.to_send[n:]
  152. else:
  153. w_fds = []
  154. def commit(self, *args):
  155. self.call('commit', args)
  156. def rollback(self, *args):
  157. return self.call('rollback', args)
  158. def get(self, id):
  159. try:
  160. return self.call('get', (id, ))
  161. except self.RPCError, e:
  162. if e.name == 'DoesNotExist':
  163. raise self.DoesNotExist
  164. raise
  165. def get_many(self, ids, peek=None):
  166. return self.call_multi('get', [(id, ) for id in ids], peek=peek)
  167. def put(self, id, data, wait=True):
  168. try:
  169. return self.call('put', (id, data), wait=wait)
  170. except self.RPCError, e:
  171. if e.name == 'AlreadyExists':
  172. raise self.AlreadyExists
  173. def delete(self, id, wait=True):
  174. return self.call('delete', (id, ), wait=wait)