Browse code

wrap msgpack, fixes #3632, fixes #2738

wrap msgpack to avoid future upstream API changes causing trouble,
and so that we do not have to globally clutter our code with extra params.

make sure the packing is always with use_bin_type=False,
thus generating "old" msgpack format (as borg always did) from
bytes objects.

make sure the unpacking is always with raw=True,
thus generating bytes objects.

note:

safe unicode encoding/decoding for some kinds of data types is done in Item
class (see item.pyx), so it is enough if we care for bytes objects on the
msgpack level.

also wrap exception handling, so borg code can catch msgpack specific
exceptions even if the upstream msgpack code raises way too generic
exceptions typed Exception, TypeError or ValueError.
We use our own Exception classes for this; the upstream classes are deprecated.
Thomas Waldmann, 7 years ago
parent
commit
3c173cc03b

+ 11 - 24
src/borg/archive.py

@@ -13,8 +13,6 @@ from io import BytesIO
 from itertools import groupby, zip_longest
 from shutil import get_terminal_size
 
-import msgpack
-
 from .logger import create_logger
 
 logger = create_logger()
@@ -39,6 +37,7 @@ from .helpers import StableDict
 from .helpers import bin_to_hex
 from .helpers import safe_ns
 from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
+from .helpers import msgpack
 from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
@@ -239,7 +238,7 @@ class ChunkBuffer:
 
     def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
         self.buffer = BytesIO()
-        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
+        self.packer = msgpack.Packer()
         self.chunks = []
         self.key = key
         self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
@@ -348,7 +347,7 @@ class Archive:
 
     def _load_meta(self, id):
         data = self.key.decrypt(id, self.repository.get(id))
-        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
+        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
         if metadata.version != 1:
             raise Exception('Unknown archive metadata version')
         return metadata
@@ -735,7 +734,7 @@ Utilization of max. archive size: {csize_max:.0%}
     def set_meta(self, key, value):
         metadata = self._load_meta(self.id)
         setattr(metadata, key, value)
-        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
+        data = msgpack.packb(metadata.as_dict())
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, data, self.stats)
         self.manifest.archives[self.name] = (new_id, metadata.time)
@@ -1207,9 +1206,6 @@ def valid_msgpacked_dict(d, keys_serialized):
 class RobustUnpacker:
     """A restartable/robust version of the streaming msgpack unpacker
     """
-    class UnpackerCrashed(Exception):
-        """raise if unpacker crashed"""
-
     def __init__(self, validator, item_keys):
         super().__init__()
         self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
@@ -1232,14 +1228,6 @@ class RobustUnpacker:
         return self
 
     def __next__(self):
-        def unpack_next():
-            try:
-                return next(self._unpacker)
-            except (TypeError, ValueError) as err:
-                # transform exceptions that might be raised when feeding
-                # msgpack with invalid data to a more specific exception
-                raise self.UnpackerCrashed(str(err))
-
         if self._resync:
             data = b''.join(self._buffered_data)
             while self._resync:
@@ -1252,8 +1240,8 @@ class RobustUnpacker:
                 self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                 self._unpacker.feed(data)
                 try:
-                    item = unpack_next()
-                except (self.UnpackerCrashed, StopIteration):
+                    item = next(self._unpacker)
+                except (msgpack.UnpackException, StopIteration):
                     # as long as we are resyncing, we also ignore StopIteration
                     pass
                 else:
@@ -1262,7 +1250,7 @@ class RobustUnpacker:
                         return item
                 data = data[1:]
         else:
-            return unpack_next()
+            return next(self._unpacker)
 
 
 class ArchiveChecker:
@@ -1459,9 +1447,8 @@ class ArchiveChecker:
                 continue
             try:
                 archive = msgpack.unpackb(data)
-            # Ignore exceptions that might be raised when feeding
-            # msgpack with invalid data
-            except (TypeError, ValueError, StopIteration):
+            # Ignore exceptions that might be raised when feeding msgpack with invalid data
+            except msgpack.UnpackException:
                 continue
             if valid_archive(archive):
                 archive = ArchiveItem(internal_dict=archive)
@@ -1640,7 +1627,7 @@ class ArchiveChecker:
                                 yield Item(internal_dict=item)
                             else:
                                 report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
-                    except RobustUnpacker.UnpackerCrashed as err:
+                    except msgpack.UnpackException as err:
                         report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                         unpacker.resync()
                     except Exception:
@@ -1696,7 +1683,7 @@ class ArchiveChecker:
                 for previous_item_id in archive.items:
                     mark_as_possibly_superseded(previous_item_id)
                 archive.items = items_buffer.chunks
-                data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
+                data = msgpack.packb(archive.as_dict())
                 new_archive_id = self.key.id_hash(data)
                 cdata = self.key.encrypt(data)
                 add_reference(new_archive_id, len(data), len(cdata), cdata)

+ 6 - 6
src/borg/archiver.py

@@ -29,8 +29,6 @@ from .logger import create_logger, setup_logging
 
 logger = create_logger()
 
-import msgpack
-
 import borg
 from . import __version__
 from . import helpers
@@ -68,6 +66,7 @@ from .helpers import ChunkIteratorFileWrapper
 from .helpers import popen_with_error_handling, prepare_subprocess_env
 from .helpers import dash_open
 from .helpers import umount
+from .helpers import msgpack
 from .nanorst import rst_to_terminal
 from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern
 from .patterns import PatternMatcher
@@ -1705,7 +1704,7 @@ class Archiver:
             fd.write(',\n')
 
             data = key.decrypt(archive_meta_orig[b'id'], repository.get(archive_meta_orig[b'id']))
-            archive_org_dict = msgpack.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape')
+            archive_org_dict = msgpack.unpackb(data, object_hook=StableDict)
 
             fd.write('    "_meta":\n')
             fd.write(do_indent(prepare_dump_dict(archive_org_dict)))
@@ -1738,7 +1737,7 @@ class Archiver:
 
         data = key.decrypt(None, repository.get(manifest.MANIFEST_ID))
 
-        meta = prepare_dump_dict(msgpack.fallback.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape'))
+        meta = prepare_dump_dict(msgpack.unpackb(data, object_hook=StableDict))
 
         with dash_open(args.path, 'w') as fd:
             json.dump(meta, fd, indent=4)
@@ -1844,7 +1843,7 @@ class Archiver:
         """convert Borg profile to Python profile"""
         import marshal
         with args.output, args.input:
-            marshal.dump(msgpack.unpack(args.input, use_list=False, encoding='utf-8'), args.output)
+            marshal.dump(msgpack.mp_unpack(args.input, use_list=False, raw=False), args.output)
         return EXIT_SUCCESS
 
     @with_repository(lock=False, manifest=False)
@@ -4107,7 +4106,8 @@ class Archiver:
                         # into a marshal file that can be read by e.g. pyprof2calltree.
                         # For local use it's unnecessary hassle, though, that's why .pyprof makes
                         # it compatible (see above).
-                        msgpack.pack(profiler.stats, fd, use_bin_type=True)
+                        # We do not use our msgpack wrapper here, but directly call mp_pack.
+                        msgpack.mp_pack(profiler.stats, fd, use_bin_type=True)
         else:
             return set_ec(func(args))
 

+ 1 - 2
src/borg/cache.py

@@ -6,8 +6,6 @@ from binascii import unhexlify
 from collections import namedtuple
 from time import perf_counter
 
-import msgpack
-
 from .logger import create_logger
 
 logger = create_logger()
@@ -26,6 +24,7 @@ from .helpers import remove_surrogates
 from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
 from .helpers import set_ec, EXIT_WARNING
 from .helpers import truncate_and_unlink
+from .helpers import msgpack
 from .item import ArchiveItem, ChunkListEntry
 from .crypto.key import PlaintextKey
 from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError

+ 3 - 4
src/borg/crypto/key.py

@@ -9,8 +9,6 @@ from binascii import a2b_base64, b2a_base64, hexlify
 from hashlib import sha256, sha512, pbkdf2_hmac
 from hmac import HMAC, compare_digest
 
-import msgpack
-
 from ..logger import create_logger
 
 logger = create_logger()
@@ -24,6 +22,7 @@ from ..helpers import get_keys_dir, get_security_dir
 from ..helpers import get_limited_unpacker
 from ..helpers import bin_to_hex
 from ..helpers import prepare_subprocess_env
+from ..helpers import msgpack
 from ..item import Key, EncryptedKey
 from ..platform import SaveFile
 
@@ -212,10 +211,10 @@ class KeyBase:
             'hmac': bytes(64),
             'salt': os.urandom(64),
         })
-        packed = msgpack.packb(metadata_dict, unicode_errors='surrogateescape')
+        packed = msgpack.packb(metadata_dict)
         tam_key = self._tam_key(tam['salt'], context)
         tam['hmac'] = HMAC(tam_key, packed, sha512).digest()
-        return msgpack.packb(metadata_dict, unicode_errors='surrogateescape')
+        return msgpack.packb(metadata_dict)
 
     def unpack_and_verify_manifest(self, data, force_tam_not_required=False):
         """Unpack msgpacked *data* and return (object, did_verify)."""

+ 1 - 1
src/borg/fuse.py

@@ -11,7 +11,6 @@ from signal import SIGINT
 from distutils.version import LooseVersion
 
 import llfuse
-import msgpack
 
 from .logger import create_logger
 logger = create_logger()
@@ -21,6 +20,7 @@ from .archiver import Archiver
 from .archive import Archive
 from .hashindex import FuseVersionsIndex
 from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
+from .helpers import msgpack
 from .item import Item
 from .lrucache import LRUCache
 from .remote import RemoteRepository

+ 3 - 1
src/borg/helpers/__init__.py

@@ -12,7 +12,6 @@ from .errors import *  # NOQA
 from .fs import *  # NOQA
 from .manifest import *  # NOQA
 from .misc import *  # NOQA
-from .msgpack import *  # NOQA
 from .parseformat import *  # NOQA
 from .process import *  # NOQA
 from .progress import *  # NOQA
@@ -20,6 +19,9 @@ from .time import *  # NOQA
 from .usergroup import *  # NOQA
 from .yes import *  # NOQA
 
+from .msgpack import is_slow_msgpack, int_to_bigint, bigint_to_int, get_limited_unpacker
+from . import msgpack
+
 """
 The global exit_code variable is used so that modules other than archiver can increase the program exit code if a
 warning or error occurred during their operation. This is different from archiver.exit_code, which is only accessible

+ 140 - 6
src/borg/helpers/msgpack.py

@@ -1,11 +1,147 @@
-import msgpack
-import msgpack.fallback
-
 from .datastruct import StableDict
 from ..constants import *  # NOQA
 
+# wrapping msgpack ---------------------------------------------------------------------------------------------------
+#
+# due to the planned breaking api changes in upstream msgpack, we wrap it the way we need it -
+# to avoid having lots of clutter in the calling code. see tickets #968 and #3632.
+#
+# Packing
+# -------
+# use_bin_type = False is needed to generate the old msgpack format (not msgpack 2.0 spec) as borg always did.
+# encoding = None is needed because usage of it is deprecated
+# unicode_errors = None is needed because usage of it is deprecated
+#
+# Unpacking
+# ---------
+# raw = True is needed to unpack the old msgpack format to bytes (not str, about the decoding see item.pyx).
+# encoding = None is needed because usage of it is deprecated
+# unicode_errors = None is needed because usage of it is deprecated
+
+from msgpack import Packer as mp_Packer
+from msgpack import packb as mp_packb
+from msgpack import pack as mp_pack
+from msgpack import Unpacker as mp_Unpacker
+from msgpack import unpackb as mp_unpackb
+from msgpack import unpack as mp_unpack
+
+from msgpack import ExtType
+from msgpack import OutOfData
+
+
+class PackException(Exception):
+    """Exception while msgpack packing"""
+
+
+class UnpackException(Exception):
+    """Exception while msgpack unpacking"""
+
+
+class Packer(mp_Packer):
+    def __init__(self, *, default=None, encoding=None, unicode_errors=None,
+                 use_single_float=False, autoreset=True, use_bin_type=False,
+                 strict_types=False):
+        assert use_bin_type is False
+        assert encoding is None
+        assert unicode_errors is None
+        super().__init__(default=default, encoding=encoding, unicode_errors=unicode_errors,
+                         use_single_float=use_single_float, autoreset=autoreset, use_bin_type=use_bin_type,
+                         strict_types=strict_types)
+
+    def pack(self, obj):
+        try:
+            return super().pack(obj)
+        except Exception as e:
+            raise PackException(e)
+
+
+def packb(o, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs):
+    assert use_bin_type is False
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_packb(o, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise PackException(e)
+
+
+def pack(o, stream, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs):
+    assert use_bin_type is False
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_pack(o, stream, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise PackException(e)
+
+
+class Unpacker(mp_Unpacker):
+    def __init__(self, file_like=None, *, read_size=0, use_list=True, raw=True,
+                 object_hook=None, object_pairs_hook=None, list_hook=None,
+                 encoding=None, unicode_errors=None, max_buffer_size=0,
+                 ext_hook=ExtType,
+                 max_str_len=2147483647,  # 2**32-1
+                 max_bin_len=2147483647,
+                 max_array_len=2147483647,
+                 max_map_len=2147483647,
+                 max_ext_len=2147483647):
+        assert raw is True
+        assert encoding is None
+        assert unicode_errors is None
+        super().__init__(file_like=file_like, read_size=read_size, use_list=use_list, raw=raw,
+                         object_hook=object_hook, object_pairs_hook=object_pairs_hook, list_hook=list_hook,
+                         encoding=encoding, unicode_errors=unicode_errors, max_buffer_size=max_buffer_size,
+                         ext_hook=ext_hook,
+                         max_str_len=max_str_len,
+                         max_bin_len=max_bin_len,
+                         max_array_len=max_array_len,
+                         max_map_len=max_map_len,
+                         max_ext_len=max_ext_len)
+
+    def unpack(self):
+        try:
+            return super().unpack()
+        except OutOfData:
+            raise
+        except Exception as e:
+            raise UnpackException(e)
+
+    def __next__(self):
+        try:
+            return super().__next__()
+        except StopIteration:
+            raise
+        except Exception as e:
+            raise UnpackException(e)
+
+    next = __next__
+
+
+def unpackb(packed, *, raw=True, encoding=None, unicode_errors=None, **kwargs):
+    assert raw is True
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_unpackb(packed, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise UnpackException(e)
+
+
+def unpack(stream, *, raw=True, encoding=None, unicode_errors=None, **kwargs):
+    assert raw is True
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_unpack(stream, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise UnpackException(e)
+
+
+# msgpacking related utilities -----------------------------------------------
 
 def is_slow_msgpack():
+    import msgpack
+    import msgpack.fallback
     return msgpack.Packer is msgpack.fallback.Packer
 
 
@@ -31,7 +167,6 @@ def get_limited_unpacker(kind):
                          max_map_len=MAX_ARCHIVES,  # list of archives
                          max_str_len=255,  # archive name
                          object_hook=StableDict,
-                         unicode_errors='surrogateescape',
                          ))
     elif kind == 'key':
         args.update(dict(use_list=True,  # default value
@@ -39,11 +174,10 @@ def get_limited_unpacker(kind):
                          max_map_len=10,  # EncryptedKey dict
                          max_str_len=4000,  # inner key data
                          object_hook=StableDict,
-                         unicode_errors='surrogateescape',
                          ))
     else:
         raise ValueError('kind must be "server", "client", "manifest" or "key"')
-    return msgpack.Unpacker(**args)
+    return Unpacker(**args)
 
 
 def bigint_to_int(mtime):

+ 1 - 2
src/borg/remote.py

@@ -15,8 +15,6 @@ import time
 import traceback
 from subprocess import Popen, PIPE
 
-import msgpack
-
 from . import __version__
 from .compress import LZ4
 from .constants import *  # NOQA
@@ -31,6 +29,7 @@ from .helpers import format_file_size
 from .helpers import truncate_and_unlink
 from .helpers import prepare_subprocess_env
 from .logger import create_logger, setup_logging
+from .helpers import msgpack
 from .repository import Repository
 from .version import parse_version, format_version
 from .algorithms.checksums import xxh64

+ 2 - 3
src/borg/repository.py

@@ -11,8 +11,6 @@ from datetime import datetime
 from functools import partial
 from itertools import islice
 
-import msgpack
-
 from .constants import *  # NOQA
 from .hashindex import NSIndex
 from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
@@ -22,6 +20,7 @@ from .helpers import bin_to_hex
 from .helpers import hostname_is_unique
 from .helpers import secure_erase, truncate_and_unlink
 from .helpers import Manifest
+from .helpers import msgpack
 from .locking import Lock, LockError, LockErrorT
 from .logger import create_logger
 from .lrucache import LRUCache
@@ -513,7 +512,7 @@ class Repository:
             try:
                 with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
                     hints = msgpack.unpack(fd)
-            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e:
+            except (msgpack.UnpackException, FileNotFoundError, FileIntegrityError) as e:
                 logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
                 if not isinstance(e, FileNotFoundError):
                     os.unlink(hints_path)

+ 1 - 1
src/borg/testsuite/archive.py

@@ -3,7 +3,6 @@ from datetime import datetime, timezone
 from io import StringIO
 from unittest.mock import Mock
 
-import msgpack
 import pytest
 
 from . import BaseTestCase
@@ -11,6 +10,7 @@ from ..crypto.key import PlaintextKey
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
 from ..archive import BackupOSError, backup_io, backup_io_iter
 from ..helpers import Manifest
+from ..helpers import msgpack
 from ..item import Item, ArchiveItem
 
 

+ 1 - 1
src/borg/testsuite/archiver.py

@@ -23,7 +23,6 @@ from hashlib import sha256
 from io import BytesIO, StringIO
 from unittest.mock import patch
 
-import msgpack
 import pytest
 
 try:
@@ -46,6 +45,7 @@ from ..helpers import Manifest, MandatoryFeatureUnsupported
 from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
 from ..helpers import bin_to_hex
 from ..helpers import MAX_S
+from ..helpers import msgpack
 from ..nanorst import RstToTextLazy, rst_to_terminal
 from ..patterns import IECommand, PatternMatcher, parse_pattern
 from ..item import Item, ItemDiff

+ 4 - 3
src/borg/testsuite/helpers.py

@@ -8,9 +8,6 @@ from time import sleep
 
 import pytest
 
-import msgpack
-import msgpack.fallback
-
 from .. import platform
 from ..helpers import Location
 from ..helpers import Buffer
@@ -19,6 +16,7 @@ from ..helpers import make_path_safe, clean_lines
 from ..helpers import interval, prune_within, prune_split
 from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir
 from ..helpers import is_slow_msgpack
+from ..helpers import msgpack
 from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
 from ..helpers import StableDict, int_to_bigint, bigint_to_int, bin_to_hex
 from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams
@@ -594,6 +592,9 @@ def test_parse_file_size_invalid(string):
 
 
 def test_is_slow_msgpack():
+    # we need to import upstream msgpack package here, not helpers.msgpack:
+    import msgpack
+    import msgpack.fallback
     saved_packer = msgpack.Packer
     try:
         msgpack.Packer = msgpack.fallback.Packer

+ 2 - 2
src/borg/testsuite/key.py

@@ -4,7 +4,6 @@ import re
 import tempfile
 from binascii import hexlify, unhexlify
 
-import msgpack
 import pytest
 
 from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex
@@ -19,6 +18,7 @@ from ..helpers import IntegrityError
 from ..helpers import Location
 from ..helpers import StableDict
 from ..helpers import get_security_dir
+from ..helpers import msgpack
 
 
 class TestKey:
@@ -333,7 +333,7 @@ class TestTAM:
             key.unpack_and_verify_manifest(blob)
 
         blob = b'\xc1\xc1\xc1'
-        with pytest.raises((ValueError, msgpack.UnpackException)):
+        with pytest.raises(msgpack.UnpackException):
             key.unpack_and_verify_manifest(blob)
 
     def test_missing_when_required(self, key):

+ 1 - 2
src/borg/testsuite/repository.py

@@ -6,13 +6,12 @@ import sys
 import tempfile
 from unittest.mock import patch
 
-import msgpack
-
 import pytest
 
 from ..hashindex import NSIndex
 from ..helpers import Location
 from ..helpers import IntegrityError
+from ..helpers import msgpack
 from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
 from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE