remote.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import fcntl
  2. import msgpack
  3. import os
  4. import paramiko
  5. import select
  6. import sys
  7. import getpass
  8. from .store import Store
  9. BUFSIZE = 1024 * 1024
  10. class StoreServer(object):
  11. def __init__(self):
  12. self.store = None
  13. def serve(self):
  14. # Make stdin non-blocking
  15. fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
  16. fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
  17. unpacker = msgpack.Unpacker()
  18. while True:
  19. r, w, es = select.select([sys.stdin], [], [], 10)
  20. if r:
  21. data = os.read(sys.stdin.fileno(), BUFSIZE)
  22. if not data:
  23. return
  24. unpacker.feed(data)
  25. for type, msgid, method, args in unpacker:
  26. try:
  27. try:
  28. f = getattr(self, method)
  29. except AttributeError:
  30. f = getattr(self.store, method)
  31. res = f(*args)
  32. except Exception, e:
  33. sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
  34. else:
  35. sys.stdout.write(msgpack.packb((1, msgid, None, res)))
  36. sys.stdout.flush()
  37. if es:
  38. return
  39. def open(self, path, create=False):
  40. if path.startswith('/~'):
  41. path = path[1:]
  42. self.store = Store(os.path.expanduser(path), create)
  43. return self.store.id, self.store.tid
  44. class RemoteStore(object):
  45. class DoesNotExist(Exception):
  46. pass
  47. class AlreadyExists(Exception):
  48. pass
  49. class RPCError(Exception):
  50. def __init__(self, name):
  51. self.name = name
  52. def __init__(self, location, create=False):
  53. self.client = paramiko.SSHClient()
  54. self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  55. params = {'username': location.user or getpass.getuser(),
  56. 'hostname': location.host, 'port': location.port}
  57. while True:
  58. try:
  59. self.client.connect(**params)
  60. break
  61. except (paramiko.PasswordRequiredException,
  62. paramiko.AuthenticationException,
  63. paramiko.SSHException):
  64. if not 'password' in params:
  65. params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
  66. else:
  67. raise
  68. self.unpacker = msgpack.Unpacker()
  69. self.transport = self.client.get_transport()
  70. self.channel = self.transport.open_session()
  71. self.channel.exec_command('darc serve')
  72. self.msgid = 0
  73. self.id, self.tid = self._cmd('open', (location.path, create))
  74. def _cmd(self, cmd, args):
  75. self.msgid += 1
  76. self.channel.sendall(msgpack.packb((0, self.msgid, cmd, args)))
  77. while True:
  78. r, w, e = select.select([self.channel], [], [self.channel], 10)
  79. if r:
  80. if self.channel.closed:
  81. raise Exception('Connection closed')
  82. if self.channel.recv_stderr_ready():
  83. print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
  84. elif self.channel.recv_ready():
  85. self.unpacker.feed(self.channel.recv(BUFSIZE))
  86. for type, msgid, error, res in self.unpacker:
  87. if error:
  88. raise self.RPCError(error)
  89. return res
  90. if e:
  91. raise Exception('ssh channel error')
  92. def commit(self, *args):
  93. self._cmd('commit', args)
  94. self.tid += 1
  95. def rollback(self, *args):
  96. return self._cmd('rollback', args)
  97. def get(self, *args):
  98. try:
  99. return self._cmd('get', args)
  100. except self.RPCError, e:
  101. if e.name == 'DoesNotExist':
  102. raise self.DoesNotExist
  103. raise
  104. def put(self, *args):
  105. try:
  106. return self._cmd('put', args)
  107. except self.RPCError, e:
  108. if e.name == 'AlreadyExists':
  109. raise self.AlreadyExists
  110. def delete(self, *args):
  111. return self._cmd('delete', args)
  112. def list(self, *args):
  113. return self._cmd('list', args)