remote.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. import fcntl
  2. import msgpack
  3. import os
  4. import paramiko
  5. import select
  6. import sys
  7. import getpass
  8. from .store import Store
  9. BUFSIZE = 1024 * 1024
  10. class ChannelNotifyer(object):
  11. def __init__(self, channel):
  12. self.channel = channel
  13. self.enabled = True
  14. def set(self):
  15. if self.enabled:
  16. with self.channel.lock:
  17. self.channel.out_buffer_cv.notifyAll()
  18. def clear(self):
  19. pass
  20. class StoreServer(object):
  21. def __init__(self):
  22. self.store = None
  23. def serve(self):
  24. # Make stdin non-blocking
  25. fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
  26. fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
  27. unpacker = msgpack.Unpacker()
  28. while True:
  29. r, w, es = select.select([sys.stdin], [], [], 10)
  30. if r:
  31. data = os.read(sys.stdin.fileno(), BUFSIZE)
  32. if not data:
  33. return
  34. unpacker.feed(data)
  35. for type, msgid, method, args in unpacker:
  36. try:
  37. try:
  38. f = getattr(self, method)
  39. except AttributeError:
  40. f = getattr(self.store, method)
  41. res = f(*args)
  42. except Exception, e:
  43. sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
  44. else:
  45. if method not in ('put', 'delete'):
  46. sys.stdout.write(msgpack.packb((1, msgid, None, res)))
  47. sys.stdout.flush()
  48. if es:
  49. return
  50. def open(self, path, create=False):
  51. if path.startswith('/~'):
  52. path = path[1:]
  53. self.store = Store(os.path.expanduser(path), create)
  54. return self.store.id, self.store.tid
  55. class RemoteStore(object):
  56. class DoesNotExist(Exception):
  57. pass
  58. class AlreadyExists(Exception):
  59. pass
  60. class RPCError(Exception):
  61. def __init__(self, name):
  62. self.name = name
  63. def __init__(self, location, create=False):
  64. self.client = paramiko.SSHClient()
  65. self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  66. params = {'username': location.user or getpass.getuser(),
  67. 'hostname': location.host, 'port': location.port}
  68. while True:
  69. try:
  70. self.client.connect(**params)
  71. break
  72. except (paramiko.PasswordRequiredException,
  73. paramiko.AuthenticationException,
  74. paramiko.SSHException):
  75. if not 'password' in params:
  76. params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
  77. else:
  78. raise
  79. self.unpacker = msgpack.Unpacker()
  80. self.transport = self.client.get_transport()
  81. self.channel = self.transport.open_session()
  82. self.notifier = ChannelNotifyer(self.channel)
  83. self.channel.in_buffer.set_event(self.notifier)
  84. self.channel.in_stderr_buffer.set_event(self.notifier)
  85. self.channel.exec_command('darc serve')
  86. self.msgid = 0
  87. self.id, self.tid = self._cmd('open', (location.path, create))
  88. def _cmd(self, *args, **kw):
  89. self.notifier.enabled = True
  90. try:
  91. return self._cmd2(*args, **kw)
  92. finally:
  93. self.notifier.enabled = False
  94. def _cmd2(self, cmd, args, defer=False):
  95. self.msgid += 1
  96. odata = msgpack.packb((0, self.msgid, cmd, args))
  97. while True:
  98. if self.channel.closed:
  99. raise Exception('Connection closed')
  100. if odata and self.channel.send_ready():
  101. n = self.channel.send(odata)
  102. if n > 0:
  103. odata = odata[n:]
  104. if not odata and defer:
  105. return
  106. elif self.channel.recv_stderr_ready():
  107. print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
  108. elif self.channel.recv_ready():
  109. self.unpacker.feed(self.channel.recv(BUFSIZE))
  110. for type, msgid, error, res in self.unpacker:
  111. if error:
  112. raise self.RPCError(error)
  113. return res
  114. else:
  115. with self.channel.lock:
  116. self.channel.out_buffer_cv.wait(10)
  117. def commit(self, *args):
  118. self._cmd('commit', args)
  119. self.tid += 1
  120. def rollback(self, *args):
  121. return self._cmd('rollback', args)
  122. def get(self, *args):
  123. try:
  124. return self._cmd('get', args)
  125. except self.RPCError, e:
  126. if e.name == 'DoesNotExist':
  127. raise self.DoesNotExist
  128. raise
  129. def put(self, *args):
  130. try:
  131. return self._cmd('put', args, defer=True)
  132. except self.RPCError, e:
  133. if e.name == 'AlreadyExists':
  134. raise self.AlreadyExists
  135. def delete(self, *args):
  136. return self._cmd('delete', args, defer=True)
  137. def list(self, *args):
  138. return self._cmd('list', args)