remote.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from __future__ import with_statement
  2. import fcntl
  3. import msgpack
  4. import os
  5. import select
  6. from subprocess import Popen, PIPE
  7. import sys
  8. import getpass
  9. from .store import Store
  10. BUFSIZE = 10 * 1024 * 1024
  11. class StoreServer(object):
  12. def __init__(self):
  13. self.store = None
  14. def serve(self):
  15. # Make stdin non-blocking
  16. fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
  17. fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
  18. # Make stdout blocking
  19. fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
  20. fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  21. unpacker = msgpack.Unpacker()
  22. while True:
  23. r, w, es = select.select([sys.stdin], [], [], 10)
  24. if r:
  25. data = os.read(sys.stdin.fileno(), BUFSIZE)
  26. if not data:
  27. return
  28. unpacker.feed(data)
  29. for type, msgid, method, args in unpacker:
  30. try:
  31. try:
  32. f = getattr(self, method)
  33. except AttributeError:
  34. f = getattr(self.store, method)
  35. res = f(*args)
  36. except Exception, e:
  37. sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
  38. else:
  39. sys.stdout.write(msgpack.packb((1, msgid, None, res)))
  40. sys.stdout.flush()
  41. if es:
  42. return
  43. def negotiate(self, versions):
  44. return 1
  45. def open(self, path, create=False):
  46. if path.startswith('/~'):
  47. path = path[1:]
  48. self.store = Store(os.path.expanduser(path), create)
  49. return self.store.id
  50. class RemoteStore(object):
  51. class DoesNotExist(Exception):
  52. pass
  53. class AlreadyExists(Exception):
  54. pass
  55. class RPCError(Exception):
  56. def __init__(self, name):
  57. self.name = name
  58. def __init__(self, location, create=False):
  59. self.unpacker = msgpack.Unpacker()
  60. self.msgid = 0
  61. args = ['ssh', '-p', str(location.port), '%s@%s' % (location.user or getpass.getuser(), location.host), 'darc', 'serve']
  62. self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE)
  63. self.stdout_fd = self.p.stdout.fileno()
  64. version = self.call('negotiate', (1,))
  65. if version != 1:
  66. raise Exception('Server insisted on using unsupported protocol version %d' % version)
  67. self.id = self.call('open', (location.path, create))
  68. def __del__(self):
  69. self.p.stdin.close()
  70. self.p.stdout.close()
  71. self.p.wait()
  72. def _read(self, msgids):
  73. data = os.read(self.stdout_fd, BUFSIZE)
  74. self.unpacker.feed(data)
  75. for type, msgid, error, res in self.unpacker:
  76. if error:
  77. raise self.RPCError(error)
  78. if msgid in msgids:
  79. msgids.remove(msgid)
  80. yield res
  81. def call(self, cmd, args, wait=True):
  82. for res in self.call_multi(cmd, [args], wait=wait):
  83. return res
  84. def call_multi(self, cmd, argsv, wait=True):
  85. msgids = set()
  86. for args in argsv:
  87. if select.select([self.stdout_fd], [], [], 0)[0]:
  88. for res in self._read(msgids):
  89. yield res
  90. self.msgid += 1
  91. msgid = self.msgid
  92. msgids.add(msgid)
  93. self.p.stdin.write(msgpack.packb((1, msgid, cmd, args)))
  94. while msgids and wait:
  95. for res in self._read(msgids):
  96. yield res
  97. def commit(self, *args):
  98. self.call('commit', args)
  99. def rollback(self, *args):
  100. return self.call('rollback', args)
  101. def get(self, id):
  102. try:
  103. return self.call('get', (id, ))
  104. except self.RPCError, e:
  105. if e.name == 'DoesNotExist':
  106. raise self.DoesNotExist
  107. raise
  108. def get_many(self, ids):
  109. return self.call_multi('get', [(id, ) for id in ids])
  110. def put(self, id, data, wait=True):
  111. try:
  112. return self.call('put', (id, data), wait=wait)
  113. except self.RPCError, e:
  114. if e.name == 'AlreadyExists':
  115. raise self.AlreadyExists
  116. def delete(self, id, wait=True):
  117. return self.call('delete', (id, ), wait=wait)