remote.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import fcntl
  2. import msgpack
  3. import os
  4. import paramiko
  5. import select
  6. import sys
  7. import getpass
  8. from .store import Store
  9. BUFSIZE = 1024 * 1024
  10. class StoreServer(object):
  11. def __init__(self):
  12. self.store = None
  13. def serve(self):
  14. # Make stdin non-blocking
  15. fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
  16. fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
  17. unpacker = msgpack.Unpacker()
  18. while True:
  19. r, w, es = select.select([sys.stdin], [], [], 10)
  20. if r:
  21. data = os.read(sys.stdin.fileno(), BUFSIZE)
  22. if not data:
  23. return
  24. unpacker.feed(data)
  25. for type, msgid, method, args in unpacker:
  26. try:
  27. try:
  28. f = getattr(self, method)
  29. except AttributeError:
  30. f = getattr(self.store, method)
  31. res = f(*args)
  32. except Exception, e:
  33. sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
  34. else:
  35. sys.stdout.write(msgpack.packb((1, msgid, None, res)))
  36. sys.stdout.flush()
  37. if es:
  38. return
  39. def open(self, path, create=False):
  40. if path.startswith('/~'):
  41. path = path[1:]
  42. self.store = Store(os.path.expanduser(path), create)
  43. return self.store.id, self.store.tid
  44. class RemoteStore(object):
  45. class DoesNotExist(Exception):
  46. pass
  47. class AlreadyExists(Exception):
  48. pass
  49. class RPCError(Exception):
  50. def __init__(self, name):
  51. self.name = name
  52. def __init__(self, location, create=False):
  53. self.client = paramiko.SSHClient()
  54. self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  55. params = {'username': location.user or getpass.getuser(),
  56. 'hostname': location.host, 'port': location.port}
  57. while True:
  58. try:
  59. self.client.connect(**params)
  60. break
  61. except paramiko.PasswordRequiredException:
  62. params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
  63. self.unpacker = msgpack.Unpacker()
  64. self.transport = self.client.get_transport()
  65. self.channel = self.transport.open_session()
  66. self.channel.exec_command('darc serve')
  67. self.msgid = 0
  68. self.id, self.tid = self._cmd('open', (location.path, create))
  69. def _cmd(self, cmd, args):
  70. self.msgid += 1
  71. self.channel.sendall(msgpack.packb((0, self.msgid, cmd, args)))
  72. while True:
  73. r, w, e = select.select([self.channel], [], [self.channel], 10)
  74. if r:
  75. if self.channel.closed:
  76. raise Exception('Connection closed')
  77. if self.channel.recv_stderr_ready():
  78. print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
  79. elif self.channel.recv_ready():
  80. self.unpacker.feed(self.channel.recv(BUFSIZE))
  81. for type, msgid, error, res in self.unpacker:
  82. if error:
  83. raise self.RPCError(error)
  84. return res
  85. if e:
  86. raise Exception('ssh channel error')
  87. def commit(self, *args):
  88. self._cmd('commit', args)
  89. self.tid += 1
  90. def rollback(self, *args):
  91. return self._cmd('rollback', args)
  92. def get(self, *args):
  93. try:
  94. return self._cmd('get', args)
  95. except self.RPCError, e:
  96. if e.name == 'DoesNotExist':
  97. raise self.DoesNotExist
  98. raise
  99. def put(self, *args):
  100. try:
  101. return self._cmd('put', args)
  102. except self.RPCError, e:
  103. if e.name == 'AlreadyExists':
  104. raise self.AlreadyExists
  105. def delete(self, *args):
  106. return self._cmd('delete', args)
  107. def list(self, *args):
  108. return self._cmd('list', args)