2
0
Эх сурвалжийг харах

Experimental remote store support

Jonas Borgström 14 жил өмнө
parent
commit
20935f809e
5 өөрчлөгдсөн 147 нэмэгдсэн , 9 устгасан
  1. 18 5
      darc/archiver.py
  2. 0 1
      darc/cache.py
  3. 127 0
      darc/remote.py
  4. 1 2
      darc/store.py
  5. 1 1
      setup.py

+ 18 - 5
darc/archiver.py

@@ -10,15 +10,18 @@ from .cache import Cache
 from .keychain import Keychain
 from .helpers import location_validator, format_file_size, format_time,\
     format_file_mode, walk_path, IncludePattern, ExcludePattern, exclude_path
-
+from .remote import StoreServer, RemoteStore
 
 class Archiver(object):
 
     def __init__(self):
         self.exit_code = 0
 
-    def open_store(self, location):
-        return Store(location.path)
+    def open_store(self, location, create=False):
+        if location.host:
+            return RemoteStore(location, create=create)
+        else:
+            return Store(location.path, create=create)
 
     def print_error(self, msg, *args):
         msg = args and msg % args or msg
@@ -38,9 +41,16 @@ class Archiver(object):
                 print msg,
 
     def do_init(self, args):
-        Store(args.store.path, create=True)
+        self.open_store(args.store, create=True)
         return self.exit_code
 
+    def do_serve(self, args):
+        try:
+            return StoreServer().serve()
+        except Exception, e:
+            self.print_error('eek', repr(e))
+            return 1
+
     def do_create(self, args):
         store = self.open_store(args.archive)
         keychain = Keychain(args.keychain)
@@ -143,7 +153,7 @@ class Archiver(object):
         archive.get_items()
         for item in archive.items:
             if stat.S_ISREG(item['mode']) and not 'source' in item:
-                self.print_verbose('%s ...', item['path'], newline=False)
+                self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False)
                 if archive.verify_file(item):
                     self.print_verbose('OK')
                 else:
@@ -207,6 +217,9 @@ class Archiver(object):
                                type=location_validator(archive=False),
                                help='Store to initialize')
 
+        subparser = subparsers.add_parser('serve')
+        subparser.set_defaults(func=self.do_serve)
+
         subparser = subparsers.add_parser('create')
         subparser.set_defaults(func=self.do_create)
         subparser.add_argument('-i', '--include', dest='patterns',

+ 0 - 1
darc/cache.py

@@ -60,7 +60,6 @@ class Cache(object):
                 yield key, (value[0] + 1,) + value[1:]
 
     def save(self):
-        assert self.store.state == self.store.OPEN
         cache = {'version': 1,
                 'tid': self.store.tid,
                 'chunk_counts': self.chunk_counts,

+ 127 - 0
darc/remote.py

@@ -0,0 +1,127 @@
+import fcntl
+import msgpack
+import os
+import paramiko
+import select
+import sys
+import getpass
+
+from .store import Store
+
+
+BUFSIZE = 1024 * 1024
+
+
+class StoreServer(object):
+
+    def __init__(self):
+        self.store = None
+
+    def serve(self):
+        # Make stdin non-blocking
+        fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
+        fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
+        unpacker = msgpack.Unpacker()
+        while True:
+            r, w, es = select.select([sys.stdin], [], [], 10)
+            if r:
+                data = os.read(sys.stdin.fileno(), BUFSIZE)
+                if not data:
+                    return
+                unpacker.feed(data)
+                for type, msgid, method, args in unpacker:
+                    try:
+                        if method == 'open':
+                            self.store = Store(*args)
+                            res = self.store.id, self.store.tid
+                        else:
+                            res = getattr(self.store, method)(*args)
+                    except Exception, e:
+                        sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
+                    else:
+                        sys.stdout.write(msgpack.packb((1, msgid, None, res)))
+                    sys.stdout.flush()
+            if es:
+                return
+
+
+class RemoteStore(object):
+
+    class DoesNotExist(Exception):
+        pass
+
+    class AlreadyExists(Exception):
+        pass
+
+    class RPCError(Exception):
+
+        def __init__(self, name):
+            self.name = name
+
+
+    def __init__(self, location, create=False):
+        self.client = paramiko.SSHClient()
+        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        params = {'username': location.user or getpass.getuser(),
+                  'hostname': location.host}
+        while True:
+            try:
+                self.client.connect(**params)
+                break
+            except paramiko.PasswordRequiredException:
+                params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
+
+        self.unpacker = msgpack.Unpacker()
+        self.transport = self.client.get_transport()
+        self.channel = self.transport.open_session()
+        self.channel.exec_command('darc serve')
+        self.msgid = 0
+        self.id, self.tid = self._cmd('open', (location.path, create))
+
+    def _cmd(self, cmd, args):
+        self.msgid += 1
+        self.channel.sendall(msgpack.packb((0, self.msgid, cmd, args)))
+        while True:
+            r, w, e = select.select([self.channel], [], [self.channel], 10)
+            if r:
+                if self.channel.recv_stderr_ready():
+                    raise Exception(self.channel.recv_stderr(BUFSIZE))
+                elif self.channel.recv_ready():
+                    self.unpacker.feed(self.channel.recv(BUFSIZE))
+                    for type, msgid, error, res in self.unpacker:
+                        if error:
+                            raise self.RPCError(error)
+                        return res
+                else:
+                    raise Exception('Read event but no data?!?')
+            if e:
+                raise Exception('ssh channel error')
+
+    def commit(self, *args):
+        self._cmd('commit', args)
+        self.tid += 1
+
+    def rollback(self, *args):
+        return self._cmd('rollback', args)
+
+    def get(self, *args):
+        try:
+            return self._cmd('get', args)
+        except self.RPCError, e:
+            if e.name == 'DoesNotExist':
+                raise self.DoesNotExist
+            raise
+
+    def put(self, *args):
+        try:
+            return self._cmd('put', args)
+        except self.RPCError, e:
+            if e.name == 'AlreadyExists':
+                raise self.AlreadyExists
+
+    def delete(self, *args):
+        return self._cmd('delete', args)
+
+    def list(self, *args):
+        return self._cmd('list', args)
+

+ 1 - 2
darc/store.py

@@ -206,8 +206,7 @@ class Store(object):
         if marker:
             sql += ' AND id >= :marker'
             args['marker'] = Binary(marker)
-        for row in self.cursor.execute(sql + ' LIMIT ' + str(max_keys), args):
-            yield str(row[0])
+        return list(str(row[0]) for row in self.cursor.execute(sql + ' LIMIT ' + str(max_keys), args))
 
 
 class StoreTestCase(unittest.TestCase):

+ 1 - 1
setup.py

@@ -3,7 +3,7 @@
 import sys
 from setuptools import setup, Extension
 
-dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr']
+dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr', 'paramiko']
 if sys.version_info < (2, 7):
     dependencies.append('argparse')