浏览代码

Archiver,RemoteRepository: Add --remote-ratelimit

The --remote-ratelimit option adds a very simple rate limit for the
sending data to the remote.

Currently implemented by sleeping if the transmission speed is greater
than the limit.
Martin Hostettler 9 年之前
父节点
当前提交
84b3295a0d
共有 3 个文件被更改,包括 98 次插入1 次删除
  1. 2 0
      src/borg/archiver.py
  2. 36 1
      src/borg/remote.py
  3. 60 0
      src/borg/testsuite/remote.py

+ 2 - 0
src/borg/archiver.py

@@ -1322,6 +1322,8 @@ class Archiver:
                                   help='set umask to M (local and remote, default: %(default)04o)')
         common_group.add_argument('--remote-path', dest='remote_path', metavar='PATH',
                                   help='set remote path to executable (default: "borg")')
+        common_group.add_argument('--remote-ratelimit', dest='remote_ratelimit', type=int, metavar='rate',
+                                  help='set remote network upload rate limit in kiByte/s (default: 0=unlimited)')
         common_group.add_argument('--consider-part-files', dest='consider_part_files',
                                   action='store_true', default=False,
                                   help='treat part files like normal files (e.g. to list/extract them)')

+ 36 - 1
src/borg/remote.py

@@ -6,6 +6,7 @@ import select
 import shlex
 import sys
 import tempfile
+import time
 import traceback
 from subprocess import Popen, PIPE
 
@@ -25,6 +26,8 @@ BUFSIZE = 10 * 1024 * 1024
 
 MAX_INFLIGHT = 100
 
+RATELIMIT_PERIOD = 0.1
+
 
 class ConnectionClosed(Error):
     """Connection closed by remote host"""
@@ -166,6 +169,36 @@ class RepositoryServer:  # pragma: no cover
         return self.repository.id
 
 
+class SleepingBandwidthLimiter:
+    def __init__(self, limit):
+        if limit:
+            self.ratelimit = int(limit * RATELIMIT_PERIOD)
+            self.ratelimit_last = time.monotonic()
+            self.ratelimit_quota = self.ratelimit
+        else:
+            self.ratelimit = None
+
+    def write(self, fd, to_send):
+        if self.ratelimit:
+            now = time.monotonic()
+            if self.ratelimit_last + RATELIMIT_PERIOD <= now:
+                self.ratelimit_quota += self.ratelimit
+                if self.ratelimit_quota > 2 * self.ratelimit:
+                    self.ratelimit_quota = 2 * self.ratelimit
+                self.ratelimit_last = now
+            if self.ratelimit_quota == 0:
+                tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now
+                time.sleep(tosleep)
+                self.ratelimit_quota += self.ratelimit
+                self.ratelimit_last = time.monotonic()
+            if len(to_send) > self.ratelimit_quota:
+                to_send = to_send[:self.ratelimit_quota]
+        written = os.write(fd, to_send)
+        if self.ratelimit:
+            self.ratelimit_quota -= written
+        return written
+
+
 class RemoteRepository:
     extra_test_args = []
 
@@ -185,6 +218,8 @@ class RemoteRepository:
         self.cache = {}
         self.ignore_responses = set()
         self.responses = {}
+        self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
+
         self.unpacker = msgpack.Unpacker(use_list=False)
         self.p = None
         testing = location.host == '__testsuite__'
@@ -406,7 +441,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
 
                 if self.to_send:
                     try:
-                        self.to_send = self.to_send[os.write(self.stdin_fd, self.to_send):]
+                        self.to_send = self.to_send[self.ratelimit.write(self.stdin_fd, self.to_send):]
                     except OSError as e:
                         # io.write might raise EAGAIN even though select indicates
                         # that the fd should be writable

+ 60 - 0
src/borg/testsuite/remote.py

@@ -0,0 +1,60 @@
+import os
+import time
+
+import pytest
+
+from ..remote import SleepingBandwidthLimiter
+
+
+class TestSleepingBandwidthLimiter:
+    def expect_write(self, fd, data):
+        self.expected_fd = fd
+        self.expected_data = data
+
+    def check_write(self, fd, data):
+        assert fd == self.expected_fd
+        assert data == self.expected_data
+        return len(data)
+
+    def test_write_unlimited(self, monkeypatch):
+        monkeypatch.setattr(os, "write", self.check_write)
+
+        it = SleepingBandwidthLimiter(0)
+        self.expect_write(5, b"test")
+        it.write(5, b"test")
+
+    def test_write(self, monkeypatch):
+        monkeypatch.setattr(os, "write", self.check_write)
+        monkeypatch.setattr(time, "monotonic", lambda: now)
+        monkeypatch.setattr(time, "sleep", lambda x: None)
+
+        now = 100
+
+        it = SleepingBandwidthLimiter(100)
+
+        # all fits
+        self.expect_write(5, b"test")
+        it.write(5, b"test")
+
+        # only partial write
+        self.expect_write(5, b"123456")
+        it.write(5, b"1234567890")
+
+        # sleeps
+        self.expect_write(5, b"123456")
+        it.write(5, b"123456")
+
+        # long time interval between writes
+        now += 10
+        self.expect_write(5, b"1")
+        it.write(5, b"1")
+
+        # long time interval between writes, filling up quota
+        now += 10
+        self.expect_write(5, b"1")
+        it.write(5, b"1")
+
+        # long time interval between writes, filling up quota to clip to maximum
+        now += 10
+        self.expect_write(5, b"1")
+        it.write(5, b"1")