浏览代码

Added support for grafana loki (#743).

Reviewed-on: https://projects.torsion.org/borgmatic-collective/borgmatic/pulls/747
Dan Helfman 1 年之前
父节点
当前提交
b3d2560563

+ 30 - 0
borgmatic/config/schema.yaml

@@ -1403,3 +1403,33 @@ properties:
             Configuration for a monitoring integration with Cronhub. Create an
             account at https://cronhub.io if you'd like to use this service. See
             borgmatic monitoring documentation for details.
+    loki:
+        type: object
+        required: ['url', 'labels']
+        additionalProperties: false
+        properties:
+            url:
+                type: string
+                description: |
+                    Grafana loki log URL to notify when a backup begins,
+                    ends, or fails.
+                example: "http://localhost:3100/loki/api/v1/push"
+            labels:
+                type: object
+                additionalProperties:
+                    type: string
+                description: |
+                    Allows setting custom labels for the logging stream. At
+                    least one label is required. "__hostname" gets replaced by
+                    the machine hostname automatically. "__config" gets replaced
+                    by just the name of the configuration file. "__config_path"
+                    gets replaced by the full path of the configuration file.
+                example:
+                    app: "borgmatic"
+                    config: "__config"
+                    hostname: "__hostname"
+        description: |
+            Configuration for a monitoring integration with Grafana loki. You
+            can send the logs to a self-hosted instance or create an account at
+            https://grafana.com/auth/sign-up/create-user. See borgmatic
+            monitoring documentation for details.

+ 2 - 0
borgmatic/hooks/dispatch.py

@@ -4,6 +4,7 @@ from borgmatic.hooks import (
     cronhub,
     cronitor,
     healthchecks,
+    loki,
     mariadb,
     mongodb,
     mysql,
@@ -26,6 +27,7 @@ HOOK_NAME_TO_MODULE = {
     'pagerduty': pagerduty,
     'postgresql_databases': postgresql,
     'sqlite_databases': sqlite,
+    'loki': loki,
 }
 
 

+ 149 - 0
borgmatic/hooks/loki.py

@@ -0,0 +1,149 @@
+import json
+import logging
+import os
+import platform
+import time
+
+import requests
+
+from borgmatic.hooks import monitor
+
+logger = logging.getLogger(__name__)
+
+# Maps each monitor state to the word used in the "<config>: <word> backup"
+# message that ping_monitor() writes to the loki stream.
+MONITOR_STATE_TO_LOKI = {
+    monitor.State.START: 'Started',
+    monitor.State.FINISH: 'Finished',
+    monitor.State.FAIL: 'Failed',
+}
+
+# Threshold at which logs get flushed to loki: the handler flushes as soon as
+# the buffer holds more than this many lines.
+MAX_BUFFER_LINES = 100
+
+
+class Loki_log_buffer:
+    '''
+    A log buffer that allows to output the logs as loki requests in json. Allows
+    adding labels to the log stream and takes care of communication with loki.
+    '''
+
+    def __init__(self, url, dry_run):
+        self.url = url
+        self.dry_run = dry_run
+        self.root = {'streams': [{'stream': {}, 'values': []}]}
+
+    def add_value(self, value):
+        '''
+        Add a log entry to the stream.
+        '''
+        timestamp = str(time.time_ns())
+        self.root['streams'][0]['values'].append((timestamp, value))
+
+    def add_label(self, label, value):
+        '''
+        Add a label to the logging stream.
+        '''
+        self.root['streams'][0]['stream'][label] = value
+
+    def to_request(self):
+        return json.dumps(self.root)
+
+    def __len__(self):
+        '''
+        Gets the number of lines currently in the buffer.
+        '''
+        return len(self.root['streams'][0]['values'])
+
+    def flush(self):
+        if self.dry_run:
+            # Just empty the buffer and skip
+            self.root['streams'][0]['values'] = []
+            logger.info('Skipped uploading logs to loki due to dry run')
+            return
+
+        if len(self) == 0:
+            # Skip as there are not logs to send yet
+            return
+
+        request_body = self.to_request()
+        self.root['streams'][0]['values'] = []
+        request_header = {'Content-Type': 'application/json'}
+        try:
+            result = requests.post(self.url, headers=request_header, data=request_body, timeout=5)
+            result.raise_for_status()
+        except requests.RequestException:
+            logger.warning('Failed to upload logs to loki')
+
+
+class Loki_log_handler(logging.Handler):
+    '''
+    A log handler that sends logs to loki.
+    '''
+
+    def __init__(self, url, dry_run):
+        super().__init__()
+        self.buffer = Loki_log_buffer(url, dry_run)
+
+    def emit(self, record):
+        '''
+        Add a log record from the logging module to the stream.
+        '''
+        self.raw(record.getMessage())
+
+    def add_label(self, key, value):
+        '''
+        Add a label to the logging stream.
+        '''
+        self.buffer.add_label(key, value)
+
+    def raw(self, msg):
+        '''
+        Add an arbitrary string as a log entry to the stream.
+        '''
+        self.buffer.add_value(msg)
+        if len(self.buffer) > MAX_BUFFER_LINES:
+            self.buffer.flush()
+
+    def flush(self):
+        '''
+        Send the logs to loki and empty the buffer.
+        '''
+        self.buffer.flush()
+
+
+def initialize_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run):
+    '''
+    Add a handler to the root logger to regularly send the logs to loki.
+    '''
+    url = hook_config.get('url')
+    loki = Loki_log_handler(url, dry_run)
+    for key, value in hook_config.get('labels').items():
+        if value == '__hostname':
+            loki.add_label(key, platform.node())
+        elif value == '__config':
+            loki.add_label(key, os.path.basename(config_filename))
+        elif value == '__config_path':
+            loki.add_label(key, config_filename)
+        else:
+            loki.add_label(key, value)
+    logging.getLogger().addHandler(loki)
+
+
+def ping_monitor(hook_config, config, config_filename, state, monitoring_log_level, dry_run):
+    '''
+    Add an entry to the loki logger with the current state.
+    '''
+    for handler in tuple(logging.getLogger().handlers):
+        if isinstance(handler, Loki_log_handler):
+            if state in MONITOR_STATE_TO_LOKI.keys():
+                handler.raw(f'{config_filename}: {MONITOR_STATE_TO_LOKI[state]} backup')
+
+
+def destroy_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run):
+    '''
+    Remove the monitor handler that was added to the root logger.
+    '''
+    logger = logging.getLogger()
+    for handler in tuple(logger.handlers):
+        if isinstance(handler, Loki_log_handler):
+            handler.flush()
+            logger.removeHandler(handler)

+ 1 - 1
borgmatic/hooks/monitor.py

@@ -1,6 +1,6 @@
 from enum import Enum
 
-MONITOR_HOOK_NAMES = ('healthchecks', 'cronitor', 'cronhub', 'pagerduty', 'ntfy')
+MONITOR_HOOK_NAMES = ('healthchecks', 'cronitor', 'cronhub', 'pagerduty', 'ntfy', 'loki')
 
 
 class State(Enum):

+ 82 - 0
tests/integration/hooks/test_loki.py

@@ -0,0 +1,82 @@
+import logging
+import platform
+
+from flexmock import flexmock
+
+from borgmatic.hooks import loki as module
+
+
+def test_log_handler_label_replacment():
+    '''
+    Assert that label placeholders get replaced
+    '''
+    hook_config = {
+        'url': 'http://localhost:3100/loki/api/v1/push',
+        'labels': {'hostname': '__hostname', 'config': '__config', 'config_full': '__config_path'},
+    }
+    config_filename = '/mock/path/test.yaml'
+    dry_run = True
+    module.initialize_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run)
+    for handler in tuple(logging.getLogger().handlers):
+        if isinstance(handler, module.Loki_log_handler):
+            assert handler.buffer.root['streams'][0]['stream']['hostname'] == platform.node()
+            assert handler.buffer.root['streams'][0]['stream']['config'] == 'test.yaml'
+            assert handler.buffer.root['streams'][0]['stream']['config_full'] == config_filename
+            return
+    assert False
+
+
+def test_initalize_adds_log_handler():
+    '''
+    Assert that calling initialize_monitor adds our logger to the root logger
+    '''
+    hook_config = {'url': 'http://localhost:3100/loki/api/v1/push', 'labels': {'app': 'borgmatic'}}
+    module.initialize_monitor(
+        hook_config,
+        flexmock(),
+        config_filename='test.yaml',
+        monitoring_log_level=flexmock(),
+        dry_run=True,
+    )
+    for handler in tuple(logging.getLogger().handlers):
+        if isinstance(handler, module.Loki_log_handler):
+            return
+    assert False
+
+
+def test_ping_adds_log_message():
+    '''
+    Assert that calling ping_monitor adds a message to our logger
+    '''
+    hook_config = {'url': 'http://localhost:3100/loki/api/v1/push', 'labels': {'app': 'borgmatic'}}
+    config_filename = 'test.yaml'
+    dry_run = True
+    module.initialize_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run)
+    module.ping_monitor(
+        hook_config, flexmock(), config_filename, module.monitor.State.FINISH, flexmock(), dry_run
+    )
+    for handler in tuple(logging.getLogger().handlers):
+        if isinstance(handler, module.Loki_log_handler):
+            assert any(
+                map(
+                    lambda log: log
+                    == f'{config_filename}: {module.MONITOR_STATE_TO_LOKI[module.monitor.State.FINISH]} backup',
+                    map(lambda x: x[1], handler.buffer.root['streams'][0]['values']),
+                )
+            )
+            return
+    assert False
+
+
+def test_log_handler_gets_removed():
+    '''
+    Assert that destroy_monitor removes the logger from the root logger
+    '''
+    hook_config = {'url': 'http://localhost:3100/loki/api/v1/push', 'labels': {'app': 'borgmatic'}}
+    config_filename = 'test.yaml'
+    dry_run = True
+    module.initialize_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run)
+    module.destroy_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run)
+    for handler in tuple(logging.getLogger().handlers):
+        if isinstance(handler, module.Loki_log_handler):
+            assert False

+ 98 - 0
tests/unit/hooks/test_loki.py

@@ -0,0 +1,98 @@
+import json
+
+import requests
+from flexmock import flexmock
+
+from borgmatic.hooks import loki as module
+
+
+def test_log_handler_gets_labels():
+    '''
+    Assert that adding labels works
+    '''
+    buffer = module.Loki_log_buffer(flexmock(), False)
+    buffer.add_label('test', 'label')
+    assert buffer.root['streams'][0]['stream']['test'] == 'label'
+    buffer.add_label('test2', 'label2')
+    assert buffer.root['streams'][0]['stream']['test2'] == 'label2'
+
+
+def test_log_buffer_gets_raw():
+    '''
+    Assert that adding values to the log buffer increases it's length
+    '''
+    buffer = module.Loki_log_buffer(flexmock(), False)
+    assert len(buffer) == 0
+    buffer.add_value('Some test log line')
+    assert len(buffer) == 1
+    buffer.add_value('Another test log line')
+    assert len(buffer) == 2
+
+
+def test_log_buffer_gets_log_messages():
+    '''
+    Assert that adding log records works
+    '''
+    handler = module.Loki_log_handler(flexmock(), False)
+    handler.emit(flexmock(getMessage=lambda: 'Some test log line'))
+    assert len(handler.buffer) == 1
+
+
+def test_log_buffer_json():
+    '''
+    Assert that the buffer correctly serializes when empty
+    '''
+    buffer = module.Loki_log_buffer(flexmock(), False)
+    assert json.loads(buffer.to_request()) == json.loads('{"streams":[{"stream":{},"values":[]}]}')
+
+
+def test_log_buffer_json_labels():
+    '''
+    Assert that the buffer correctly serializes with labels
+    '''
+    buffer = module.Loki_log_buffer(flexmock(), False)
+    buffer.add_label('test', 'label')
+    assert json.loads(buffer.to_request()) == json.loads(
+        '{"streams":[{"stream":{"test": "label"},"values":[]}]}'
+    )
+
+
+def test_log_buffer_json_log_lines():
+    '''
+    Assert that log lines end up in the correct place in the log buffer
+    '''
+    buffer = module.Loki_log_buffer(flexmock(), False)
+    buffer.add_value('Some test log line')
+    assert json.loads(buffer.to_request())['streams'][0]['values'][0][1] == 'Some test log line'
+
+
+def test_log_handler_post():
+    '''
+    Assert that the flush function sends a post request after a certain limit
+    '''
+    handler = module.Loki_log_handler(flexmock(), False)
+    flexmock(module.requests).should_receive('post').and_return(
+        flexmock(raise_for_status=lambda: '')
+    ).once()
+    for num in range(int(module.MAX_BUFFER_LINES * 1.5)):
+        handler.raw(num)
+
+
+def test_log_handler_post_failiure():
+    '''
+    Assert that the flush function catches request exceptions
+    '''
+    handler = module.Loki_log_handler(flexmock(), False)
+    flexmock(module.requests).should_receive('post').and_return(
+        flexmock(raise_for_status=lambda: (_ for _ in ()).throw(requests.RequestException()))
+    ).once()
+    for num in range(int(module.MAX_BUFFER_LINES * 1.5)):
+        handler.raw(num)
+
+
+def test_log_handler_empty_flush_noop():
+    '''
+    Test that flushing an empty buffer does indeed nothing
+    '''
+    handler = module.Loki_log_handler(flexmock(), False)
+    handler.flush()