Procházet zdrojové kódy

Add remote upload buffer (--remote-buffer) (#5574)

add remote upload buffer (--remote-buffer)

- added new option --remote-buffer
- allow to_send to grow to selected size
- don't grow if wait is specified
- fill pipe on any command (including 'async_response')
- add new option to docs
- create EfficientCollectionQueue to prevent recreation of buffer each time we send something
- add tests for EfficientCollectionQueue
axapaxa před 4 roky
rodič
revize
b291b91962

+ 1 - 0
docs/usage/common-options.rst.inc

@@ -14,6 +14,7 @@
 --umask M          set umask to M (local only, default: 0077)
 --remote-path PATH    use PATH as borg executable on the remote (default: "borg")
 --remote-ratelimit RATE    set remote network upload rate limit in kiByte/s (default: 0=unlimited)
+--remote-buffer UPLOAD_BUFFER   set upload buffer size in MiB. (default: 0=no buffer)
 --consider-part-files    treat part files like normal files (e.g. to list/extract them)
 --debug-profile FILE    Write execution profile in Borg format into FILE. For local use a Python-compatible file can be generated by suffixing FILE with ".pyprof".
 --rsh RSH          Use this command to connect to the 'borg serve' process (default: 'ssh')

+ 1 - 0
scripts/shell_completions/zsh/_borg

@@ -650,6 +650,7 @@ __borg_setup_common_options() {
     '--umask=[set umask to M (local only, default: 0077)]:M'
     '--remote-path=[set remote path to executable (default: "borg")]: :_cmdstring'
     '--remote-ratelimit=[set remote network upload rate limit in kiByte/s (default: 0=unlimited)]: : _borg_guard_unsigned_number "RATE"'
+    '--remote-buffer=[set upload buffer size in MiB. (default: 0=no buffer)]: : _borg_guard_unsigned_number "UPLOAD_BUFFER"'
     '--consider-part-files[treat part files like normal files (e.g. to list/extract them)]'
     '--debug-profile=[write execution profile in Borg format into FILE]:FILE:_files'
     '--rsh=[use COMMAND instead of ssh]: :_cmdstring'

+ 2 - 0
src/borg/archiver.py

@@ -2708,6 +2708,8 @@ class Archiver:
                               help='use PATH as borg executable on the remote (default: "borg")')
             add_common_option('--remote-ratelimit', metavar='RATE', dest='remote_ratelimit', type=int,
                               help='set remote network upload rate limit in kiByte/s (default: 0=unlimited)')
+            add_common_option('--remote-buffer', metavar='UPLOAD_BUFFER', dest='remote_buffer', type=int,
+                              help='set upload buffer size in MiB. (default: 0=no buffer)')
             add_common_option('--consider-part-files', dest='consider_part_files', action='store_true',
                               help='treat part files like normal files (e.g. to list/extract them)')
             add_common_option('--debug-profile', metavar='FILE', dest='debug_profile', default=None,

+ 80 - 0
src/borg/helpers/datastruct.py

@@ -49,3 +49,83 @@ class Buffer:
         if size is not None:
             self.resize(size, init)
         return self.buffer
+
+
+class EfficientCollectionQueue:
+    """
+    An efficient FIFO queue that splits received elements into chunks.
+
+    Elements are stored as a list of chunks of at most split_size each, so
+    consuming from the front only re-slices one bounded chunk instead of
+    re-copying the whole backlog (the cost this replaces in remote.py, where
+    to_send was a single bytes object re-sliced on every partial write).
+    """
+
+    class SizeUnderflow(Error):
+        # NOTE(review): borg Error subclasses appear to use __doc__ as the
+        # user-facing message template (formatted with the constructor args),
+        # so this docstring is runtime behavior — the trailing ".." looks like
+        # a typo, but fixing it would change program output; confirm first.
+        """Could not pop_front first {} elements, collection only has {} elements.."""
+
+    def __init__(self, split_size, member_type):
+        """
+        Initializes empty queue.
+        Requires split_size to define maximum chunk size.
+        Requires member_type to be type defining what base collection looks like.
+        """
+        self.buffers = []  # chunks in FIFO order, each at most split_size long
+        self.size = 0  # total number of elements across all chunks
+        self.split_size = split_size
+        self.member_type = member_type
+
+    def peek_front(self):
+        """
+        Returns first chunk from queue without removing it.
+        Returned collection will have between 1 and split_size length.
+        Returns empty collection when nothing is queued.
+        """
+        if not self.buffers:
+            # empty queue: hand back a fresh empty collection of member_type
+            return self.member_type()
+        buffer = self.buffers[0]
+        return buffer
+
+    def pop_front(self, size):
+        """
+        Removes first size elements from queue.
+        Throws if requested removal size is larger than whole queue.
+        """
+        if size > self.size:
+            raise EfficientCollectionQueue.SizeUnderflow(size, self.size)
+        while size > 0:
+            buffer = self.buffers[0]
+            to_remove = min(size, len(buffer))
+            # re-slicing copies at most split_size elements — bounded cost,
+            # unlike re-slicing one monolithic buffer
+            buffer = buffer[to_remove:]
+            if buffer:
+                self.buffers[0] = buffer
+            else:
+                # chunk fully consumed — drop it
+                del self.buffers[0]
+            size -= to_remove
+            self.size -= to_remove
+
+    def push_back(self, data):
+        """
+        Adds data at end of queue.
+        Takes care to chunk data into split_size sized elements.
+        """
+        if not self.buffers:
+            self.buffers = [self.member_type()]
+        while data:
+            buffer = self.buffers[-1]
+            if len(buffer) >= self.split_size:
+                # last chunk is full — start a new one
+                buffer = self.member_type()
+                self.buffers.append(buffer)
+
+            to_add = min(len(data), self.split_size - len(buffer))
+            # += builds a new object for immutable member types (e.g. bytes),
+            # so the result must be written back into the list below
+            buffer += data[:to_add]
+            data = data[to_add:]
+            self.buffers[-1] = buffer
+            self.size += to_add
+
+    def __len__(self):
+        """
+        Current queue length for all elements in all chunks.
+        """
+        return self.size
+
+    def __bool__(self):
+        """
+        Returns true if queue isn't empty.
+        """
+        return self.size != 0

+ 24 - 16
src/borg/remote.py

@@ -32,6 +32,7 @@ from .helpers import msgpack
 from .repository import Repository
 from .version import parse_version, format_version
 from .algorithms.checksums import xxh64
+from .helpers.datastruct import EfficientCollectionQueue
 
 logger = create_logger(__name__)
 
@@ -535,7 +536,7 @@ class RemoteRepository:
         self.msgid = 0
         self.rx_bytes = 0
         self.tx_bytes = 0
-        self.to_send = b''
+        self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
         self.stderr_received = b''  # incomplete stderr line bytes received (no \n yet)
         self.chunkid_to_msgids = {}
         self.ignore_responses = set()
@@ -543,6 +544,7 @@ class RemoteRepository:
         self.async_responses = {}
         self.shutdown_time = None
         self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
+        self.upload_buffer_size_limit = args.remote_buffer * 1024 * 1024 if args and args.remote_buffer else 0
         self.unpacker = get_limited_unpacker('client')
         self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
         self.p = None
@@ -711,6 +713,19 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
         if not calls and cmd != 'async_responses':
             return
 
+        def send_buffer():
+            if self.to_send:
+                try:
+                    written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front())
+                    self.tx_bytes += written
+                    self.to_send.pop_front(written)
+                except OSError as e:
+                    # io.write might raise EAGAIN even though select indicates
+                    # that the fd should be writable.
+                    # EWOULDBLOCK is added for defensive programming sake.
+                    if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
+                        raise
+
         def pop_preload_msgid(chunkid):
             msgid = self.chunkid_to_msgids[chunkid].pop(0)
             if not self.chunkid_to_msgids[chunkid]:
@@ -760,6 +775,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
 
         calls = list(calls)
         waiting_for = []
+        maximum_to_send = 0 if wait else self.upload_buffer_size_limit
+        send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
         while wait or calls:
             if self.shutdown_time and time.monotonic() > self.shutdown_time:
                 # we are shutting this RemoteRepository down already, make sure we do not waste
@@ -850,7 +867,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                     for line in lines:
                         handle_remote_line(line.decode())  # decode late, avoid partial utf-8 sequences
             if w:
-                while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
+                while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
                     if calls:
                         if is_preloaded:
                             assert cmd == 'get', "is_preload is only supported for 'get'"
@@ -864,29 +881,20 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                                 self.msgid += 1
                                 waiting_for.append(self.msgid)
                                 if self.dictFormat:
-                                    self.to_send = msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})
+                                    self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
                                 else:
-                                    self.to_send = msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
+                                    self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args))))
                     if not self.to_send and self.preload_ids:
                         chunk_id = self.preload_ids.pop(0)
                         args = {'id': chunk_id}
                         self.msgid += 1
                         self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
                         if self.dictFormat:
-                            self.to_send = msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args})
+                            self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args}))
                         else:
-                            self.to_send = msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args)))
+                            self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args))))
 
-                if self.to_send:
-                    try:
-                        written = self.ratelimit.write(self.stdin_fd, self.to_send)
-                        self.tx_bytes += written
-                        self.to_send = self.to_send[written:]
-                    except OSError as e:
-                        # io.write might raise EAGAIN even though select indicates
-                        # that the fd should be writable
-                        if e.errno != errno.EAGAIN:
-                            raise
+                send_buffer()
         self.ignore_responses |= set(waiting_for)  # we lose order here
 
     @api(since=parse_version('1.0.0'),

+ 51 - 0
src/borg/testsuite/efficient_collection_queue.py

@@ -0,0 +1,51 @@
+import pytest
+
+from ..helpers.datastruct import EfficientCollectionQueue
+
+
+class TestEfficientQueue:
+    """Tests for EfficientCollectionQueue (helpers.datastruct)."""
+
+    def test_base_usage(self):
+        # single push smaller than split_size: the queue holds one chunk
+        queue = EfficientCollectionQueue(100, bytes)
+        assert queue.peek_front() == b''
+        queue.push_back(b'1234')
+        assert queue.peek_front() == b'1234'
+        assert len(queue) == 4
+        assert queue
+        queue.pop_front(4)
+        assert queue.peek_front() == b''
+        assert len(queue) == 0
+        assert not queue
+
+    def test_usage_with_arrays(self):
+        # member_type is generic: same behaviour with list as with bytes
+        queue = EfficientCollectionQueue(100, list)
+        assert queue.peek_front() == []
+        queue.push_back([1, 2, 3, 4])
+        assert queue.peek_front() == [1, 2, 3, 4]
+        assert len(queue) == 4
+        assert queue
+        queue.pop_front(4)
+        assert queue.peek_front() == []
+        assert len(queue) == 0
+        assert not queue
+
+    def test_chunking(self):
+        # split_size=2: pushes are merged/split into 2-element chunks and
+        # pop_front may consume across chunk boundaries
+        queue = EfficientCollectionQueue(2, bytes)
+        queue.push_back(b'1')
+        queue.push_back(b'23')
+        queue.push_back(b'4567')
+        assert len(queue) == 7
+        assert queue.peek_front() == b'12'
+        queue.pop_front(3)
+        assert queue.peek_front() == b'4'
+        queue.pop_front(1)
+        assert queue.peek_front() == b'56'
+        queue.pop_front(2)
+        assert len(queue) == 1
+        assert queue
+        # removing more than is queued must raise and leave the queue intact
+        with pytest.raises(EfficientCollectionQueue.SizeUnderflow):
+            queue.pop_front(2)
+        assert queue.peek_front() == b'7'
+        queue.pop_front(1)
+        assert queue.peek_front() == b''
+        assert len(queue) == 0
+        assert not queue