Sfoglia il codice sorgente

Merge pull request #3943 from ThomasWaldmann/wrap-msgpack

wrap msgpack to avoid api troubles
TW 6 anni fa
parent
commit
a350dd8f3e

+ 11 - 24
src/borg/archive.py

@@ -13,8 +13,6 @@ from io import BytesIO
 from itertools import groupby, zip_longest
 from shutil import get_terminal_size
 
-import msgpack
-
 from .logger import create_logger
 
 logger = create_logger()
@@ -39,6 +37,7 @@ from .helpers import StableDict
 from .helpers import bin_to_hex
 from .helpers import safe_ns
 from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
+from .helpers import msgpack
 from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
@@ -239,7 +238,7 @@ class ChunkBuffer:
 
     def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
         self.buffer = BytesIO()
-        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
+        self.packer = msgpack.Packer()
         self.chunks = []
         self.key = key
         self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
@@ -348,7 +347,7 @@ class Archive:
 
     def _load_meta(self, id):
         data = self.key.decrypt(id, self.repository.get(id))
-        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
+        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
         if metadata.version != 1:
             raise Exception('Unknown archive metadata version')
         return metadata
@@ -735,7 +734,7 @@ Utilization of max. archive size: {csize_max:.0%}
     def set_meta(self, key, value):
         metadata = self._load_meta(self.id)
         setattr(metadata, key, value)
-        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
+        data = msgpack.packb(metadata.as_dict())
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, data, self.stats)
         self.manifest.archives[self.name] = (new_id, metadata.time)
@@ -1207,9 +1206,6 @@ def valid_msgpacked_dict(d, keys_serialized):
 class RobustUnpacker:
     """A restartable/robust version of the streaming msgpack unpacker
     """
-    class UnpackerCrashed(Exception):
-        """raise if unpacker crashed"""
-
     def __init__(self, validator, item_keys):
         super().__init__()
         self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
@@ -1232,14 +1228,6 @@ class RobustUnpacker:
         return self
 
     def __next__(self):
-        def unpack_next():
-            try:
-                return next(self._unpacker)
-            except (TypeError, ValueError) as err:
-                # transform exceptions that might be raised when feeding
-                # msgpack with invalid data to a more specific exception
-                raise self.UnpackerCrashed(str(err))
-
         if self._resync:
             data = b''.join(self._buffered_data)
             while self._resync:
@@ -1252,8 +1240,8 @@ class RobustUnpacker:
                 self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                 self._unpacker.feed(data)
                 try:
-                    item = unpack_next()
-                except (self.UnpackerCrashed, StopIteration):
+                    item = next(self._unpacker)
+                except (msgpack.UnpackException, StopIteration):
                     # as long as we are resyncing, we also ignore StopIteration
                     pass
                 else:
@@ -1262,7 +1250,7 @@ class RobustUnpacker:
                         return item
                 data = data[1:]
         else:
-            return unpack_next()
+            return next(self._unpacker)
 
 
 class ArchiveChecker:
@@ -1459,9 +1447,8 @@ class ArchiveChecker:
                 continue
             try:
                 archive = msgpack.unpackb(data)
-            # Ignore exceptions that might be raised when feeding
-            # msgpack with invalid data
-            except (TypeError, ValueError, StopIteration):
+            # Ignore exceptions that might be raised when feeding msgpack with invalid data
+            except msgpack.UnpackException:
                 continue
             if valid_archive(archive):
                 archive = ArchiveItem(internal_dict=archive)
@@ -1640,7 +1627,7 @@ class ArchiveChecker:
                                 yield Item(internal_dict=item)
                             else:
                                 report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
-                    except RobustUnpacker.UnpackerCrashed as err:
+                    except msgpack.UnpackException as err:
                         report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                         unpacker.resync()
                     except Exception:
@@ -1696,7 +1683,7 @@ class ArchiveChecker:
                 for previous_item_id in archive.items:
                     mark_as_possibly_superseded(previous_item_id)
                 archive.items = items_buffer.chunks
-                data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
+                data = msgpack.packb(archive.as_dict())
                 new_archive_id = self.key.id_hash(data)
                 cdata = self.key.encrypt(data)
                 add_reference(new_archive_id, len(data), len(cdata), cdata)

+ 6 - 6
src/borg/archiver.py

@@ -29,8 +29,6 @@ from .logger import create_logger, setup_logging
 
 logger = create_logger()
 
-import msgpack
-
 import borg
 from . import __version__
 from . import helpers
@@ -68,6 +66,7 @@ from .helpers import ChunkIteratorFileWrapper
 from .helpers import popen_with_error_handling, prepare_subprocess_env
 from .helpers import dash_open
 from .helpers import umount
+from .helpers import msgpack
 from .nanorst import rst_to_terminal
 from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern
 from .patterns import PatternMatcher
@@ -1705,7 +1704,7 @@ class Archiver:
             fd.write(',\n')
 
             data = key.decrypt(archive_meta_orig[b'id'], repository.get(archive_meta_orig[b'id']))
-            archive_org_dict = msgpack.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape')
+            archive_org_dict = msgpack.unpackb(data, object_hook=StableDict)
 
             fd.write('    "_meta":\n')
             fd.write(do_indent(prepare_dump_dict(archive_org_dict)))
@@ -1738,7 +1737,7 @@ class Archiver:
 
         data = key.decrypt(None, repository.get(manifest.MANIFEST_ID))
 
-        meta = prepare_dump_dict(msgpack.fallback.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape'))
+        meta = prepare_dump_dict(msgpack.unpackb(data, object_hook=StableDict))
 
         with dash_open(args.path, 'w') as fd:
             json.dump(meta, fd, indent=4)
@@ -1844,7 +1843,7 @@ class Archiver:
         """convert Borg profile to Python profile"""
         import marshal
         with args.output, args.input:
-            marshal.dump(msgpack.unpack(args.input, use_list=False, encoding='utf-8'), args.output)
+            marshal.dump(msgpack.mp_unpack(args.input, use_list=False, raw=False), args.output)
         return EXIT_SUCCESS
 
     @with_repository(lock=False, manifest=False)
@@ -4107,7 +4106,8 @@ class Archiver:
                         # into a marshal file that can be read by e.g. pyprof2calltree.
                         # For local use it's unnecessary hassle, though, that's why .pyprof makes
                         # it compatible (see above).
-                        msgpack.pack(profiler.stats, fd, use_bin_type=True)
+                        # We do not use our msgpack wrapper here, but directly call mp_pack.
+                        msgpack.mp_pack(profiler.stats, fd, use_bin_type=True)
         else:
             return set_ec(func(args))
 

+ 1 - 2
src/borg/cache.py

@@ -6,8 +6,6 @@ from binascii import unhexlify
 from collections import namedtuple
 from time import perf_counter
 
-import msgpack
-
 from .logger import create_logger
 
 logger = create_logger()
@@ -26,6 +24,7 @@ from .helpers import remove_surrogates
 from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
 from .helpers import set_ec, EXIT_WARNING
 from .helpers import truncate_and_unlink
+from .helpers import msgpack
 from .item import ArchiveItem, ChunkListEntry
 from .crypto.key import PlaintextKey
 from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError

+ 3 - 4
src/borg/crypto/key.py

@@ -9,8 +9,6 @@ from binascii import a2b_base64, b2a_base64, hexlify
 from hashlib import sha256, sha512, pbkdf2_hmac
 from hmac import HMAC, compare_digest
 
-import msgpack
-
 from ..logger import create_logger
 
 logger = create_logger()
@@ -24,6 +22,7 @@ from ..helpers import get_keys_dir, get_security_dir
 from ..helpers import get_limited_unpacker
 from ..helpers import bin_to_hex
 from ..helpers import prepare_subprocess_env
+from ..helpers import msgpack
 from ..item import Key, EncryptedKey
 from ..platform import SaveFile
 
@@ -212,10 +211,10 @@ class KeyBase:
             'hmac': bytes(64),
             'salt': os.urandom(64),
         })
-        packed = msgpack.packb(metadata_dict, unicode_errors='surrogateescape')
+        packed = msgpack.packb(metadata_dict)
         tam_key = self._tam_key(tam['salt'], context)
         tam['hmac'] = HMAC(tam_key, packed, sha512).digest()
-        return msgpack.packb(metadata_dict, unicode_errors='surrogateescape')
+        return msgpack.packb(metadata_dict)
 
     def unpack_and_verify_manifest(self, data, force_tam_not_required=False):
         """Unpack msgpacked *data* and return (object, did_verify)."""

+ 1 - 1
src/borg/fuse.py

@@ -11,7 +11,6 @@ from signal import SIGINT
 from distutils.version import LooseVersion
 
 import llfuse
-import msgpack
 
 from .logger import create_logger
 logger = create_logger()
@@ -21,6 +20,7 @@ from .archiver import Archiver
 from .archive import Archive
 from .hashindex import FuseVersionsIndex
 from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
+from .helpers import msgpack
 from .item import Item
 from .lrucache import LRUCache
 from .remote import RemoteRepository

+ 3 - 1
src/borg/helpers/__init__.py

@@ -12,7 +12,6 @@ from .errors import *  # NOQA
 from .fs import *  # NOQA
 from .manifest import *  # NOQA
 from .misc import *  # NOQA
-from .msgpack import *  # NOQA
 from .parseformat import *  # NOQA
 from .process import *  # NOQA
 from .progress import *  # NOQA
@@ -20,6 +19,9 @@ from .time import *  # NOQA
 from .usergroup import *  # NOQA
 from .yes import *  # NOQA
 
+from .msgpack import is_slow_msgpack, int_to_bigint, bigint_to_int, get_limited_unpacker
+from . import msgpack
+
 """
 The global exit_code variable is used so that modules other than archiver can increase the program exit code if a
 warning or error occurred during their operation. This is different from archiver.exit_code, which is only accessible

+ 140 - 6
src/borg/helpers/msgpack.py

@@ -1,11 +1,147 @@
-import msgpack
-import msgpack.fallback
-
 from .datastruct import StableDict
 from ..constants import *  # NOQA
 
+# wrapping msgpack ---------------------------------------------------------------------------------------------------
+#
+# due to the planned breaking api changes in upstream msgpack, we wrap it the way we need it -
+# to avoid having lots of clutter in the calling code. see tickets #968 and #3632.
+#
+# Packing
+# -------
+# use_bin_type = False is needed to generate the old msgpack format (not msgpack 2.0 spec) as borg always did.
+# encoding = None is needed because usage of it is deprecated
+# unicode_errors = None is needed because usage of it is deprecated
+#
+# Unpacking
+# ---------
+# raw = True is needed to unpack the old msgpack format to bytes (not str, about the decoding see item.pyx).
+# encoding = None is needed because usage of it is deprecated
+# unicode_errors = None is needed because usage of it is deprecated
+
+from msgpack import Packer as mp_Packer
+from msgpack import packb as mp_packb
+from msgpack import pack as mp_pack
+from msgpack import Unpacker as mp_Unpacker
+from msgpack import unpackb as mp_unpackb
+from msgpack import unpack as mp_unpack
+
+from msgpack import ExtType
+from msgpack import OutOfData
+
+
+class PackException(Exception):
+    """Exception while msgpack packing"""
+
+
+class UnpackException(Exception):
+    """Exception while msgpack unpacking"""
+
+
+class Packer(mp_Packer):
+    def __init__(self, *, default=None, encoding=None, unicode_errors=None,
+                 use_single_float=False, autoreset=True, use_bin_type=False,
+                 strict_types=False):
+        assert use_bin_type is False
+        assert encoding is None
+        assert unicode_errors is None
+        super().__init__(default=default, encoding=encoding, unicode_errors=unicode_errors,
+                         use_single_float=use_single_float, autoreset=autoreset, use_bin_type=use_bin_type,
+                         strict_types=strict_types)
+
+    def pack(self, obj):
+        try:
+            return super().pack(obj)
+        except Exception as e:
+            raise PackException(e)
+
+
+def packb(o, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs):
+    assert use_bin_type is False
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_packb(o, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise PackException(e)
+
+
+def pack(o, stream, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs):
+    assert use_bin_type is False
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_pack(o, stream, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise PackException(e)
+
+
+class Unpacker(mp_Unpacker):
+    def __init__(self, file_like=None, *, read_size=0, use_list=True, raw=True,
+                 object_hook=None, object_pairs_hook=None, list_hook=None,
+                 encoding=None, unicode_errors=None, max_buffer_size=0,
+                 ext_hook=ExtType,
+                 max_str_len=2147483647,  # 2**32-1
+                 max_bin_len=2147483647,
+                 max_array_len=2147483647,
+                 max_map_len=2147483647,
+                 max_ext_len=2147483647):
+        assert raw is True
+        assert encoding is None
+        assert unicode_errors is None
+        super().__init__(file_like=file_like, read_size=read_size, use_list=use_list, raw=raw,
+                         object_hook=object_hook, object_pairs_hook=object_pairs_hook, list_hook=list_hook,
+                         encoding=encoding, unicode_errors=unicode_errors, max_buffer_size=max_buffer_size,
+                         ext_hook=ext_hook,
+                         max_str_len=max_str_len,
+                         max_bin_len=max_bin_len,
+                         max_array_len=max_array_len,
+                         max_map_len=max_map_len,
+                         max_ext_len=max_ext_len)
+
+    def unpack(self):
+        try:
+            return super().unpack()
+        except OutOfData:
+            raise
+        except Exception as e:
+            raise UnpackException(e)
+
+    def __next__(self):
+        try:
+            return super().__next__()
+        except StopIteration:
+            raise
+        except Exception as e:
+            raise UnpackException(e)
+
+    next = __next__
+
+
+def unpackb(packed, *, raw=True, encoding=None, unicode_errors=None, **kwargs):
+    assert raw is True
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_unpackb(packed, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise UnpackException(e)
+
+
+def unpack(stream, *, raw=True, encoding=None, unicode_errors=None, **kwargs):
+    assert raw is True
+    assert encoding is None
+    assert unicode_errors is None
+    try:
+        return mp_unpack(stream, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs)
+    except Exception as e:
+        raise UnpackException(e)
+
+
+# msgpacking related utilities -----------------------------------------------
 
 def is_slow_msgpack():
+    import msgpack
+    import msgpack.fallback
     return msgpack.Packer is msgpack.fallback.Packer
 
 
@@ -31,7 +167,6 @@ def get_limited_unpacker(kind):
                          max_map_len=MAX_ARCHIVES,  # list of archives
                          max_str_len=255,  # archive name
                          object_hook=StableDict,
-                         unicode_errors='surrogateescape',
                          ))
     elif kind == 'key':
         args.update(dict(use_list=True,  # default value
@@ -39,11 +174,10 @@ def get_limited_unpacker(kind):
                          max_map_len=10,  # EncryptedKey dict
                          max_str_len=4000,  # inner key data
                          object_hook=StableDict,
-                         unicode_errors='surrogateescape',
                          ))
     else:
         raise ValueError('kind must be "server", "client", "manifest" or "key"')
-    return msgpack.Unpacker(**args)
+    return Unpacker(**args)
 
 
 def bigint_to_int(mtime):

+ 1 - 2
src/borg/remote.py

@@ -15,8 +15,6 @@ import time
 import traceback
 from subprocess import Popen, PIPE
 
-import msgpack
-
 from . import __version__
 from .compress import LZ4
 from .constants import *  # NOQA
@@ -31,6 +29,7 @@ from .helpers import format_file_size
 from .helpers import truncate_and_unlink
 from .helpers import prepare_subprocess_env
 from .logger import create_logger, setup_logging
+from .helpers import msgpack
 from .repository import Repository
 from .version import parse_version, format_version
 from .algorithms.checksums import xxh64

+ 2 - 3
src/borg/repository.py

@@ -11,8 +11,6 @@ from datetime import datetime
 from functools import partial
 from itertools import islice
 
-import msgpack
-
 from .constants import *  # NOQA
 from .hashindex import NSIndex
 from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
@@ -22,6 +20,7 @@ from .helpers import bin_to_hex
 from .helpers import hostname_is_unique
 from .helpers import secure_erase, truncate_and_unlink
 from .helpers import Manifest
+from .helpers import msgpack
 from .locking import Lock, LockError, LockErrorT
 from .logger import create_logger
 from .lrucache import LRUCache
@@ -513,7 +512,7 @@ class Repository:
             try:
                 with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
                     hints = msgpack.unpack(fd)
-            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e:
+            except (msgpack.UnpackException, FileNotFoundError, FileIntegrityError) as e:
                 logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
                 if not isinstance(e, FileNotFoundError):
                     os.unlink(hints_path)

+ 1 - 1
src/borg/testsuite/archive.py

@@ -3,7 +3,6 @@ from datetime import datetime, timezone
 from io import StringIO
 from unittest.mock import Mock
 
-import msgpack
 import pytest
 
 from . import BaseTestCase
@@ -11,6 +10,7 @@ from ..crypto.key import PlaintextKey
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
 from ..archive import BackupOSError, backup_io, backup_io_iter
 from ..helpers import Manifest
+from ..helpers import msgpack
 from ..item import Item, ArchiveItem
 
 

+ 1 - 1
src/borg/testsuite/archiver.py

@@ -23,7 +23,6 @@ from hashlib import sha256
 from io import BytesIO, StringIO
 from unittest.mock import patch
 
-import msgpack
 import pytest
 
 try:
@@ -46,6 +45,7 @@ from ..helpers import Manifest, MandatoryFeatureUnsupported
 from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
 from ..helpers import bin_to_hex
 from ..helpers import MAX_S
+from ..helpers import msgpack
 from ..nanorst import RstToTextLazy, rst_to_terminal
 from ..patterns import IECommand, PatternMatcher, parse_pattern
 from ..item import Item, ItemDiff

+ 4 - 3
src/borg/testsuite/helpers.py

@@ -8,9 +8,6 @@ from time import sleep
 
 import pytest
 
-import msgpack
-import msgpack.fallback
-
 from .. import platform
 from ..helpers import Location
 from ..helpers import Buffer
@@ -19,6 +16,7 @@ from ..helpers import make_path_safe, clean_lines
 from ..helpers import interval, prune_within, prune_split
 from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir
 from ..helpers import is_slow_msgpack
+from ..helpers import msgpack
 from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
 from ..helpers import StableDict, int_to_bigint, bigint_to_int, bin_to_hex
 from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams
@@ -594,6 +592,9 @@ def test_parse_file_size_invalid(string):
 
 
 def test_is_slow_msgpack():
+    # we need to import upstream msgpack package here, not helpers.msgpack:
+    import msgpack
+    import msgpack.fallback
     saved_packer = msgpack.Packer
     try:
         msgpack.Packer = msgpack.fallback.Packer

+ 2 - 2
src/borg/testsuite/key.py

@@ -4,7 +4,6 @@ import re
 import tempfile
 from binascii import hexlify, unhexlify
 
-import msgpack
 import pytest
 
 from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex
@@ -19,6 +18,7 @@ from ..helpers import IntegrityError
 from ..helpers import Location
 from ..helpers import StableDict
 from ..helpers import get_security_dir
+from ..helpers import msgpack
 
 
 class TestKey:
@@ -333,7 +333,7 @@ class TestTAM:
             key.unpack_and_verify_manifest(blob)
 
         blob = b'\xc1\xc1\xc1'
-        with pytest.raises((ValueError, msgpack.UnpackException)):
+        with pytest.raises(msgpack.UnpackException):
             key.unpack_and_verify_manifest(blob)
 
     def test_missing_when_required(self, key):

+ 1 - 2
src/borg/testsuite/repository.py

@@ -6,13 +6,12 @@ import sys
 import tempfile
 from unittest.mock import patch
 
-import msgpack
-
 import pytest
 
 from ..hashindex import NSIndex
 from ..helpers import Location
 from ..helpers import IntegrityError
+from ..helpers import msgpack
 from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
 from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE