
Merge pull request #6703 from ThomasWaldmann/r2r-transfer

borg2: "borg transfer" cmd (and also getting rid of legacy)
TW, 3 years ago
commit 86fe8bdd57

+ 0 - 3
docs/faq.rst

@@ -132,9 +132,6 @@ Which file types, attributes, etc. are *not* preserved?
       Archive extraction has optional support to extract all-zero chunks as
       holes in a sparse file.
     * Some filesystem specific attributes, like btrfs NOCOW, see :ref:`platforms`.
-    * For hardlinked symlinks, the hardlinking can not be archived (and thus,
-      the hardlinking will not be done at extraction time). The symlinks will
-      be archived and extracted as non-hardlinked symlinks, see :issue:`2379`.
 
 Are there other known limitations?
 ----------------------------------

+ 1 - 1
docs/internals/data-structures.rst

@@ -567,7 +567,7 @@ dictionary created by the ``Item`` class that contains:
 * uid
 * gid
 * mode (item type + permissions)
-* source (for symlinks, and for hardlinks within one archive)
+* source (for symlinks)
 * rdev (for device files)
 * mtime, atime, ctime in nanoseconds
 * xattrs
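
To make the hardlink-related change above concrete, here is roughly how a pair of hardlinked regular files is represented before and after this pull request (values are made up and only hardlink-relevant keys are shown; hlid is an opaque id derived from the file's (ino, dev) at backup time):

    # borg 1.x: the first hardlink is the "master", later ones are "slaves" pointing at its path
    master = {'path': 'dir/a', 'mode': 0o100644, 'hardlink_master': True, 'chunks': [...]}
    slave = {'path': 'dir/b', 'mode': 0o100644, 'source': 'dir/a'}   # no chunks of its own

    # borg 2: all hardlinks to the same inode carry the same hlid and their own chunks list
    item_a = {'path': 'dir/a', 'mode': 0o100644, 'hlid': b'...', 'chunks': [...]}
    item_b = {'path': 'dir/b', 'mode': 0o100644, 'hlid': b'...', 'chunks': [...]}   # chunks reused, refcounts bumped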

+ 1 - 1
docs/usage/general/file-metadata.rst.inc

@@ -10,7 +10,7 @@ Besides regular file and directory structures, Borg can preserve
   * FIFOs ("named pipes")
   * special file *contents* can be backed up in ``--read-special`` mode.
     By default the metadata to create them with mknod(2), mkfifo(2) etc. is stored.
-* hardlinked regular files, devices, FIFOs (considering all items in the same archive)
+* hardlinked regular files, devices, symlinks, FIFOs (considering all items in the same archive)
 * timestamps in nanosecond precision: mtime, atime, ctime
 * other timestamps: birthtime (on platforms supporting it)
 * permissions:

+ 116 - 198
src/borg/archive.py

@@ -28,7 +28,7 @@ from .constants import *  # NOQA
 from .crypto.low_level import IntegrityError as IntegrityErrorBase
 from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
 from .helpers import Manifest
-from .helpers import hardlinkable
+from .helpers import HardLinkManager
 from .helpers import ChunkIteratorFileWrapper, open_item
 from .helpers import Error, IntegrityError, set_ec
 from .platform import uid2user, user2uid, gid2group, group2gid
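
The HardLinkManager imported here (replacing the old hardlinkable helper and the various hardlink_masters dicts removed throughout this diff) lives in borg.helpers and is not shown in this commit. Judging from its call sites, it is essentially a small typed mapping plus helpers to derive a hardlink id. A minimal sketch under that assumption, not the actual implementation (the borg1_hardlink_master/borg1_hardlink_slave compatibility predicates used by "borg transfer" are omitted):

    import hashlib

    class HardLinkManager:
        """remember/retrieve hardlink info, e.g. (ino, dev) -> chunks, or hlid -> extracted path"""

        def __init__(self, *, id_type, info_type):
            self.id_type = id_type        # e.g. tuple, bytes or str
            self.info_type = info_type    # e.g. list, str or a tuple of accepted types
            self._map = {}

        def remember(self, *, id, info):
            assert isinstance(id, self.id_type)
            self._map[id] = info

        def retrieve(self, id, *, default=None):
            return self._map.get(id, default)

        def hardlink_id_from_path(self, path):
            # stable opaque id derived from an archived path (used when converting borg1 archives)
            return hashlib.sha256(path.encode()).digest()

        def hardlink_id_from_inode(self, *, ino, dev):
            # stable opaque id derived from (ino, dev) of the file being backed up
            return hashlib.sha256(f'{ino}/{dev}'.encode()).digest()
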
@@ -280,7 +280,7 @@ class DownloadPipeline:
         self.repository = repository
         self.key = key
 
-    def unpack_many(self, ids, filter=None, partial_extract=False, preload=False, hardlink_masters=None):
+    def unpack_many(self, ids, *, filter=None, preload=False):
         """
         Return iterator of items.
 
@@ -290,10 +290,7 @@ class DownloadPipeline:
         Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
         otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
         """
-        def _preload(chunks):
-            self.repository.preload([c.id for c in chunks])
-
-        masters_preloaded = set()
+        hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
         for data in self.fetch_many(ids):
             unpacker.feed(data)
@@ -306,33 +303,20 @@ class DownloadPipeline:
                 items = [item for item in items if filter(item)]
 
             if preload:
-                if filter and partial_extract:
-                    # if we do only a partial extraction, it gets a bit
-                    # complicated with computing the preload items: if a hardlink master item is not
-                    # selected (== not extracted), we will still need to preload its chunks if a
-                    # corresponding hardlink slave is selected (== is extracted).
-                    # due to a side effect of the filter() call, we now have hardlink_masters dict populated.
-                    for item in items:
-                        if hardlinkable(item.mode):
-                            source = item.get('source')
-                            if source is None:  # maybe a hardlink master
-                                if 'chunks' in item:
-                                    _preload(item.chunks)
-                                # if this is a hl master, remember that we already preloaded all chunks of it (if any):
-                                if item.get('hardlink_master', True):
-                                    masters_preloaded.add(item.path)
-                            else:  # hardlink slave
-                                if source not in masters_preloaded:
-                                    # we only need to preload *once* (for the 1st selected slave)
-                                    chunks, _ = hardlink_masters[source]
-                                    if chunks is not None:
-                                        _preload(chunks)
-                                    masters_preloaded.add(source)
-                else:
-                    # easy: we do not have a filter, thus all items are selected, thus we need to preload all chunks.
-                    for item in items:
-                        if 'chunks' in item:
-                            _preload(item.chunks)
+                for item in items:
+                    if 'chunks' in item:
+                        hlid = item.get('hlid', None)
+                        if hlid is None:
+                            preload_chunks = True
+                        else:
+                            if hlid in hlids_preloaded:
+                                preload_chunks = False
+                            else:
+                                # this hardlinked inode's chunks were not preloaded yet (no other hardlink to the same inode seen so far)
+                                preload_chunks = True
+                                hlids_preloaded.add(hlid)
+                        if preload_chunks:
+                            self.repository.preload([c.id for c in item.chunks])
 
             for item in items:
                 yield item
@@ -443,7 +427,6 @@ class Archive:
         self.repository = repository
         self.cache = cache
         self.manifest = manifest
-        self.hard_links = {}
         self.stats = Statistics(output_json=log_json, iec=iec)
         self.iec = iec
         self.show_progress = progress
@@ -489,7 +472,7 @@ class Archive:
     def _load_meta(self, id):
         data = self.key.decrypt(id, self.repository.get(id))
         metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
-        if metadata.version != 1:
+        if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
             raise Exception('Unknown archive metadata version')
         return metadata
 
@@ -584,12 +567,10 @@ Utilization of max. archive size: {csize_max:.0%}
             return False
         return filter(item) if filter else True
 
-    def iter_items(self, filter=None, partial_extract=False, preload=False, hardlink_masters=None):
+    def iter_items(self, filter=None, preload=False):
         # note: when calling this with preload=True, later fetch_many() must be called with
         # is_preloaded=True or the RemoteRepository code will leak memory!
-        assert not (filter and partial_extract and preload) or hardlink_masters is not None
-        for item in self.pipeline.unpack_many(self.metadata.items, partial_extract=partial_extract,
-                                              preload=preload, hardlink_masters=hardlink_masters,
+        for item in self.pipeline.unpack_many(self.metadata.items, preload=preload,
                                               filter=lambda item: self.item_filter(item, filter)):
             yield item
 
@@ -620,7 +601,7 @@ Utilization of max. archive size: {csize_max:.0%}
         self.start = start
         self.end = end
         metadata = {
-            'version': 1,
+            'version': 2,
             'name': name,
             'comment': comment or '',
             'items': self.items_buffer.chunks,
@@ -719,33 +700,30 @@ Utilization of max. archive size: {csize_max:.0%}
         return stats
 
     @contextmanager
-    def extract_helper(self, dest, item, path, stripped_components, original_path, hardlink_masters):
+    def extract_helper(self, item, path, hlm, *, dry_run=False):
         hardlink_set = False
         # Hard link?
-        if 'source' in item:
-            source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
-            chunks, link_target = hardlink_masters.get(item.source, (None, source))
-            if link_target and has_link:
-                # Hard link was extracted previously, just link
-                with backup_io('link'):
-                    os.link(link_target, path)
-                    hardlink_set = True
-            elif chunks is not None:
-                # assign chunks to this item, since the item which had the chunks was not extracted
-                item.chunks = chunks
+        if 'hlid' in item:
+            link_target = hlm.retrieve(id=item.hlid)
+            if link_target is not None and has_link:
+                if not dry_run:
+                    # another hardlink to same inode (same hlid) was extracted previously, just link to it
+                    with backup_io('link'):
+                        os.link(link_target, path, follow_symlinks=False)
+                hardlink_set = True
         yield hardlink_set
-        if not hardlink_set and hardlink_masters:
-            if has_link:
-                # Update master entry with extracted item path, so that following hardlinks don't extract twice.
+        if not hardlink_set:
+            if 'hlid' in item and has_link:
+                # Update entry with extracted item path, so that following hardlinks don't extract twice.
                 # We have hardlinking support, so we will hardlink not extract.
-                hardlink_masters[item.get('source') or original_path] = (None, path)
+                hlm.remember(id=item.hlid, info=path)
             else:
                 # Broken platform with no hardlinking support.
                 # In this case, we *want* to extract twice, because there is no other way.
                 pass
 
     def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
-                     hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
+                     hlm=None, stripped_components=0, original_path=None, pi=None):
         """
         Extract archive item.
 
@@ -754,29 +732,33 @@ Utilization of max. archive size: {csize_max:.0%}
         :param dry_run: do not write any data
         :param stdout: write extracted data to stdout
         :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
-        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
+        :param hlm: maps hlid to link_target for extracting subtrees with hardlinks correctly
         :param stripped_components: stripped leading path components to correct hard link extraction
         :param original_path: 'path' key as stored in archive
         :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
         """
-        hardlink_masters = hardlink_masters or {}
         has_damaged_chunks = 'chunks_healthy' in item
         if dry_run or stdout:
-            if 'chunks' in item:
-                item_chunks_size = 0
-                for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
-                    if pi:
-                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
-                    if stdout:
-                        sys.stdout.buffer.write(data)
-                    item_chunks_size += len(data)
-                if stdout:
-                    sys.stdout.buffer.flush()
-                if 'size' in item:
-                    item_size = item.size
-                    if item_size != item_chunks_size:
-                        raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format(
-                                          item_size, item_chunks_size))
+            with self.extract_helper(item, '', hlm, dry_run=dry_run or stdout) as hardlink_set:
+                if not hardlink_set:
+                    # with dry_run, no hardlink is actually created, but we must behave the same as
+                    # in the non-dry_run case regarding fetching preloaded chunks from the pipeline,
+                    # or it would get stuck.
+                    if 'chunks' in item:
+                        item_chunks_size = 0
+                        for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
+                            if pi:
+                                pi.show(increase=len(data), info=[remove_surrogates(item.path)])
+                            if stdout:
+                                sys.stdout.buffer.write(data)
+                            item_chunks_size += len(data)
+                        if stdout:
+                            sys.stdout.buffer.flush()
+                        if 'size' in item:
+                            item_size = item.size
+                            if item_size != item_chunks_size:
+                                raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format(
+                                                  item_size, item_chunks_size))
             if has_damaged_chunks:
                 raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.')
             return
@@ -807,8 +789,7 @@ Utilization of max. archive size: {csize_max:.0%}
         if stat.S_ISREG(mode):
             with backup_io('makedirs'):
                 make_parent(path)
-            with self.extract_helper(dest, item, path, stripped_components, original_path,
-                                     hardlink_masters) as hardlink_set:
+            with self.extract_helper(item, path, hlm) as hardlink_set:
                 if hardlink_set:
                     return
                 with backup_io('open'):
@@ -847,24 +828,26 @@ Utilization of max. archive size: {csize_max:.0%}
                     self.restore_attrs(path, item)
             elif stat.S_ISLNK(mode):
                 make_parent(path)
-                source = item.source
-                try:
-                    os.symlink(source, path)
-                except UnicodeEncodeError:
-                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
-                self.restore_attrs(path, item, symlink=True)
+                with self.extract_helper(item, path, hlm) as hardlink_set:
+                    if hardlink_set:
+                        # unusual, but possible: this is a hardlinked symlink.
+                        return
+                    source = item.source
+                    try:
+                        os.symlink(source, path)
+                    except UnicodeEncodeError:
+                        raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
+                    self.restore_attrs(path, item, symlink=True)
             elif stat.S_ISFIFO(mode):
                 make_parent(path)
-                with self.extract_helper(dest, item, path, stripped_components, original_path,
-                                         hardlink_masters) as hardlink_set:
+                with self.extract_helper(item, path, hlm) as hardlink_set:
                     if hardlink_set:
                         return
                     os.mkfifo(path)
                     self.restore_attrs(path, item)
             elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                 make_parent(path)
-                with self.extract_helper(dest, item, path, stripped_components, original_path,
-                                         hardlink_masters) as hardlink_set:
+                with self.extract_helper(item, path, hlm) as hardlink_set:
                     if hardlink_set:
                         return
                     os.mknod(path, item.mode, item.rdev)
@@ -1041,79 +1024,43 @@ Utilization of max. archive size: {csize_max:.0%}
         :param can_compare_chunk_ids: Whether --chunker-params are the same for both archives.
         """
 
-        def hardlink_master_seen(item):
-            return 'source' not in item or not hardlinkable(item.mode) or item.source in hardlink_masters
-
-        def is_hardlink_master(item):
-            return item.get('hardlink_master', True) and 'source' not in item and hardlinkable(item.mode)
-
-        def update_hardlink_masters(item1, item2):
-            if is_hardlink_master(item1) or is_hardlink_master(item2):
-                hardlink_masters[item1.path] = (item1, item2)
-
-        def has_hardlink_master(item, hardlink_masters):
-            return hardlinkable(item.mode) and item.get('source') in hardlink_masters
-
         def compare_items(item1, item2):
-            if has_hardlink_master(item1, hardlink_masters):
-                item1 = hardlink_masters[item1.source][0]
-            if has_hardlink_master(item2, hardlink_masters):
-                item2 = hardlink_masters[item2.source][1]
             return ItemDiff(item1, item2,
                             archive1.pipeline.fetch_many([c.id for c in item1.get('chunks', [])]),
                             archive2.pipeline.fetch_many([c.id for c in item2.get('chunks', [])]),
                             can_compare_chunk_ids=can_compare_chunk_ids)
 
-        def defer_if_necessary(item1, item2):
-            """Adds item tuple to deferred if necessary and returns True, if items were deferred"""
-            update_hardlink_masters(item1, item2)
-            defer = not hardlink_master_seen(item1) or not hardlink_master_seen(item2)
-            if defer:
-                deferred.append((item1, item2))
-            return defer
-
         orphans_archive1 = OrderedDict()
         orphans_archive2 = OrderedDict()
-        deferred = []
-        hardlink_masters = {}
 
         for item1, item2 in zip_longest(
                 archive1.iter_items(lambda item: matcher.match(item.path)),
                 archive2.iter_items(lambda item: matcher.match(item.path)),
         ):
             if item1 and item2 and item1.path == item2.path:
-                if not defer_if_necessary(item1, item2):
-                    yield (item1.path, compare_items(item1, item2))
+                yield (item1.path, compare_items(item1, item2))
                 continue
             if item1:
                 matching_orphan = orphans_archive2.pop(item1.path, None)
                 if matching_orphan:
-                    if not defer_if_necessary(item1, matching_orphan):
-                        yield (item1.path, compare_items(item1, matching_orphan))
+                    yield (item1.path, compare_items(item1, matching_orphan))
                 else:
                     orphans_archive1[item1.path] = item1
             if item2:
                 matching_orphan = orphans_archive1.pop(item2.path, None)
                 if matching_orphan:
-                    if not defer_if_necessary(matching_orphan, item2):
-                        yield (matching_orphan.path, compare_items(matching_orphan, item2))
+                    yield (matching_orphan.path, compare_items(matching_orphan, item2))
                 else:
                     orphans_archive2[item2.path] = item2
         # At this point orphans_* contain items that had no matching partner in the other archive
         for added in orphans_archive2.values():
             path = added.path
             deleted_item = Item.create_deleted(path)
-            update_hardlink_masters(deleted_item, added)
             yield (path, compare_items(deleted_item, added))
         for deleted in orphans_archive1.values():
             path = deleted.path
             deleted_item = Item.create_deleted(path)
-            update_hardlink_masters(deleted, deleted_item)
             yield (path, compare_items(deleted, deleted_item))
-        for item1, item2 in deferred:
-            assert hardlink_master_seen(item1)
-            assert hardlink_master_seen(item2)
-            yield (path, compare_items(item1, item2))
 
 
 class MetadataCollector:
@@ -1289,7 +1236,7 @@ class FilesystemObjectProcessors:
         self.show_progress = show_progress
         self.print_file_status = file_status_printer or (lambda *args: None)
 
-        self.hard_links = {}
+        self.hlm = HardLinkManager(id_type=tuple, info_type=(list, type(None)))  # (ino, dev) -> chunks or None
         self.stats = Statistics(output_json=log_json, iec=iec)  # threading: done by cache (including progress)
         self.cwd = os.getcwd()
         self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=sparse)
@@ -1298,29 +1245,32 @@ class FilesystemObjectProcessors:
     def create_helper(self, path, st, status=None, hardlinkable=True):
         safe_path = make_path_safe(path)
         item = Item(path=safe_path)
-        hardlink_master = False
         hardlinked = hardlinkable and st.st_nlink > 1
+        update_map = False
         if hardlinked:
-            source = self.hard_links.get((st.st_ino, st.st_dev))
-            if source is not None:
-                item.source = source
-                status = 'h'  # hardlink (to already seen inodes)
-            else:
-                hardlink_master = True
-        yield item, status, hardlinked, hardlink_master
-        # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
+            status = 'h'  # hardlink
+            nothing = object()
+            chunks = self.hlm.retrieve(id=(st.st_ino, st.st_dev), default=nothing)
+            if chunks is nothing:
+                update_map = True
+            elif chunks is not None:
+                item.chunks = chunks
+            item.hlid = self.hlm.hardlink_id_from_inode(ino=st.st_ino, dev=st.st_dev)
+        yield item, status, hardlinked
         self.add_item(item, stats=self.stats)
-        # ... and added to the archive, so we can remember it to refer to it later in the archive:
-        if hardlink_master:
-            self.hard_links[(st.st_ino, st.st_dev)] = safe_path
+        if update_map:
+            # remember this inode and, if the item has chunks, also remember them,
+            # so we do not have to re-chunk when we see another hardlink to it.
+            chunks = item.chunks if 'chunks' in item else None
+            self.hlm.remember(id=(st.st_ino, st.st_dev), info=chunks)
 
     def process_dir_with_fd(self, *, path, fd, st):
-        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
+        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked):
             item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
             return status
 
     def process_dir(self, *, path, parent_fd, name, st):
-        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
+        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked):
             with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_dir,
                         noatime=True, op='dir_open') as fd:
                 # fd is None for directories on windows, in that case a race condition check is not possible.
@@ -1331,7 +1281,7 @@ class FilesystemObjectProcessors:
                 return status
 
     def process_fifo(self, *, path, parent_fd, name, st):
-        with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master):  # fifo
+        with self.create_helper(path, st, 'f') as (item, status, hardlinked):  # fifo
             with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
                 with backup_io('fstat'):
                     st = stat_update_check(st, os.fstat(fd))
@@ -1339,7 +1289,7 @@ class FilesystemObjectProcessors:
                 return status
 
     def process_dev(self, *, path, parent_fd, name, st, dev_type):
-        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master):  # char/block device
+        with self.create_helper(path, st, dev_type) as (item, status, hardlinked):  # char/block device
             # looks like we can not work fd-based here without causing issues when trying to open/close the device
             with backup_io('stat'):
                 st = stat_update_check(st, os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False))
@@ -1348,10 +1298,7 @@ class FilesystemObjectProcessors:
             return status
 
     def process_symlink(self, *, path, parent_fd, name, st):
-        # note: using hardlinkable=False because we can not support hardlinked symlinks,
-        #       due to the dual-use of item.source, see issue #2343:
-        # hardlinked symlinks will be archived [and extracted] as non-hardlinked symlinks.
-        with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
+        with self.create_helper(path, st, 's', hardlinkable=True) as (item, status, hardlinked):
             fname = name if name is not None and parent_fd is not None else path
             with backup_io('readlink'):
                 source = os.readlink(fname, dir_fd=parent_fd)
@@ -1384,7 +1331,7 @@ class FilesystemObjectProcessors:
         return status
 
     def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
-        with self.create_helper(path, st, None) as (item, status, hardlinked, hardlink_master):  # no status yet
+        with self.create_helper(path, st, None) as (item, status, hardlinked):  # no status yet
             with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
                 with backup_io('fstat'):
                     st = stat_update_check(st, os.fstat(fd))
@@ -1395,7 +1342,9 @@ class FilesystemObjectProcessors:
                     # so it can be extracted / accessed in FUSE mount like a regular file.
                     # this needs to be done early, so that part files also get the patched mode.
                     item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
-                if not hardlinked or hardlink_master:
+                if 'chunks' in item:  # create_helper might have put chunks from a previous hardlink there
+                    [cache.chunk_incref(id_, self.stats) for id_, _, _ in item.chunks]
+                else:  # normal case, no "2nd+" hardlink
                     if not is_special_file:
                         hashed_path = safe_encode(os.path.join(self.cwd, path))
                         path_hash = self.key.id_hash(hashed_path)
@@ -1420,7 +1369,6 @@ class FilesystemObjectProcessors:
                         status = 'M' if known else 'A'  # regular file, modified or added
                     self.print_file_status(status, path)
                     status = None  # we already printed the status
-                    item.hardlink_master = hardlinked
                     # Only chunkify the file if needed
                     if chunks is not None:
                         item.chunks = chunks
@@ -1444,7 +1392,7 @@ class FilesystemObjectProcessors:
                             # also, we must not memorize a potentially inconsistent/corrupt file that
                             # changed while we backed it up.
                             cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
-                    self.stats.nfiles += 1
+                self.stats.nfiles += 1
                 item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
                 item.get_size(memorize=True)
                 return status
@@ -1464,6 +1412,7 @@ class TarfileObjectProcessors:
 
         self.stats = Statistics(output_json=log_json, iec=iec)  # threading: done by cache (including progress)
         self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False)
+        self.hlm = HardLinkManager(id_type=str, info_type=list)  # path -> chunks
 
     @contextmanager
     def create_helper(self, tarinfo, status=None, type=None):
@@ -1504,11 +1453,21 @@ class TarfileObjectProcessors:
             item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
             return status
 
-    def process_link(self, *, tarinfo, status, type):
+    def process_symlink(self, *, tarinfo, status, type):
         with self.create_helper(tarinfo, status, type) as (item, status):
             item.source = tarinfo.linkname
             return status
 
+    def process_hardlink(self, *, tarinfo, status, type):
+        with self.create_helper(tarinfo, status, type) as (item, status):
+            # create a regular (not hardlinked) borg item, reusing the chunks; see HardLinkManager.__doc__
+            chunks = self.hlm.retrieve(tarinfo.linkname)
+            if chunks is not None:
+                item.chunks = chunks
+            item.get_size(memorize=True, from_chunks=True)
+            self.stats.nfiles += 1
+            return status
+
     def process_file(self, *, tarinfo, status, type, tar):
         with self.create_helper(tarinfo, status, type) as (item, status):
             self.print_file_status(status, tarinfo.name)
@@ -1516,8 +1475,10 @@ class TarfileObjectProcessors:
             fd = tar.extractfile(tarinfo)
             self.process_file_chunks(item, self.cache, self.stats, self.show_progress,
                                      backup_io_iter(self.chunker.chunkify(fd)))
-            item.get_size(memorize=True)
+            item.get_size(memorize=True, from_chunks=True)
             self.stats.nfiles += 1
+            # we need to remember ALL files, see HardLinkManager.__doc__
+            self.hlm.remember(id=tarinfo.name, info=item.chunks)
             return status
 
 
@@ -1787,7 +1748,7 @@ class ArchiveChecker:
                 continue
             if not valid_msgpacked_dict(data, archive_keys_serialized):
                 continue
-            if b'cmdline' not in data or b'\xa7version\x01' not in data:
+            if b'cmdline' not in data or b'\xa7version\x02' not in data:
                 continue
             try:
                 archive = msgpack.unpackb(data)
@@ -1944,9 +1905,6 @@ class ArchiveChecker:
             def valid_item(obj):
                 if not isinstance(obj, StableDict):
                     return False, 'not a dictionary'
-                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
-                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
-                obj.pop(b'acl', None)
                 keys = set(obj)
                 if not required_item_keys.issubset(keys):
                     return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
@@ -2031,7 +1989,7 @@ class ArchiveChecker:
                     del self.manifest.archives[info.name]
                     continue
                 archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
-                if archive.version != 1:
+                if archive.version != 2:
                     raise Exception('Unknown archive metadata version')
                 archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
                 items_buffer = ChunkBuffer(self.key)
@@ -2130,34 +2088,11 @@ class ArchiveRecreater:
 
     def process_items(self, archive, target):
         matcher = self.matcher
-        target_is_subset = not matcher.empty()
-        hardlink_masters = {} if target_is_subset else None
-
-        def item_is_hardlink_master(item):
-            return (target_is_subset and
-                    hardlinkable(item.mode) and
-                    item.get('hardlink_master', True) and
-                    'source' not in item)
 
         for item in archive.iter_items():
             if not matcher.match(item.path):
                 self.print_file_status('x', item.path)
-                if item_is_hardlink_master(item):
-                    hardlink_masters[item.path] = (item.get('chunks'), item.get('chunks_healthy'), None)
                 continue
-            if target_is_subset and hardlinkable(item.mode) and item.get('source') in hardlink_masters:
-                # master of this hard link is outside the target subset
-                chunks, chunks_healthy, new_source = hardlink_masters[item.source]
-                if new_source is None:
-                    # First item to use this master, move the chunks
-                    item.chunks = chunks
-                    if chunks_healthy is not None:
-                        item.chunks_healthy = chunks_healthy
-                    hardlink_masters[item.source] = (None, None, item.path)
-                    del item.source
-                else:
-                    # Master was already moved, only update this item's source
-                    item.source = new_source
             if self.dry_run:
                 self.print_file_status('-', item.path)
             else:
@@ -2264,30 +2199,13 @@ class ArchiveRecreater:
         tag_files = []
         tagged_dirs = []
 
-        # to support reading hard-linked CACHEDIR.TAGs (aka CACHE_TAG_NAME), similar to hardlink_masters:
-        cachedir_masters = {}
-
-        if self.exclude_caches:
-            # sadly, due to how CACHEDIR.TAG works (filename AND file [header] contents) and
-            # how borg deals with hardlinks (slave hardlinks referring back to master hardlinks),
-            # we need to pass over the archive collecting hardlink master paths.
-            # as seen in issue #4911, the master paths can have an arbitrary filenames,
-            # not just CACHEDIR.TAG.
-            for item in archive.iter_items(filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME):
-                if stat.S_ISREG(item.mode) and 'chunks' not in item and 'source' in item:
-                    # this is a hardlink slave, referring back to its hardlink master (via item.source)
-                    cachedir_masters[item.source] = None  # we know the key (path), but not the value (item) yet
-
         for item in archive.iter_items(
                 filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME or matcher.match(item.path)):
-            if self.exclude_caches and item.path in cachedir_masters:
-                cachedir_masters[item.path] = item
             dir, tag_file = os.path.split(item.path)
             if tag_file in self.exclude_if_present:
                 exclude(dir, item)
             elif self.exclude_caches and tag_file == CACHE_TAG_NAME and stat.S_ISREG(item.mode):
-                content_item = item if 'chunks' in item else cachedir_masters[item.source]
-                file = open_item(archive, content_item)
+                file = open_item(archive, item)
                 if file.read(len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS:
                     exclude(dir, item)
         matcher.add(tag_files, IECommand.Include)

+ 194 - 58
src/borg/archiver.py

@@ -29,6 +29,7 @@ try:
     from contextlib import contextmanager
     from datetime import datetime, timedelta
     from io import TextIOWrapper
+    from struct import Struct
 
     from .logger import create_logger, setup_logging
 
@@ -44,7 +45,7 @@ try:
     from .archive import has_link
     from .cache import Cache, assert_secure, SecurityManager
     from .constants import *  # NOQA
-    from .compress import CompressionSpec
+    from .compress import CompressionSpec, ZLIB, ZLIB_legacy, ObfuscateSize
     from .crypto.key import key_creator, key_argument_names, tam_required_file, tam_required
     from .crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey, FlexiKey
     from .crypto.keymanager import KeyManager
@@ -59,7 +60,7 @@ try:
     from .helpers import timestamp
     from .helpers import get_cache_dir, os_stat
     from .helpers import Manifest, AI_HUMAN_SORT_KEYS
-    from .helpers import hardlinkable
+    from .helpers import HardLinkManager
     from .helpers import StableDict
     from .helpers import check_python, check_extension_modules
     from .helpers import dir_is_tagged, is_slow_msgpack, is_supported_msgpack, yes, sysinfo
@@ -338,6 +339,137 @@ class Archiver:
         ).serve()
         return EXIT_SUCCESS
 
+    @with_other_repository(manifest=True, key=True, compatibility=(Manifest.Operation.READ,))
+    @with_repository(exclusive=True, manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
+    def do_transfer(self, args, *,
+               repository, manifest, key, cache,
+               other_repository=None, other_manifest=None, other_key=None):
+        """archives transfer from other repository"""
+
+        ITEM_KEY_WHITELIST = {'path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hlid',
+                              'mode', 'user', 'group', 'uid', 'gid', 'mtime', 'atime', 'ctime', 'birthtime', 'size',
+                              'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended',
+                              'part'}
+
+        def upgrade_item(item):
+            """upgrade item as needed, get rid of legacy crap"""
+            if hlm.borg1_hardlink_master(item):
+                item._dict['hlid'] = hlid = hlm.hardlink_id_from_path(item._dict['path'])
+                hlm.remember(id=hlid, info=(item._dict.get('chunks'), item._dict.get('chunks_healthy')))
+            elif hlm.borg1_hardlink_slave(item):
+                item._dict['hlid'] = hlid = hlm.hardlink_id_from_path(item._dict['source'])
+                chunks, chunks_healthy = hlm.retrieve(id=hlid, default=(None, None))
+                if chunks is not None:
+                    item._dict['chunks'] = chunks
+                    for chunk_id, _, _ in chunks:
+                        cache.chunk_incref(chunk_id, archive.stats)
+                if chunks_healthy is not None:
+                    item._dict['chunks_healthy'] = chunks_healthy
+                item._dict.pop('source')  # not used for hardlinks any more, replaced by hlid
+            for attr in 'atime', 'ctime', 'mtime', 'birthtime':
+                if attr in item:
+                    ns = getattr(item, attr)  # decode (bigint or Timestamp) --> int ns
+                    setattr(item, attr, ns)  # encode int ns --> msgpack.Timestamp only, no bigint any more
+            # make sure we only have desired stuff in the new item. specifically, make sure to get rid of:
+            # - 'acl' remnants of bug in attic <= 0.13
+            # - 'hardlink_master' (superseded by hlid)
+            new_item_dict = {key: value for key, value in item.as_dict().items() if key in ITEM_KEY_WHITELIST}
+            new_item = Item(internal_dict=new_item_dict)
+            new_item.get_size(memorize=True)  # if not already present: compute+remember size for items with chunks
+            assert all(key in new_item for key in REQUIRED_ITEM_KEYS)
+            return new_item
+
+        def upgrade_compressed_chunk(chunk):
+            def upgrade_zlib_and_level(chunk):
+                if ZLIB_legacy.detect(chunk):
+                    ctype = ZLIB.ID
+                    chunk = ctype + level + chunk  # get rid of the attic legacy: prepend separate type/level bytes
+                else:
+                    ctype = chunk[0:1]
+                    chunk = ctype + level + chunk[2:]  # keep type same, but set level
+                return chunk
+
+            ctype = chunk[0:1]
+            level = b'\xFF'  # FF means unknown compression level
+
+            if ctype == ObfuscateSize.ID:
+                # in older borg, we used unusual byte order
+                old_header_fmt = Struct('>I')
+                new_header_fmt = ObfuscateSize.header_fmt
+                length = ObfuscateSize.header_len
+                size_bytes = chunk[2:2+length]
+                size = old_header_fmt.unpack(size_bytes)[0]  # unpack() returns a tuple
+                size_bytes = new_header_fmt.pack(size)
+                compressed = chunk[2+length:]
+                compressed = upgrade_zlib_and_level(compressed)
+                chunk = ctype + level + size_bytes + compressed
+            else:
+                chunk = upgrade_zlib_and_level(chunk)
+            return chunk
+
+        dry_run = args.dry_run
+
+        args.consider_checkpoints = True
+        archive_names = tuple(x.name for x in other_manifest.archives.list_considering(args))
+        if not archive_names:
+            return EXIT_SUCCESS
+
+        for name in archive_names:
+            transfer_size = 0
+            present_size = 0
+            if name in manifest.archives and not dry_run:
+                print(f"{name}: archive is already present in destination repo, skipping.")
+            else:
+                if not dry_run:
+                    print(f"{name}: copying archive to destination repo...")
+                hlm = HardLinkManager(id_type=bytes, info_type=tuple)  # hlid -> (chunks, chunks_healthy)
+                other_archive = Archive(other_repository, other_key, other_manifest, name)
+                archive = Archive(repository, key, manifest, name, cache=cache, create=True) if not dry_run else None
+                for item in other_archive.iter_items():
+                    if 'chunks' in item:
+                        chunks = []
+                        for chunk_id, size, _ in item.chunks:
+                            refcount = cache.seen_chunk(chunk_id, size)
+                            if refcount == 0:  # target repo does not yet have this chunk
+                                if not dry_run:
+                                    cdata = other_repository.get(chunk_id)
+                                    # keep compressed payload same, avoid decompression / recompression
+                                    data = other_key.decrypt(chunk_id, cdata, decompress=False)
+                                    data = upgrade_compressed_chunk(data)
+                                    chunk_entry = cache.add_chunk(chunk_id, data, archive.stats, wait=False,
+                                                                  compress=False, size=size)
+                                    cache.repository.async_response(wait=False)
+                                    chunks.append(chunk_entry)
+                                transfer_size += size
+                            else:
+                                if not dry_run:
+                                    chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
+                                    chunks.append(chunk_entry)
+                                present_size += size
+                        if not dry_run:
+                            item.chunks = chunks  # overwrite! IDs and sizes are same, csizes are likely different
+                            archive.stats.nfiles += 1
+                    if not dry_run:
+                        archive.add_item(upgrade_item(item))
+                if not dry_run:
+                    additional_metadata = {}
+                    # keep all metadata except archive version and stats. also do not keep
+                    # recreate_source_id, recreate_args, recreate_partial_chunks which were used only in 1.1.0b1 .. b2.
+                    for attr in ('cmdline', 'hostname', 'username', 'time', 'time_end', 'comment',
+                                 'chunker_params', 'recreate_cmdline'):
+                        if hasattr(other_archive.metadata, attr):
+                            additional_metadata[attr] = getattr(other_archive.metadata, attr)
+                    archive.save(stats=archive.stats, additional_metadata=additional_metadata)
+                    print(f"{name}: finished. "
+                          f"transfer_size: {format_file_size(transfer_size)} "
+                          f"present_size: {format_file_size(present_size)}")
+                else:
+                    print(f"{name}: completed" if transfer_size == 0 else
+                          f"{name}: incomplete, "
+                          f"transfer_size: {format_file_size(transfer_size)} "
+                          f"present_size: {format_file_size(present_size)}")
+        return EXIT_SUCCESS
+
     @with_repository(create=True, exclusive=True, manifest=False)
     @with_other_repository(key=True, compatibility=(Manifest.Operation.READ, ))
     def do_init(self, args, repository, *, other_repository=None, other_key=None):
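
The byte-order conversion in upgrade_compressed_chunk above boils down to repacking one unsigned 32-bit size field. A small stand-alone illustration (the little-endian '<I' layout assumed here for the new ObfuscateSize.header_fmt is a guess for this example only; note that Struct.unpack always returns a tuple):

    from struct import Struct

    old_header_fmt = Struct('>I')   # big-endian, as older borg wrote the obfuscated size
    new_header_fmt = Struct('<I')   # assumed layout of ObfuscateSize.header_fmt (illustration only)

    size_bytes = old_header_fmt.pack(123456)        # b'\x00\x01\xe2@'
    size = old_header_fmt.unpack(size_bytes)[0]     # unpack() returns a tuple, take element 0
    assert new_header_fmt.pack(size) == b'@\xe2\x01\x00'
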
@@ -1055,16 +1187,14 @@ class Archiver:
             self.print_file_status(status, path)
 
     @staticmethod
-    def build_filter(matcher, peek_and_store_hardlink_masters, strip_components):
+    def build_filter(matcher, strip_components):
         if strip_components:
             def item_filter(item):
                 matched = matcher.match(item.path) and os.sep.join(item.path.split(os.sep)[strip_components:])
-                peek_and_store_hardlink_masters(item, matched)
                 return matched
         else:
             def item_filter(item):
                 matched = matcher.match(item.path)
-                peek_and_store_hardlink_masters(item, matched)
                 return matched
         return item_filter
 
@@ -1087,33 +1217,18 @@ class Archiver:
         sparse = args.sparse
         strip_components = args.strip_components
         dirs = []
-        partial_extract = not matcher.empty() or strip_components
-        hardlink_masters = {} if partial_extract or not has_link else None
+        hlm = HardLinkManager(id_type=bytes, info_type=str)  # hlid -> path
 
-        def peek_and_store_hardlink_masters(item, matched):
-            # not has_link:
-            # OS does not have hardlink capability thus we need to remember the chunks so that
-            # we can extract all hardlinks as separate normal (not-hardlinked) files instead.
-            #
-            # partial_extract and not matched and hardlinkable:
-            # we do not extract the very first hardlink, so we need to remember the chunks
-            # in hardlinks_master, so we can use them when we extract some 2nd+ hardlink item
-            # that has no chunks list.
-            if ((not has_link or (partial_extract and not matched and hardlinkable(item.mode))) and
-                    (item.get('hardlink_master', True) and 'source' not in item)):
-                hardlink_masters[item.get('path')] = (item.get('chunks'), None)
-
-        filter = self.build_filter(matcher, peek_and_store_hardlink_masters, strip_components)
+        filter = self.build_filter(matcher, strip_components)
         if progress:
             pi = ProgressIndicatorPercent(msg='%5.1f%% Extracting: %s', step=0.1, msgid='extract')
             pi.output('Calculating total archive size for the progress indicator (might take long for large archives)')
-            extracted_size = sum(item.get_size(hardlink_masters) for item in archive.iter_items(filter))
+            extracted_size = sum(item.get_size() for item in archive.iter_items(filter))
             pi.total = extracted_size
         else:
             pi = None
 
-        for item in archive.iter_items(filter, partial_extract=partial_extract,
-                                       preload=True, hardlink_masters=hardlink_masters):
+        for item in archive.iter_items(filter, preload=True):
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
@@ -1128,13 +1243,13 @@ class Archiver:
                 logging.getLogger('borg.output.list').info(remove_surrogates(item.path))
             try:
                 if dry_run:
-                    archive.extract_item(item, dry_run=True, pi=pi)
+                    archive.extract_item(item, dry_run=True, hlm=hlm, pi=pi)
                 else:
                     if stat.S_ISDIR(item.mode):
                         dirs.append(item)
                         archive.extract_item(item, stdout=stdout, restore_attrs=False)
                     else:
-                        archive.extract_item(item, stdout=stdout, sparse=sparse, hardlink_masters=hardlink_masters,
+                        archive.extract_item(item, stdout=stdout, sparse=sparse, hlm=hlm,
                                              stripped_components=strip_components, original_path=orig_path, pi=pi)
             except (BackupOSError, BackupError) as e:
                 self.print_warning('%s: %s', remove_surrogates(orig_path), e)
@@ -1199,15 +1314,9 @@ class Archiver:
         progress = args.progress
         output_list = args.output_list
         strip_components = args.strip_components
-        partial_extract = not matcher.empty() or strip_components
-        hardlink_masters = {} if partial_extract else None
-
-        def peek_and_store_hardlink_masters(item, matched):
-            if ((partial_extract and not matched and hardlinkable(item.mode)) and
-                    (item.get('hardlink_master', True) and 'source' not in item)):
-                hardlink_masters[item.get('path')] = (item.get('chunks'), None)
+        hlm = HardLinkManager(id_type=bytes, info_type=str)  # hlid -> path
 
-        filter = self.build_filter(matcher, peek_and_store_hardlink_masters, strip_components)
+        filter = self.build_filter(matcher, strip_components)
 
         # The | (pipe) symbol instructs tarfile to use a streaming mode of operation
         # where it never seeks on the passed fileobj.
@@ -1217,7 +1326,7 @@ class Archiver:
         if progress:
             pi = ProgressIndicatorPercent(msg='%5.1f%% Processing: %s', step=0.1, msgid='extract')
             pi.output('Calculating size')
-            extracted_size = sum(item.get_size(hardlink_masters) for item in archive.iter_items(filter))
+            extracted_size = sum(item.get_size() for item in archive.iter_items(filter))
             pi.total = extracted_size
         else:
             pi = None
@@ -1252,9 +1361,8 @@ class Archiver:
             tarinfo.gid = item.gid
             tarinfo.uname = item.user or ''
             tarinfo.gname = item.group or ''
-            # The linkname in tar has the same dual use the 'source' attribute of Borg items,
-            # i.e. for symlinks it means the destination, while for hardlinks it refers to the
-            # file.
+            # The linkname in tar has 2 uses:
+            # for symlinks it means the destination, while for hardlinks it refers to the file.
             # Since hardlinks in tar have a different type code (LNKTYPE) the format might
             # support hardlinking arbitrary objects (including symlinks and directories), but
             # whether implementations actually support that is a whole different question...
@@ -1263,23 +1371,16 @@ class Archiver:
             modebits = stat.S_IFMT(item.mode)
             if modebits == stat.S_IFREG:
                 tarinfo.type = tarfile.REGTYPE
-                if 'source' in item:
-                    source = os.sep.join(item.source.split(os.sep)[strip_components:])
-                    if hardlink_masters is None:
-                        linkname = source
-                    else:
-                        chunks, linkname = hardlink_masters.get(item.source, (None, source))
-                    if linkname:
-                        # Master was already added to the archive, add a hardlink reference to it.
+                if 'hlid' in item:
+                    linkname = hlm.retrieve(id=item.hlid)
+                    if linkname is not None:
+                        # the first hardlink was already added to the archive, add a tar-hardlink reference to it.
                         tarinfo.type = tarfile.LNKTYPE
                         tarinfo.linkname = linkname
-                    elif chunks is not None:
-                        # The item which has the chunks was not put into the tar, therefore
-                        # we do that now and update hardlink_masters to reflect that.
-                        item.chunks = chunks
+                    else:
                         tarinfo.size = item.get_size()
                         stream = item_content_stream(item)
-                        hardlink_masters[item.get('source') or original_path] = (None, item.path)
+                        hlm.remember(id=item.hlid, info=item.path)
                 else:
                     tarinfo.size = item.get_size()
                     stream = item_content_stream(item)
@@ -1337,8 +1438,7 @@ class Archiver:
                 ph['BORG.item.meta'] = meta_text
             return ph
 
-        for item in archive.iter_items(filter, partial_extract=partial_extract,
-                                       preload=True, hardlink_masters=hardlink_masters):
+        for item in archive.iter_items(filter, preload=True):
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
@@ -1973,12 +2073,11 @@ class Archiver:
             elif tarinfo.isdir():
                 status = tfo.process_dir(tarinfo=tarinfo, status='d', type=stat.S_IFDIR)
             elif tarinfo.issym():
-                status = tfo.process_link(tarinfo=tarinfo, status='s', type=stat.S_IFLNK)
+                status = tfo.process_symlink(tarinfo=tarinfo, status='s', type=stat.S_IFLNK)
             elif tarinfo.islnk():
-                # tar uses the same hardlink model as borg (rather vice versa); the first instance of a hardlink
-                # is stored as a regular file, later instances are special entries referencing back to the
-                # first instance.
-                status = tfo.process_link(tarinfo=tarinfo, status='h', type=stat.S_IFREG)
+                # tar uses a hardlink model where the first instance of a hardlink is stored as a regular file
+                # and later instances are special entries referencing back to the first instance.
+                status = tfo.process_hardlink(tarinfo=tarinfo, status='h', type=stat.S_IFREG)
             elif tarinfo.isblk():
                 status = tfo.process_dev(tarinfo=tarinfo, status='b', type=stat.S_IFBLK)
             elif tarinfo.ischr():
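
The tar hardlink model described in the comment above can be reproduced with the standard library alone; a minimal sketch (file names and content are made up):

    import io
    import tarfile

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w') as tar:
        data = b'hello'
        first = tarfile.TarInfo('dir/a')        # first instance: a regular file with content
        first.size = len(data)
        tar.addfile(first, io.BytesIO(data))
        second = tarfile.TarInfo('dir/b')       # later instance: no content, just a reference
        second.type = tarfile.LNKTYPE
        second.linkname = 'dir/a'
        tar.addfile(second)

    buf.seek(0)
    with tarfile.open(fileobj=buf) as tar:
        members = tar.getmembers()
        assert members[0].isreg() and members[1].islnk()
        assert members[1].linkname == 'dir/a'
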
@@ -4083,6 +4182,43 @@ class Archiver:
                                help='archives to delete')
         define_archive_filters_group(subparser)
 
+        # borg transfer
+        transfer_epilog = process_epilog("""
+        This command transfers archives from one repository to another repository.
+
+        Suggested use:
+
+        # initialize DST_REPO reusing key material from SRC_REPO, so that
+        # chunking and chunk id generation will work in the same way as before.
+        borg init --other-location=SRC_REPO --encryption=DST_ENC DST_REPO
+
+        # transfer archives from SRC_REPO to DST_REPO
+        borg transfer --dry-run SRC_REPO DST_REPO  # check what it would do
+        borg transfer           SRC_REPO DST_REPO  # do it!
+        borg transfer --dry-run SRC_REPO DST_REPO  # check! anything left?
+
+        The default is to transfer all archives, including checkpoint archives.
+
+        You can use the archive filter options to limit which archives will be
+        transferred, e.g. the --prefix option. For big repositories containing multiple
+        data sets, this is recommended to keep the runtime per invocation lower.
+        """)
+        subparser = subparsers.add_parser('transfer', parents=[common_parser], add_help=False,
+                                          description=self.do_transfer.__doc__,
+                                          epilog=transfer_epilog,
+                                          formatter_class=argparse.RawDescriptionHelpFormatter,
+                                          help='transfer of archives from another repository')
+        subparser.set_defaults(func=self.do_transfer)
+        subparser.add_argument('-n', '--dry-run', dest='dry_run', action='store_true',
+                               help='do not change repository, just check')
+        subparser.add_argument('other_location', metavar='SRC_REPOSITORY',
+                               type=location_validator(archive=False, other=True),
+                               help='source repository')
+        subparser.add_argument('location', metavar='DST_REPOSITORY',
+                               type=location_validator(archive=False, other=False),
+                               help='destination repository')
+        define_archive_filters_group(subparser)
+
         # borg diff
         diff_epilog = process_epilog("""
             This command finds differences (file contents, user/group/mode) between archives.

+ 7 - 6
src/borg/cache.py

@@ -19,7 +19,7 @@ from .helpers import Location
 from .helpers import Error
 from .helpers import Manifest
 from .helpers import get_cache_dir, get_security_dir
-from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, parse_stringified_list
+from .helpers import bin_to_hex, parse_stringified_list
 from .helpers import format_file_size
 from .helpers import safe_ns
 from .helpers import yes
@@ -28,6 +28,7 @@ from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
 from .helpers import set_ec, EXIT_WARNING
 from .helpers import safe_unlink
 from .helpers import msgpack
+from .helpers.msgpack import int_to_timestamp, timestamp_to_int
 from .item import ArchiveItem, ChunkListEntry
 from .crypto.key import PlaintextKey
 from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
@@ -623,7 +624,7 @@ class LocalCache(CacheStatsMixin):
                     # this is to avoid issues with filesystem snapshots and cmtime granularity.
                     # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
                     entry = FileCacheEntry(*msgpack.unpackb(item))
-                    if entry.age == 0 and bigint_to_int(entry.cmtime) < self._newest_cmtime or \
+                    if entry.age == 0 and timestamp_to_int(entry.cmtime) < self._newest_cmtime or \
                        entry.age > 0 and entry.age < ttl:
                         msgpack.pack((path_hash, entry), fd)
                         entry_count += 1
@@ -756,7 +757,7 @@ class LocalCache(CacheStatsMixin):
             csize, data = decrypted_repository.get(archive_id)
             chunk_idx.add(archive_id, 1, len(data), csize)
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
-            if archive.version != 1:
+            if archive.version not in (1, 2):  # legacy
                 raise Exception('Unknown archive metadata version')
             sync = CacheSynchronizer(chunk_idx)
             for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
@@ -1018,10 +1019,10 @@ class LocalCache(CacheStatsMixin):
         if 'i' in cache_mode and entry.inode != st.st_ino:
             files_cache_logger.debug('KNOWN-CHANGED: file inode number has changed: %r', hashed_path)
             return True, None
-        if 'c' in cache_mode and bigint_to_int(entry.cmtime) != st.st_ctime_ns:
+        if 'c' in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
             files_cache_logger.debug('KNOWN-CHANGED: file ctime has changed: %r', hashed_path)
             return True, None
-        elif 'm' in cache_mode and bigint_to_int(entry.cmtime) != st.st_mtime_ns:
+        elif 'm' in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
             files_cache_logger.debug('KNOWN-CHANGED: file mtime has changed: %r', hashed_path)
             return True, None
         # we ignored the inode number in the comparison above or it is still same.
@@ -1049,7 +1050,7 @@ class LocalCache(CacheStatsMixin):
         elif 'm' in cache_mode:
             cmtime_type = 'mtime'
             cmtime_ns = safe_ns(st.st_mtime_ns)
-        entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_bigint(cmtime_ns), chunk_ids=ids)
+        entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids)
         self.files[path_hash] = msgpack.packb(entry)
         self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
         files_cache_logger.debug('FILES-CACHE-UPDATE: put %r [has %s] <- %r',

+ 74 - 27
src/borg/compress.pyx

@@ -56,16 +56,21 @@ cdef class CompressorBase:
     also handles compression format auto detection and
     adding/stripping the ID header (which enable auto detection).
     """
-    ID = b'\xFF\xFF'  # reserved and not used
-                      # overwrite with a unique 2-bytes bytestring in child classes
+    ID = b'\xFF'  # reserved and not used
+                  # overwrite with a unique 1-byte bytestring in child classes
     name = 'baseclass'
 
     @classmethod
     def detect(cls, data):
         return data.startswith(cls.ID)
 
-    def __init__(self, **kwargs):
-        pass
+    def __init__(self, level=255, **kwargs):
+        assert 0 <= level <= 255
+        if self.ID is not None:
+            self.id_level = self.ID + bytes((level, ))  # level 255 means "unknown level"
+            assert len(self.id_level) == 2
+        else:
+            self.id_level = None
 
     def decide(self, data):
         """
@@ -85,8 +90,8 @@ cdef class CompressorBase:
         Compress *data* (bytes) and return bytes result. Prepend the ID bytes of this compressor,
         which is needed so that the correct decompressor can be used for decompression.
         """
-        # add ID bytes
-        return self.ID + data
+        # add id_level bytes
+        return self.id_level + data
 
     def decompress(self, data):
         """
@@ -96,7 +101,7 @@ cdef class CompressorBase:
         Only handles input generated by _this_ Compressor - for a general purpose
         decompression method see *Compressor.decompress*.
         """
-        # strip ID bytes
+        # strip id_level bytes
         return data[2:]
 
 cdef class DecidingCompressor(CompressorBase):
@@ -106,8 +111,8 @@ cdef class DecidingCompressor(CompressorBase):
     """
     name = 'decidebaseclass'
 
-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
+    def __init__(self, level=255, **kwargs):
+        super().__init__(level=level, **kwargs)
 
     def _decide(self, data):
         """
@@ -148,9 +153,12 @@ class CNONE(CompressorBase):
     """
     none - no compression, just pass through data
     """
-    ID = b'\x00\x00'
+    ID = b'\x00'
     name = 'none'
 
+    def __init__(self, level=255, **kwargs):
+        super().__init__(level=level, **kwargs)  # no defined levels for CNONE, so just say "unknown"
+
     def compress(self, data):
         return super().compress(data)
 
@@ -170,11 +178,11 @@ class LZ4(DecidingCompressor):
         - wrapper releases CPython's GIL to support multithreaded code
         - uses safe lz4 methods that never go beyond the end of the output buffer
     """
-    ID = b'\x01\x00'
+    ID = b'\x01'
     name = 'lz4'
 
-    def __init__(self, **kwargs):
-        pass
+    def __init__(self, level=255, **kwargs):
+        super().__init__(level=level, **kwargs)  # no defined levels for LZ4, so just say "unknown"
 
     def _decide(self, idata):
         """
@@ -235,11 +243,11 @@ class LZMA(DecidingCompressor):
     """
     lzma compression / decompression
     """
-    ID = b'\x02\x00'
+    ID = b'\x02'
     name = 'lzma'
 
     def __init__(self, level=6, **kwargs):
-        super().__init__(**kwargs)
+        super().__init__(level=level, **kwargs)
         self.level = level
         if lzma is None:
             raise ValueError('No lzma support found.')
@@ -270,11 +278,11 @@ class ZSTD(DecidingCompressor):
     # This is a NOT THREAD SAFE implementation.
     # Only ONE python context must be created at a time.
     # It should work flawlessly as long as borg will call ONLY ONE compression job at time.
-    ID = b'\x03\x00'
+    ID = b'\x03'
     name = 'zstd'
 
     def __init__(self, level=3, **kwargs):
-        super().__init__(**kwargs)
+        super().__init__(level=level, **kwargs)
         self.level = level
 
     def _decide(self, idata):
@@ -331,14 +339,52 @@ class ZSTD(DecidingCompressor):
         return dest[:osize]
 
 
-class ZLIB(CompressorBase):
+class ZLIB(DecidingCompressor):
     """
     zlib compression / decompression (python stdlib)
     """
-    ID = b'\x08\x00'  # not used here, see detect()
-                      # avoid all 0x.8.. IDs elsewhere!
+    ID = b'\x05'
     name = 'zlib'
 
+    def __init__(self, level=6, **kwargs):
+        super().__init__(level=level, **kwargs)
+        self.level = level
+
+    def _decide(self, data):
+        """
+        Decides what to do with *data*. Returns (compressor, zlib_data).
+
+        *zlib_data* is the ZLIB result if *compressor* is ZLIB as well, otherwise it is None.
+        """
+        zlib_data = zlib.compress(data, self.level)
+        if len(zlib_data) < len(data):
+            return self, zlib_data
+        else:
+            return NONE_COMPRESSOR, None
+
+    def decompress(self, data):
+        data = super().decompress(data)
+        try:
+            return zlib.decompress(data)
+        except zlib.error as e:
+            raise DecompressionError(str(e)) from None
+
+
+class ZLIB_legacy(CompressorBase):
+    """
+    zlib compression / decompression (python stdlib)
+
+    Note: This is the legacy ZLIB support as used by borg < 1.3.
+          It still suffers from attic *only* supporting zlib and not having separate
+          ID bytes to differentiate between differently compressed chunks.
+          This just works because zlib compressed stuff always starts with 0x.8.. bytes.
+          Newer borg uses the ZLIB class that has separate ID bytes (as all the other
+          compressors) and does not need this hack.
+    """
+    ID = b'\x08'  # not used here, see detect()
+    # avoid all 0x.8 IDs elsewhere!
+    name = 'zlib_legacy'
+
     @classmethod
     def detect(cls, data):
         # matches misc. patterns 0x.8.. used by zlib
@@ -348,7 +394,7 @@ class ZLIB(CompressorBase):
         return check_ok and is_deflate
 
     def __init__(self, level=6, **kwargs):
-        super().__init__(**kwargs)
+        super().__init__(level=level, **kwargs)
         self.level = level
 
     def compress(self, data):
@@ -440,14 +486,14 @@ class ObfuscateSize(CompressorBase):
     """
     Meta-Compressor that obfuscates the compressed data size.
     """
-    ID = b'\x04\x00'
+    ID = b'\x04'
     name = 'obfuscate'
 
-    header_fmt = Struct('>I')
+    header_fmt = Struct('<I')
     header_len = len(header_fmt.pack(0))
 
     def __init__(self, level=None, compressor=None):
-        super().__init__()
+        super().__init__(level=level)  # data will be encrypted, so we can tell the level
         self.compressor = compressor
         if level is None:
             pass  # decompression
@@ -502,13 +548,14 @@ COMPRESSOR_TABLE = {
     CNONE.name: CNONE,
     LZ4.name: LZ4,
     ZLIB.name: ZLIB,
+    ZLIB_legacy.name: ZLIB_legacy,
     LZMA.name: LZMA,
     Auto.name: Auto,
     ZSTD.name: ZSTD,
     ObfuscateSize.name: ObfuscateSize,
 }
 # List of possible compression types. Does not include Auto, since it is a meta-Compressor.
-COMPRESSOR_LIST = [LZ4, ZSTD, CNONE, ZLIB, LZMA, ObfuscateSize, ]  # check fast stuff first
+COMPRESSOR_LIST = [LZ4, ZSTD, CNONE, ZLIB, ZLIB_legacy, LZMA, ObfuscateSize, ]  # check fast stuff first
 
 def get_compressor(name, **kwargs):
     cls = COMPRESSOR_TABLE[name]
@@ -554,7 +601,7 @@ class CompressionSpec:
         self.name = values[0]
         if self.name in ('none', 'lz4', ):
             return
-        elif self.name in ('zlib', 'lzma', ):
+        elif self.name in ('zlib', 'lzma', 'zlib_legacy'):  # zlib_legacy just for testing
             if count < 2:
                 level = 6  # default compression level in py stdlib
             elif count == 2:
@@ -597,7 +644,7 @@ class CompressionSpec:
     def compressor(self):
         if self.name in ('none', 'lz4', ):
             return get_compressor(self.name)
-        elif self.name in ('zlib', 'lzma', 'zstd', ):
+        elif self.name in ('zlib', 'lzma', 'zstd', 'zlib_legacy'):
             return get_compressor(self.name, level=self.level)
         elif self.name == 'auto':
             return get_compressor(self.name, compressor=self.inner.compressor)

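For illustration, a minimal standalone sketch of the new 2-byte chunk header (one compressor ID byte plus one level byte, where 255 means "unknown level"). It only mimics the scheme with stdlib zlib and the ID value shown above; it is not borg code:

    import zlib

    ZLIB_ID = 0x05        # new 1-byte ZLIB compressor ID (see diff above)
    UNKNOWN_LEVEL = 0xff  # level byte 255 means "unknown level"

    def compress_with_header(data, level=6):
        # prepend ID byte + level byte, like CompressorBase.compress() now does
        return bytes((ZLIB_ID, level)) + zlib.compress(data, level)

    def decompress_with_header(blob):
        assert blob[:1] == bytes((ZLIB_ID,))  # auto-detection matches on the ID byte
        return zlib.decompress(blob[2:])      # strip the 2-byte id_level header

    payload = b'hello world ' * 100
    blob = compress_with_header(payload, level=6)
    assert blob[:2] == b'\x05\x06'
    assert decompress_with_header(blob) == payload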
+ 1 - 1
src/borg/constants.py

@@ -1,5 +1,5 @@
 # this set must be kept complete, otherwise the RobustUnpacker might malfunction:
-ITEM_KEYS = frozenset(['path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hardlink_master',
+ITEM_KEYS = frozenset(['path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hardlink_master', 'hlid',
                        'mode', 'user', 'group', 'uid', 'gid', 'mtime', 'atime', 'ctime', 'birthtime', 'size',
                        'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended',
                        'part'])

+ 14 - 25
src/borg/fuse.py

@@ -35,7 +35,8 @@ from .crypto.low_level import blake2b_128
 from .archiver import Archiver
 from .archive import Archive, get_item_uid_gid
 from .hashindex import FuseVersionsIndex
-from .helpers import daemonize, daemonizing, hardlinkable, signal_handler, format_file_size, Error
+from .helpers import daemonize, daemonizing, signal_handler, format_file_size, Error
+from .helpers import HardLinkManager
 from .helpers import msgpack
 from .item import Item
 from .lrucache import LRUCache
@@ -339,15 +340,9 @@ class FuseBackend:
                           consider_part_files=self._args.consider_part_files)
         strip_components = self._args.strip_components
         matcher = Archiver.build_matcher(self._args.patterns, self._args.paths)
-        partial_extract = not matcher.empty() or strip_components
-        hardlink_masters = {} if partial_extract else None
+        hlm = HardLinkManager(id_type=bytes, info_type=str)  # hlid -> path
 
-        def peek_and_store_hardlink_masters(item, matched):
-            if (partial_extract and not matched and hardlinkable(item.mode) and
-                    item.get('hardlink_master', True) and 'source' not in item):
-                hardlink_masters[item.get('path')] = (item.get('chunks'), None)
-
-        filter = Archiver.build_filter(matcher, peek_and_store_hardlink_masters, strip_components)
+        filter = Archiver.build_filter(matcher, strip_components)
         for item_inode, item in self.cache.iter_archive_items(archive.metadata.items, filter=filter,
                                                               consider_part_files=self._args.consider_part_files):
             if strip_components:
@@ -369,15 +364,13 @@ class FuseBackend:
             parent = 1
             for segment in segments[:-1]:
                 parent = self._process_inner(segment, parent)
-            self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode,
-                               hardlink_masters, strip_components)
+            self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode, hlm)
         duration = time.perf_counter() - t0
         logger.debug('fuse: _process_archive completed in %.1f s for archive %s', duration, archive.name)
 
-    def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode, hardlink_masters, stripped_components):
+    def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode, hlm):
         path = item.path
         del item.path  # save some space
-        hardlink_masters = hardlink_masters or {}
 
         def file_version(item, path):
             if 'chunks' in item:
@@ -402,10 +395,9 @@ class FuseBackend:
             version_enc = os.fsencode('.%05d' % version)
             return name + version_enc + ext
 
-        if 'source' in item and hardlinkable(item.mode):
-            source = os.sep.join(item.source.split(os.sep)[stripped_components:])
-            chunks, link_target = hardlink_masters.get(item.source, (None, source))
-            if link_target:
+        if 'hlid' in item:
+            link_target = hlm.retrieve(id=item.hlid, default=None)
+            if link_target is not None:
                 # Hard link was extracted previously, just link
                 link_target = os.fsencode(link_target)
                 if self.versions:
@@ -415,19 +407,16 @@ class FuseBackend:
                 try:
                     inode = self.find_inode(link_target, prefix)
                 except KeyError:
-                    logger.warning('Skipping broken hard link: %s -> %s', path, source)
+                    logger.warning('Skipping broken hard link: %s -> %s', path, link_target)
                     return
                 item = self.get_item(inode)
                 item.nlink = item.get('nlink', 1) + 1
                 self._items[inode] = item
-            elif chunks is not None:
-                # assign chunks to this item, since the item which had the chunks was not extracted
-                item.chunks = chunks
+            else:
                 inode = item_inode
                 self._items[inode] = item
-                if hardlink_masters:
-                    # Update master entry with extracted item path, so that following hardlinks don't extract twice.
-                    hardlink_masters[item.source] = (None, path)
+                # remember extracted item path, so that following hardlinks don't extract twice.
+                hlm.remember(id=item.hlid, info=path)
         else:
             inode = item_inode
 
@@ -436,7 +425,7 @@ class FuseBackend:
             enc_path = os.fsencode(path)
             version = file_version(item, enc_path)
             if version is not None:
-                # regular file, with contents - maybe a hardlink master
+                # regular file, with contents
                 name = make_versioned_name(name, version)
                 self.file_versions[enc_path] = version
 

+ 71 - 3
src/borg/helpers/fs.py

@@ -1,4 +1,5 @@
 import errno
+import hashlib
 import os
 import os.path
 import re
@@ -165,9 +166,76 @@ def make_path_safe(path):
     return _safe_re.sub('', path) or '.'
 
 
-def hardlinkable(mode):
-    """return True if we support hardlinked items of this type"""
-    return stat.S_ISREG(mode) or stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)
+class HardLinkManager:
+    """
+    Manage hardlinks (and avoid code duplication doing so).
+
+    A) When creating a borg2 archive from the filesystem, we have to maintain a mapping like:
+       (dev, ino) -> (hlid, chunks)  # for fs_hl_targets
+       If we encounter the same (dev, ino) again later, we'll just re-use the hlid and chunks list.
+
+    B) When extracting a borg2 archive to the filesystem, we have to maintain a mapping like:
+       hlid -> path
+       If we encounter the same hlid again later, we hardlink to the path of the already extracted content with that hlid.
+
+    C) When transferring from a borg1 archive, we need:
+       path -> chunks, chunks_healthy  # for borg1_hl_targets
+       If we encounter a regular file item with source == path later, we reuse chunks and chunks_healthy
+       and create the same hlid = hardlink_id_from_path(source).
+
+    D) When importing a tar file (simplified 1-pass way for now, not creating borg hardlink items):
+       path -> chunks
+       If we encounter a LNK tar entry later with linkname==path, we re-use the chunks and create a regular file item.
+       For better hardlink support (including the very first hardlink item for each group of same-target hardlinks),
+       we would need a 2-pass processing, which is not yet implemented.
+    """
+    def __init__(self, *, id_type, info_type):
+        self._map = {}
+        self.id_type = id_type
+        self.info_type = info_type
+
+    def borg1_hardlinkable(self, mode):  # legacy
+        return stat.S_ISREG(mode) or stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)
+
+    def borg1_hardlink_master(self, item):  # legacy
+        return item.get('hardlink_master', True) and 'source' not in item and self.borg1_hardlinkable(item.mode)
+
+    def borg1_hardlink_slave(self, item):  # legacy
+        return 'source' in item and self.borg1_hardlinkable(item.mode)
+
+    def hardlink_id_from_path(self, path):
+        """compute a hardlink id from a path"""
+        assert isinstance(path, bytes)
+        return hashlib.sha256(path).digest()
+
+    def hardlink_id_from_inode(self, *, ino, dev):
+        """compute a hardlink id from an inode"""
+        assert isinstance(ino, int)
+        assert isinstance(dev, int)
+        return hashlib.sha256(f'{ino}/{dev}'.encode()).digest()
+
+    def remember(self, *, id, info):
+        """
+        remember stuff from a (usually contentful) item.
+
+        :param id: some id used to reference the contentful item, could be:
+                   a path (tar style, old borg style) [bytes]
+                   a hlid (new borg style) [bytes]
+                   a (dev, inode) tuple (filesystem)
+        :param info: information to remember, could be:
+                     chunks / chunks_healthy list
+                     hlid
+        """
+        assert isinstance(id, self.id_type), f"id is {id!r}, not of type {self.id_type}"
+        assert isinstance(info, self.info_type), f"info is {info!r}, not of type {self.info_type}"
+        self._map[id] = info
+
+    def retrieve(self, id, *, default=None):
+        """
+        retrieve stuff to use it in a (usually contentless) item.
+        """
+        assert isinstance(id, self.id_type)
+        return self._map.get(id, default)
 
 
 def scandir_keyfunc(dirent):

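To show how the class above replaces the old hardlink_master/source bookkeeping at extraction time (case B), here is a small usage sketch; HardLinkManager is imported as in the fuse.py change, while extract_regular_file and make_hardlink are hypothetical stand-ins for the real extraction code:

    from borg.helpers import HardLinkManager

    hlm = HardLinkManager(id_type=bytes, info_type=str)  # maps hlid -> already extracted path

    def extract_item(item, extract_regular_file, make_hardlink):
        # hypothetical driver: link later occurrences of an hlid to the first extracted path
        if 'hlid' in item:
            link_target = hlm.retrieve(id=item.hlid, default=None)
            if link_target is not None:
                make_hardlink(link_target, item.path)   # seen this hlid before: just hardlink
                return
            hlm.remember(id=item.hlid, info=item.path)  # first occurrence: extract it, remember path
        extract_regular_file(item)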
+ 14 - 3
src/borg/helpers/msgpack.py

@@ -24,7 +24,7 @@ from msgpack import unpackb as mp_unpackb
 from msgpack import unpack as mp_unpack
 from msgpack import version as mp_version
 
-from msgpack import ExtType
+from msgpack import ExtType, Timestamp
 from msgpack import OutOfData
 
 
@@ -164,7 +164,7 @@ def get_limited_unpacker(kind):
     return Unpacker(**args)
 
 
-def bigint_to_int(mtime):
+def bigint_to_int(mtime):  # legacy
     """Convert bytearray to int
     """
     if isinstance(mtime, bytes):
@@ -172,7 +172,7 @@ def bigint_to_int(mtime):
     return mtime
 
 
-def int_to_bigint(value):
+def int_to_bigint(value):  # legacy
     """Convert integers larger than 64 bits to bytearray
 
     Smaller integers are left alone
@@ -180,3 +180,14 @@ def int_to_bigint(value):
     if value.bit_length() > 63:
         return value.to_bytes((value.bit_length() + 9) // 8, 'little', signed=True)
     return value
+
+
+def int_to_timestamp(ns):
+    return Timestamp.from_unix_nano(ns)
+
+
+def timestamp_to_int(ts):
+    if isinstance(ts, Timestamp):
+        return ts.to_unix_nano()
+    # legacy support note: we need to keep the bigint conversion for compatibility with borg < 1.3 archives.
+    return bigint_to_int(ts)

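A short round-trip sketch of the new helpers (same import path as used in the cache.py change above); note that timestamp_to_int() still decodes the legacy bigint byte encoding, so values from borg < 1.3 archives keep working:

    from borg.helpers.msgpack import int_to_timestamp, timestamp_to_int

    ns = 1_650_000_000_123_456_789       # some timestamp in nanoseconds
    ts = int_to_timestamp(ns)            # -> msgpack Timestamp (native msgpack ext type)
    assert timestamp_to_int(ts) == ns    # exact nanosecond round-trip

    legacy = (2 ** 65).to_bytes(9, 'little', signed=True)  # what int_to_bigint() produced
    assert timestamp_to_int(legacy) == 2 ** 65              # legacy values still decode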
+ 10 - 7
src/borg/helpers/parseformat.py

@@ -19,6 +19,7 @@ logger = create_logger()
 
 from .errors import Error
 from .fs import get_keys_dir
+from .msgpack import Timestamp
 from .time import OutputTimestamp, format_time, to_localtime, safe_timestamp, safe_s
 from .. import __version__ as borg_version
 from .. import __version_tuple__ as borg_version_tuple
@@ -694,7 +695,8 @@ class ItemFormatter(BaseFormatter):
     KEY_DESCRIPTIONS = {
         'bpath': 'verbatim POSIX path, can contain any character except NUL',
         'path': 'path interpreted as text (might be missing non-text characters, see bpath)',
-        'source': 'link target for links (identical to linktarget)',
+        'source': 'link target for symlinks (identical to linktarget)',
+        'hlid': 'hard link identity (same if hardlinking same fs object)',
         'extra': 'prepends {source} with " -> " for soft links and " link to " for hard links',
         'csize': 'compressed size',
         'dsize': 'deduplicated size',
@@ -705,7 +707,7 @@ class ItemFormatter(BaseFormatter):
         'health': 'either "healthy" (file ok) or "broken" (if file has all-zero replacement chunks)',
     }
     KEY_GROUPS = (
-        ('type', 'mode', 'uid', 'gid', 'user', 'group', 'path', 'bpath', 'source', 'linktarget', 'flags'),
+        ('type', 'mode', 'uid', 'gid', 'user', 'group', 'path', 'bpath', 'source', 'linktarget', 'hlid', 'flags'),
         ('size', 'csize', 'dsize', 'dcsize', 'num_chunks', 'unique_chunks'),
         ('mtime', 'ctime', 'atime', 'isomtime', 'isoctime', 'isoatime'),
         tuple(sorted(hash_algorithms)),
@@ -801,11 +803,9 @@ class ItemFormatter(BaseFormatter):
         extra = ''
         if source:
             source = remove_surrogates(source)
-            if item_type == 'l':
-                extra = ' -> %s' % source
-            else:
-                mode = 'h' + mode[1:]
-                extra = ' link to %s' % source
+            extra = ' -> %s' % source
+        hlid = item.get('hlid')
+        hlid = bin_to_hex(hlid) if hlid else ''
         item_data['type'] = item_type
         item_data['mode'] = mode
         item_data['user'] = item.user or item.uid
@@ -821,6 +821,7 @@ class ItemFormatter(BaseFormatter):
             item_data['health'] = 'broken' if 'chunks_healthy' in item else 'healthy'
         item_data['source'] = source
         item_data['linktarget'] = source
+        item_data['hlid'] = hlid
         item_data['flags'] = item.get('bsdflags')
         for key in self.used_call_keys:
             item_data[key] = self.call_keys[key](item)
@@ -1043,6 +1044,8 @@ def prepare_dump_dict(d):
                 value = decode_tuple(value)
             elif isinstance(value, bytes):
                 value = decode_bytes(value)
+            elif isinstance(value, Timestamp):
+                value = value.to_unix_nano()
             if isinstance(key, bytes):
                 key = key.decode()
             res[key] = value

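With the new key in place, a listing that shows hard link identities could look like the following (illustrative only; repository and archive names are placeholders, and {hlid} renders as hex, or empty for non-hardlinked items):

    borg list --format='{mode} {path} {hlid}{NL}' REPO::ARCHIVE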
+ 7 - 8
src/borg/item.pyx

@@ -3,9 +3,9 @@ from collections import namedtuple
 
 from .constants import ITEM_KEYS, ARCHIVE_KEYS
 from .helpers import safe_encode, safe_decode
-from .helpers import bigint_to_int, int_to_bigint
 from .helpers import StableDict
 from .helpers import format_file_size
+from .helpers.msgpack import timestamp_to_int, int_to_timestamp
 
 
 cdef extern from "_item.c":
@@ -171,17 +171,17 @@ class Item(PropDict):
     rdev = PropDict._make_property('rdev', int)
     bsdflags = PropDict._make_property('bsdflags', int)
 
-    # note: we need to keep the bigint conversion for compatibility with borg 1.0 archives.
-    atime = PropDict._make_property('atime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
-    ctime = PropDict._make_property('ctime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
-    mtime = PropDict._make_property('mtime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
-    birthtime = PropDict._make_property('birthtime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
+    atime = PropDict._make_property('atime', int, 'int (ns)', encode=int_to_timestamp, decode=timestamp_to_int)
+    ctime = PropDict._make_property('ctime', int, 'int (ns)', encode=int_to_timestamp, decode=timestamp_to_int)
+    mtime = PropDict._make_property('mtime', int, 'int (ns)', encode=int_to_timestamp, decode=timestamp_to_int)
+    birthtime = PropDict._make_property('birthtime', int, 'int (ns)', encode=int_to_timestamp, decode=timestamp_to_int)
 
     # size is only present for items with a chunk list and then it is sum(chunk_sizes)
     # compatibility note: this is a new feature, in old archives size will be missing.
     size = PropDict._make_property('size', int)
 
-    hardlink_master = PropDict._make_property('hardlink_master', bool)
+    hlid = PropDict._make_property('hlid', bytes)  # hard link id: same value means same hard link.
+    hardlink_master = PropDict._make_property('hardlink_master', bool)  # legacy
 
     chunks = PropDict._make_property('chunks', (list, type(None)), 'list or None')
     chunks_healthy = PropDict._make_property('chunks_healthy', (list, type(None)), 'list or None')
@@ -214,7 +214,6 @@ class Item(PropDict):
         except AttributeError:
             if stat.S_ISLNK(self.mode):
                 # get out of here quickly. symlinks have no own chunks, their fs size is the length of the target name.
-                # also, there is the dual-use issue of .source (#2343), so don't confuse it with a hardlink slave.
                 return len(self.source)
             # no precomputed (c)size value available, compute it:
             try:

+ 35 - 44
src/borg/testsuite/archiver.py

@@ -321,7 +321,7 @@ class ArchiverTestCaseBase(BaseTestCase):
                 contents = b'X' * size
             fd.write(contents)
 
-    def create_test_files(self):
+    def create_test_files(self, create_hardlinks=True):
         """Create a minimal test case including all supported file types
         """
         # File
@@ -332,7 +332,7 @@ class ArchiverTestCaseBase(BaseTestCase):
         # File mode
         os.chmod('input/file1', 0o4755)
         # Hard link
-        if are_hardlinks_supported():
+        if are_hardlinks_supported() and create_hardlinks:
             os.link(os.path.join(self.input_path, 'file1'),
                     os.path.join(self.input_path, 'hardlink'))
         # Symlink
@@ -432,7 +432,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.assert_in(name, list_output)
         self.assert_dirs_equal('input', 'output/input')
         info_output = self.cmd('info', self.repository_location + '::test')
-        item_count = 4 if has_lchflags else 5  # one file is UF_NODUMP
+        item_count = 5 if has_lchflags else 6  # one file is UF_NODUMP
         self.assert_in('Number of files: %d' % item_count, info_output)
         shutil.rmtree(self.cache_path)
         info_output2 = self.cmd('info', self.repository_location + '::test')
@@ -506,6 +506,29 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             self.cmd('extract', self.repository_location + '::test')
             assert os.readlink('input/link1') == 'somewhere'
 
+    @pytest.mark.skipif(not are_symlinks_supported() or not are_hardlinks_supported(),
+                        reason='symlinks or hardlinks not supported')
+    def test_hardlinked_symlinks_extract(self):
+        self.create_regular_file('target', size=1024)
+        with changedir('input'):
+            os.symlink('target', 'symlink1')
+            os.link('symlink1', 'symlink2', follow_symlinks=False)
+        self.cmd('init', '--encryption=repokey', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        with changedir('output'):
+            output = self.cmd('extract', self.repository_location + '::test')
+            print(output)
+            with changedir('input'):
+                assert os.path.exists('target')
+                assert os.readlink('symlink1') == 'target'
+                assert os.readlink('symlink2') == 'target'
+                st1 = os.stat('symlink1', follow_symlinks=False)
+                st2 = os.stat('symlink2', follow_symlinks=False)
+                assert st1.st_nlink == 2
+                assert st2.st_nlink == 2
+                assert st1.st_ino == st2.st_ino
+                assert st1.st_size == st2.st_size
+
     @pytest.mark.skipif(not is_utime_fully_supported(), reason='cannot properly setup and execute test without utime')
     def test_atime(self):
         def has_noatime(some_file):
@@ -2442,7 +2465,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
     def test_compression_zlib_compressible(self):
         size, csize = self._get_sizes('zlib', compressible=True)
         assert csize < size * 0.1
-        assert csize == 35
+        assert csize == 37
 
     def test_compression_zlib_uncompressible(self):
         size, csize = self._get_sizes('zlib', compressible=False)
@@ -2451,7 +2474,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
     def test_compression_auto_compressible(self):
         size, csize = self._get_sizes('auto,zlib', compressible=True)
         assert csize < size * 0.1
-        assert csize == 35  # same as compression 'zlib'
+        assert csize == 37  # same as compression 'zlib'
 
     def test_compression_auto_uncompressible(self):
         size, csize = self._get_sizes('auto,zlib', compressible=False)
@@ -2661,7 +2684,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
                 hl3 = os.path.join(mountpoint, 'input', 'hardlink3', 'hardlink3.00001')
                 assert os.stat(hl1).st_ino == os.stat(hl2).st_ino == os.stat(hl3).st_ino
                 assert open(hl3, 'rb').read() == b'123456'
-        # similar again, but exclude the hardlink master:
+        # similar again, but exclude the 1st hardlink:
         with self.fuse_mount(self.repository_location, mountpoint, '-o', 'versions', '-e', 'input/hardlink1'):
             if are_hardlinks_supported():
                 hl2 = os.path.join(mountpoint, 'input', 'hardlink2', 'hardlink2.00001')
@@ -3475,7 +3498,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
             assert os.stat('input/dir1/source2').st_nlink == 2
 
     def test_import_tar(self, tar_format='PAX'):
-        self.create_test_files()
+        self.create_test_files(create_hardlinks=False)  # hardlinks become separate files
         os.unlink('input/flagfile')
         self.cmd('init', '--encryption=none', self.repository_location)
         self.cmd('create', self.repository_location + '::src', 'input')
@@ -3489,7 +3512,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
     def test_import_tar_gz(self, tar_format='GNU'):
         if not shutil.which('gzip'):
             pytest.skip('gzip is not installed')
-        self.create_test_files()
+        self.create_test_files(create_hardlinks=False)  # hardlinks become separate files
         os.unlink('input/flagfile')
         self.cmd('init', '--encryption=none', self.repository_location)
         self.cmd('create', self.repository_location + '::src', 'input')
@@ -3850,7 +3873,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
                 'username': 'bar',
                 'name': 'archive1',
                 'time': '2016-12-15T18:49:51.849711',
-                'version': 1,
+                'version': 2,
             })
             archive_id = key.id_hash(archive)
             repository.put(archive_id, key.encrypt(archive_id, archive))
@@ -3907,35 +3930,6 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
             repository.commit(compact=False)
         self.cmd('check', self.repository_location, exit_code=1)
 
-    def test_attic013_acl_bug(self):
-        # Attic up to release 0.13 contained a bug where every item unintentionally received
-        # a b'acl'=None key-value pair.
-        # This bug can still live on in Borg repositories (through borg upgrade).
-        class Attic013Item:
-            def as_dict(self):
-                return {
-                    # These are required
-                    b'path': '1234',
-                    b'mtime': 0,
-                    b'mode': 0,
-                    b'user': b'0',
-                    b'group': b'0',
-                    b'uid': 0,
-                    b'gid': 0,
-                    # acl is the offending key.
-                    b'acl': None,
-                }
-
-        archive, repository = self.open_archive('archive1')
-        with repository:
-            manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
-            with Cache(repository, key, manifest) as cache:
-                archive = Archive(repository, key, manifest, '0.13', cache=cache, create=True)
-                archive.items_buffer.add(Attic013Item())
-                archive.save()
-        self.cmd('check', self.repository_location, exit_code=0)
-        self.cmd('list', self.repository_location + '::0.13', exit_code=0)
-
 
 class ManifestAuthenticationTest(ArchiverTestCaseBase):
     def spoof_manifest(self, repository):
@@ -4473,26 +4467,23 @@ def test_chunk_content_equal():
 
 
 class TestBuildFilter:
-    @staticmethod
-    def peek_and_store_hardlink_masters(item, matched):
-        pass
 
     def test_basic(self):
         matcher = PatternMatcher()
         matcher.add([parse_pattern('included')], IECommand.Include)
-        filter = Archiver.build_filter(matcher, self.peek_and_store_hardlink_masters, 0)
+        filter = Archiver.build_filter(matcher, 0)
         assert filter(Item(path='included'))
         assert filter(Item(path='included/file'))
         assert not filter(Item(path='something else'))
 
     def test_empty(self):
         matcher = PatternMatcher(fallback=True)
-        filter = Archiver.build_filter(matcher, self.peek_and_store_hardlink_masters, 0)
+        filter = Archiver.build_filter(matcher, 0)
         assert filter(Item(path='anything'))
 
     def test_strip_components(self):
         matcher = PatternMatcher(fallback=True)
-        filter = Archiver.build_filter(matcher, self.peek_and_store_hardlink_masters, strip_components=1)
+        filter = Archiver.build_filter(matcher, strip_components=1)
         assert not filter(Item(path='shallow'))
         assert not filter(Item(path='shallow/'))  # can this even happen? paths are normalized...
         assert filter(Item(path='deep enough/file'))

+ 2 - 2
src/borg/testsuite/compress.py

@@ -88,11 +88,11 @@ def test_autodetect_invalid():
         Compressor(**params).decompress(b'\x08\x00notreallyzlib')
 
 
-def test_zlib_compat():
+def test_zlib_legacy_compat():
     # for compatibility reasons, we do not add an extra header for zlib,
     # nor do we expect one when decompressing / autodetecting
     for level in range(10):
-        c = get_compressor(name='zlib', level=level)
+        c = get_compressor(name='zlib_legacy', level=level)
         cdata1 = c.compress(data)
         cdata2 = zlib.compress(data, level)
         assert cdata1 == cdata2

+ 4 - 3
src/borg/testsuite/item.py

@@ -3,6 +3,7 @@ import pytest
 from ..cache import ChunkListEntry
 from ..item import Item
 from ..helpers import StableDict
+from ..helpers.msgpack import Timestamp
 
 
 def test_item_empty():
@@ -77,15 +78,15 @@ def test_item_int_property():
         item.mode = "invalid"
 
 
-def test_item_bigint_property():
+def test_item_mptimestamp_property():
     item = Item()
     small, big = 42, 2 ** 65
     item.atime = small
     assert item.atime == small
-    assert item.as_dict() == {'atime': small}
+    assert item.as_dict() == {'atime': Timestamp.from_unix_nano(small)}
     item.atime = big
     assert item.atime == big
-    assert item.as_dict() == {'atime': b'\0' * 8 + b'\x02'}
+    assert item.as_dict() == {'atime': Timestamp.from_unix_nano(big)}
 
 
 def test_item_user_group_none():

+ 4 - 4
src/borg/testsuite/key.py

@@ -256,8 +256,8 @@ class TestKey:
         plaintext = b'123456789'
         id = key.id_hash(plaintext)
         authenticated = key.encrypt(id, plaintext)
-        # 0x07 is the key TYPE, \x0000 identifies no compression.
-        assert authenticated == b'\x07\x00\x00' + plaintext
+        # 0x07 is the key TYPE, \x00\xff identifies no compression / unknown level.
+        assert authenticated == b'\x07\x00\xff' + plaintext
 
     def test_blake2_authenticated_encrypt(self, monkeypatch):
         monkeypatch.setenv('BORG_PASSPHRASE', 'test')
@@ -267,8 +267,8 @@ class TestKey:
         plaintext = b'123456789'
         id = key.id_hash(plaintext)
         authenticated = key.encrypt(id, plaintext)
-        # 0x06 is the key TYPE, 0x0000 identifies no compression.
-        assert authenticated == b'\x06\x00\x00' + plaintext
+        # 0x06 is the key TYPE, 0x00ff identifies no compression / unknown level.
+        assert authenticated == b'\x06\x00\xff' + plaintext
 
 
 class TestTAM:
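For clarity, an illustrative decomposition of the header bytes asserted in the key tests above (not borg code): one key TYPE byte, then the 1-byte compressor ID, then the 1-byte level:

    blob = b'\x07\x00\xff' + b'123456789'  # as asserted above for the authenticated key
    key_type, comp_id, comp_level = blob[0], blob[1], blob[2]
    assert key_type == 0x07     # key TYPE byte
    assert comp_id == 0x00      # CNONE ("no compression") ID byte
    assert comp_level == 0xff   # 255 = "unknown level"
    assert blob[3:] == b'123456789'  # plaintext passes through uncompressed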