remote.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import fcntl
  2. import msgpack
  3. import os
  4. import paramiko
  5. import select
  6. import sys
  7. import getpass
  8. from .store import Store
  # Upper bound, in bytes, for a single read from stdin or from the SSH
  # channel (1 MiB).
  BUFSIZE = 1 << 20
  class ChannelNotifyer(object):
      """Event-like helper that wakes waiters on a channel's output condition.

      The object offers the ``set()`` / ``clear()`` pair so it can be installed
      wherever an event object with those two methods is expected.
      """

      def __init__(self, channel):
          # Channel whose ``lock`` and ``out_buffer_cv`` are used by set().
          self.channel = channel
          # Counter maintained by the owner; set() is a no-op while it is zero.
          self.enabled = 0

      def set(self):
          """Notify all waiters on ``channel.out_buffer_cv`` if enabled."""
          if not self.enabled:
              return
          target = self.channel
          # The condition is notified while holding the channel lock.
          with target.lock:
              target.out_buffer_cv.notifyAll()

      def clear(self):
          """Do nothing; exists only to complete the set()/clear() pair."""
          return None
  class StoreServer(object):
      """Serve msgpack-encoded RPC requests read from stdin, replying on stdout.

      Every request is a 4-tuple ``(type, msgid, method, args)``.  ``method`` is
      looked up on this object first and on the opened store second, then
      called with ``args``.  Every reply is ``(1, msgid, error_name, result)``
      where ``error_name`` is the raised exception's class name, or ``None``
      on success.
      """

      def __init__(self):
          # Assigned by open(); until then only this class's own methods
          # can be served.
          self.store = None

      def serve(self):
          """Handle requests until stdin hits EOF or reports an exceptional state."""
          stdin_fd = sys.stdin.fileno()
          # Make stdin non-blocking
          flags = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
          fcntl.fcntl(stdin_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
          unpacker = msgpack.Unpacker()
          while True:
              # stdin is passed as the exceptional set as well: with an empty
              # set there, the exit check at the bottom could never trigger.
              readable, _, exceptional = select.select(
                  [sys.stdin], [], [sys.stdin], 10)
              if readable:
                  data = os.read(stdin_fd, BUFSIZE)
                  if not data:
                      # Zero-length read: the peer closed stdin.
                      return
                  unpacker.feed(data)
                  for _type, msgid, method, args in unpacker:
                      try:
                          # NOTE(review): any attribute name sent by the peer
                          # is resolved here, not just RPC methods -- consider
                          # restricting to an explicit list.
                          try:
                              handler = getattr(self, method)
                          except AttributeError:
                              handler = getattr(self.store, method)
                          res = handler(*args)
                      except Exception as e:
                          # Only the exception class name travels back.
                          sys.stdout.write(msgpack.packb(
                              (1, msgid, e.__class__.__name__, None)))
                      else:
                          sys.stdout.write(msgpack.packb((1, msgid, None, res)))
                      sys.stdout.flush()
              if exceptional:
                  return

      def open(self, path, create=False):
          """Open the store at ``path`` and return its ``(id, tid)`` pair.

          A leading ``/~`` is reduced to ``~`` before ``os.path.expanduser`` is
          applied.  ``create`` is passed through to ``Store`` unchanged.
          """
          if path.startswith('/~'):
              path = path[1:]
          self.store = Store(os.path.expanduser(path), create)
          return self.store.id, self.store.tid
  class RemoteStore(object):
      """Client-side proxy for a store served by ``darc serve`` over SSH.

      Requests are msgpack tuples ``(0, msgid, method, args)`` written to the
      SSH channel; replies are ``(type, msgid, error, result)`` tuples.  A
      request issued with a callback is asynchronous: its reply is delivered
      to the callback by a later cmd() or flush_rpc() call.
      """

      class DoesNotExist(Exception):
          """Raised by get() when the remote side reports ``DoesNotExist``."""

      class AlreadyExists(Exception):
          """Raised by put() when the remote side reports ``AlreadyExists``."""

      class RPCError(Exception):
          """A remote call failed; ``name`` is the error name from the reply."""

          def __init__(self, name):
              self.name = name

      def __init__(self, location, create=False):
          """Connect to ``location``, start ``darc serve`` and open the store.

          ``location`` must provide ``user``, ``host``, ``port`` and ``path``.
          If connecting fails with an SSH error, a password is prompted for
          once and the connection retried; a second failure propagates.
          """
          self.client = paramiko.SSHClient()
          # NOTE(review): AutoAddPolicy accepts unknown host keys without
          # verification -- confirm this is acceptable for the deployment.
          self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          params = {'username': location.user or getpass.getuser(),
                    'hostname': location.host, 'port': location.port}
          while True:
              try:
                  self.client.connect(**params)
                  break
              except (paramiko.PasswordRequiredException,
                      paramiko.AuthenticationException,
                      paramiko.SSHException):
                  if 'password' in params:
                      # The password attempt failed as well: give up.
                      raise
                  params['password'] = getpass.getpass(
                      'Password for %(username)s@%(hostname)s:' % params)
          self.unpacker = msgpack.Unpacker()
          self.transport = self.client.get_transport()
          self.channel = self.transport.open_session()
          # The notifier is installed as the event of both input buffers so
          # that wait() can be woken through the channel's output condition.
          self.notifier = ChannelNotifyer(self.channel)
          self.channel.in_buffer.set_event(self.notifier)
          self.channel.in_stderr_buffer.set_event(self.notifier)
          self.channel.exec_command('darc serve')
          # msgid -> (callback, callback_data) for asynchronous requests whose
          # reply has not been read yet.
          self.callbacks = {}
          self.msgid = 0
          self.id, self.tid = self.cmd('open', (location.path, create))

      def _report_remote_stderr(self):
          """Copy pending remote stderr output to the local stderr."""
          sys.stderr.write(
              'remote stderr: %s\n' % (self.channel.recv_stderr(BUFSIZE),))

      def wait(self):
          """Wait up to 10 seconds on the channel's output condition.

          The wait only happens when the send window is exhausted and neither
          stdout nor stderr data is ready.
          """
          # NOTE(review): with a non-zero send window this returns at once, so
          # a caller polling for a reply loops without sleeping -- confirm
          # that this is intended.
          with self.channel.lock:
              if (self.channel.out_window_size == 0 and
                      not self.channel.recv_ready() and
                      not self.channel.recv_stderr_ready()):
                  self.channel.out_buffer_cv.wait(10)

      def cmd(self, cmd, args, callback=None, callback_data=None):
          """Send one request and, unless ``callback`` is given, await its reply.

          With a callback, ``None`` is returned as soon as the request has
          been written in full; the callback is later invoked as
          ``callback(result, error, callback_data)``.  Without one, the
          reply's result is returned.

          Raises:
              RPCError: the awaited reply carries an error.
              Exception: the channel was closed ('Connection closed').
          """
          self.msgid += 1
          self.notifier.enabled += 1
          odata = msgpack.packb((0, self.msgid, cmd, args))
          if callback:
              self.callbacks[self.msgid] = callback, callback_data
          while True:
              if self.channel.closed:
                  raise Exception('Connection closed')
              elif self.channel.recv_stderr_ready():
                  self._report_remote_stderr()
              elif self.channel.recv_ready():
                  self.unpacker.feed(self.channel.recv(BUFSIZE))
                  for _type, msgid, error, res in self.unpacker:
                      # One reply consumed: balance the increment made when
                      # its request was issued.
                      self.notifier.enabled -= 1
                      if msgid == self.msgid:
                          if error:
                              raise self.RPCError(error)
                          return res
                      # Reply to an earlier asynchronous request.
                      c, d = self.callbacks.pop(msgid, (None, None))
                      if c:
                          c(res, error, d)
              elif odata and self.channel.send_ready():
                  n = self.channel.send(odata)
                  if n > 0:
                      odata = odata[n:]
                  if not odata and callback:
                      # Fully sent; the reply goes to the callback later.
                      return
              else:
                  self.wait()

      def commit(self, *args):
          """Run ``commit`` remotely, then increment the local ``tid``."""
          self.cmd('commit', args)
          self.tid += 1

      def rollback(self, *args):
          """Run ``rollback`` remotely and return its result."""
          return self.cmd('rollback', args)

      def get(self, ns, id, callback=None, callback_data=None):
          """Run ``get`` remotely; see cmd() for the callback behaviour.

          Raises DoesNotExist when the remote error name is ``DoesNotExist``;
          any other RPCError propagates unchanged.
          """
          try:
              return self.cmd('get', (ns, id), callback, callback_data)
          except self.RPCError as e:
              if e.name == 'DoesNotExist':
                  raise self.DoesNotExist
              raise

      def put(self, ns, id, data, callback=None, callback_data=None):
          """Run ``put`` remotely; see cmd() for the callback behaviour.

          Raises AlreadyExists when the remote error name is
          ``AlreadyExists``; any other RPCError propagates unchanged.
          """
          try:
              return self.cmd('put', (ns, id, data), callback, callback_data)
          except self.RPCError as e:
              if e.name == 'AlreadyExists':
                  raise self.AlreadyExists
              # Other remote failures must not be swallowed silently.
              raise

      def delete(self, ns, id, callback=None, callback_data=None):
          """Run ``delete`` remotely; see cmd() for the callback behaviour."""
          return self.cmd('delete', (ns, id), callback, callback_data)

      def list(self, *args):
          """Run ``list`` remotely and return its result."""
          return self.cmd('list', args)

      def flush_rpc(self):
          """Read replies until the most recent request has been answered.

          Returns immediately when no callback request is outstanding, since
          no further reply would ever arrive in that case.

          Raises Exception('Connection closed') if the channel closes.
          """
          if not self.callbacks:
              return
          while True:
              if self.channel.closed:
                  raise Exception('Connection closed')
              elif self.channel.recv_stderr_ready():
                  self._report_remote_stderr()
              elif self.channel.recv_ready():
                  self.unpacker.feed(self.channel.recv(BUFSIZE))
                  for _type, msgid, error, res in self.unpacker:
                      self.notifier.enabled -= 1
                      c, d = self.callbacks.pop(msgid, (None, None))
                      if c:
                          c(res, error, d)
                      if msgid == self.msgid:
                          return
              else:
                  self.wait()
  163. else:
  164. self.wait()