
Merge pull request #5650 from ThomasWaldmann/create-from-tar

implement simple import-tar
TW 4 years ago
parent
commit
fd02923fac

+ 54 - 0
src/borg/archive.py

@@ -1392,6 +1392,60 @@ class FilesystemObjectProcessors:
                 return status
 
 
+class TarfileObjectProcessors:
+    def __init__(self, *, cache, key,
+                 add_item, process_file_chunks,
+                 chunker_params, show_progress,
+                 log_json, iec, file_status_printer=None):
+        self.cache = cache
+        self.key = key
+        self.add_item = add_item
+        self.process_file_chunks = process_file_chunks
+        self.show_progress = show_progress
+        self.print_file_status = file_status_printer or (lambda *args: None)
+
+        self.stats = Statistics(output_json=log_json, iec=iec)  # threading: done by cache (including progress)
+        self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False)
+
+    @contextmanager
+    def create_helper(self, tarinfo, status=None, type=None):
+        item = Item(path=make_path_safe(tarinfo.name), mode=tarinfo.mode | type,
+                    uid=tarinfo.uid, gid=tarinfo.gid, user=tarinfo.uname, group=tarinfo.gname,
+                    mtime=tarinfo.mtime * 1000**3)
+        yield item, status
+        # if we get here, the "with" block completed without error/exception, i.e. the item was processed ok
+        self.add_item(item, stats=self.stats)
+
+    def process_dir(self, *, tarinfo, status, type):
+        with self.create_helper(tarinfo, status, type) as (item, status):
+            return status
+
+    def process_fifo(self, *, tarinfo, status, type):
+        with self.create_helper(tarinfo, status, type) as (item, status):
+            return status
+
+    def process_dev(self, *, tarinfo, status, type):
+        with self.create_helper(tarinfo, status, type) as (item, status):
+            item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
+            return status
+
+    def process_link(self, *, tarinfo, status, type):
+        with self.create_helper(tarinfo, status, type) as (item, status):
+            item.source = tarinfo.linkname
+            return status
+
+    def process_file(self, *, tarinfo, status, type, tar):
+        with self.create_helper(tarinfo, status, type) as (item, status):
+            self.print_file_status(status, tarinfo.name)
+            status = None  # we already printed the status
+            fd = tar.extractfile(tarinfo)
+            self.process_file_chunks(item, self.cache, self.stats, self.show_progress,
+                                     backup_io_iter(self.chunker.chunkify(fd)))
+            item.get_size(memorize=True)
+            self.stats.nfiles += 1
+            return status
+
+
 def valid_msgpacked_dict(d, keys_serialized):
     """check if the data <d> looks like a msgpacked dict"""
     d_len = len(d)

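Note on the code above: create_helper commits the item via add_item only if the
"with" block finishes without raising, and tarinfo.mtime * 1000**3 converts tar's
whole-second mtimes to the nanosecond timestamps borg items carry. A minimal,
self-contained sketch of the same commit-on-success pattern (all names here are
illustrative, not borg APIs):

    import stat
    from contextlib import contextmanager

    @contextmanager
    def create_helper(name, mode, type_flag, add_item):
        # combine permission bits with the file-type flag, as the real code
        # does with tarinfo.mode | type
        item = {'path': name.lstrip('/'), 'mode': mode | type_flag}
        yield item
        # only reached if the with-block raised no exception:
        # the item was processed ok, so commit it
        add_item(item)

    items = []
    with create_helper('etc/hosts', 0o644, stat.S_IFREG, items.append) as item:
        item['size'] = 42  # the body may enrich the item before it is committed
    print(items)  # [{'path': 'etc/hosts', 'mode': 33188, 'size': 42}]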
+ 180 - 60
src/borg/archiver.py

@@ -39,7 +39,7 @@ try:
     from .algorithms.checksums import crc32
     from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special
     from .archive import BackupError, BackupOSError, backup_io, OsOpen, stat_update_check
-    from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor
+    from .archive import FilesystemObjectProcessors, TarfileObjectProcessors, MetadataCollector, ChunksProcessor
     from .archive import has_link
     from .cache import Cache, assert_secure, SecurityManager
     from .constants import *  # NOQA
@@ -68,13 +68,14 @@ try:
     from .helpers import basic_json_data, json_print
     from .helpers import replace_placeholders
     from .helpers import ChunkIteratorFileWrapper
-    from .helpers import popen_with_error_handling, prepare_subprocess_env
+    from .helpers import popen_with_error_handling, prepare_subprocess_env, create_filter_process
     from .helpers import dash_open
     from .helpers import umount
     from .helpers import flags_root, flags_dir, flags_special_follow, flags_special
     from .helpers import msgpack
     from .helpers import sig_int
     from .helpers import iter_separated
+    from .helpers import get_tar_filter
     from .nanorst import rst_to_terminal
     from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern
     from .patterns import PatternMatcher
@@ -951,67 +952,17 @@ class Archiver:
         # that it has to be installed -- hardly a problem, considering that
         # the decompressor must be installed as well to make use of the exported tarball!
 
-        filter = None
-        if args.tar_filter == 'auto':
-            # Note that filter remains None if tarfile is '-'.
-            if args.tarfile.endswith('.tar.gz'):
-                filter = 'gzip'
-            elif args.tarfile.endswith('.tar.bz2'):
-                filter = 'bzip2'
-            elif args.tarfile.endswith('.tar.xz'):
-                filter = 'xz'
-            logger.debug('Automatically determined tar filter: %s', filter)
-        else:
-            filter = args.tar_filter
+        filter = get_tar_filter(args.tarfile, decompress=False) if args.tar_filter == 'auto' else args.tar_filter
 
         tarstream = dash_open(args.tarfile, 'wb')
         tarstream_close = args.tarfile != '-'
 
-        if filter:
-            # When we put a filter between us and the final destination,
-            # the selected output (tarstream until now) becomes the output of the filter (=filterout).
-            # The decision whether to close that or not remains the same.
-            filterout = tarstream
-            filterout_close = tarstream_close
-            env = prepare_subprocess_env(system=True)
-            # There is no deadlock potential here (the subprocess docs warn about this), because
-            # communication with the process is a one-way road, i.e. the process can never block
-            # for us to do something while we block on the process for something different.
-            filterproc = popen_with_error_handling(filter, stdin=subprocess.PIPE, stdout=filterout,
-                                                   log_prefix='--tar-filter: ', env=env)
-            if not filterproc:
-                return EXIT_ERROR
-            # Always close the pipe, otherwise the filter process would not notice when we are done.
-            tarstream = filterproc.stdin
-            tarstream_close = True
-
-        # The | (pipe) symbol instructs tarfile to use a streaming mode of operation
-        # where it never seeks on the passed fileobj.
-        tar = tarfile.open(fileobj=tarstream, mode='w|', format=tarfile.GNU_FORMAT)
-
-        self._export_tar(args, archive, tar)
-
-        # This does not close the fileobj (tarstream) we passed to it -- a side effect of the | mode.
-        tar.close()
-
-        if tarstream_close:
-            tarstream.close()
-
-        if filter:
-            logger.debug('Done creating tar, waiting for filter to die...')
-            rc = filterproc.wait()
-            if rc:
-                logger.error('--tar-filter exited with code %d, output file is likely unusable!', rc)
-                self.exit_code = EXIT_ERROR
-            else:
-                logger.debug('filter exited with code %d', rc)
-
-            if filterout_close:
-                filterout.close()
+        with create_filter_process(filter, stream=tarstream, stream_close=tarstream_close, inbound=False) as _stream:
+            self._export_tar(args, archive, _stream)
 
         return self.exit_code
 
-    def _export_tar(self, args, archive, tar):
+    def _export_tar(self, args, archive, tarstream):
         matcher = self.build_matcher(args.patterns, args.paths)
 
         progress = args.progress
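The deadlock remark that moved into create_filter_process deserves a concrete
illustration: communication with the filter is a one-way road, so we never block
on the process while it blocks on us. A sketch of the outbound case using plain
subprocess (file name and payload are placeholders; gzip must be installed):

    import subprocess

    with open('out.tar.gz', 'wb') as out:
        # one-way road: we only write to the filter; the filter writes to the
        # file by itself, so neither side can block waiting on the other
        proc = subprocess.Popen(['gzip'], stdin=subprocess.PIPE, stdout=out)
        proc.stdin.write(b'\0' * 1024)
        # always close the pipe, otherwise the filter never sees EOF
        proc.stdin.close()
        rc = proc.wait()  # non-zero rc means the output is likely unusable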
@@ -1027,6 +978,10 @@ class Archiver:
 
         filter = self.build_filter(matcher, peek_and_store_hardlink_masters, strip_components)
 
+        # The | (pipe) symbol instructs tarfile to use a streaming mode of operation
+        # where it never seeks on the passed fileobj.
+        tar = tarfile.open(fileobj=tarstream, mode='w|', format=tarfile.GNU_FORMAT)
+
         if progress:
             pi = ProgressIndicatorPercent(msg='%5.1f%% Processing: %s', step=0.1, msgid='extract')
             pi.output('Calculating size')
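For reference, tarfile's "|" streaming mode never seeks on the passed fileobj,
and tar.close() finalizes the archive without closing that fileobj. A quick
illustration with an in-memory buffer:

    import io
    import tarfile

    buf = io.BytesIO()
    tar = tarfile.open(fileobj=buf, mode='w|', format=tarfile.GNU_FORMAT)
    data = b'hello, streaming tar\n'
    info = tarfile.TarInfo('hello.txt')
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))
    tar.close()  # finalizes the tar; buf itself stays open
    assert not buf.closed
    print(len(buf.getvalue()))  # a multiple of 512, the tar block size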
@@ -1138,6 +1093,9 @@ class Archiver:
         if pi:
             pi.finish()
 
+        # This does not close the fileobj (tarstream) we passed to it -- a side effect of the | mode.
+        tar.close()
+
         for pattern in matcher.get_unmatched_include_patterns():
             self.print_warning("Include pattern '%s' never matched.", pattern)
         return self.exit_code
@@ -1700,6 +1658,89 @@ class Archiver:
             cache.commit()
         return self.exit_code
 
+    @with_repository(cache=True, exclusive=True, compatibility=(Manifest.Operation.WRITE,))
+    def do_import_tar(self, args, repository, manifest, key, cache):
+        self.output_filter = args.output_filter
+        self.output_list = args.output_list
+
+        filter = get_tar_filter(args.tarfile, decompress=True) if args.tar_filter == 'auto' else args.tar_filter
+
+        tarstream = dash_open(args.tarfile, 'rb')
+        tarstream_close = args.tarfile != '-'
+
+        with create_filter_process(filter, stream=tarstream, stream_close=tarstream_close, inbound=True) as _stream:
+            self._import_tar(args, repository, manifest, key, cache, _stream)
+
+        return self.exit_code
+
+    def _import_tar(self, args, repository, manifest, key, cache, tarstream):
+        t0 = datetime.utcnow()
+        t0_monotonic = time.monotonic()
+
+        archive = Archive(repository, key, manifest, args.location.archive, cache=cache,
+                          create=True, checkpoint_interval=args.checkpoint_interval,
+                          progress=args.progress,
+                          chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic,
+                          log_json=args.log_json)
+        cp = ChunksProcessor(cache=cache, key=key,
+                             add_item=archive.add_item, write_checkpoint=archive.write_checkpoint,
+                             checkpoint_interval=args.checkpoint_interval, rechunkify=False)
+        tfo = TarfileObjectProcessors(cache=cache, key=key,
+                                      process_file_chunks=cp.process_file_chunks, add_item=archive.add_item,
+                                      chunker_params=args.chunker_params, show_progress=args.progress,
+                                      log_json=args.log_json, iec=args.iec,
+                                      file_status_printer=self.print_file_status)
+
+        tar = tarfile.open(fileobj=tarstream, mode='r|')
+
+        while True:
+            tarinfo = tar.next()
+            if not tarinfo:
+                break
+            if tarinfo.isreg():
+                status = tfo.process_file(tarinfo=tarinfo, status='A', type=stat.S_IFREG, tar=tar)
+            elif tarinfo.isdir():
+                status = tfo.process_dir(tarinfo=tarinfo, status='d', type=stat.S_IFDIR)
+            elif tarinfo.issym():
+                status = tfo.process_link(tarinfo=tarinfo, status='s', type=stat.S_IFLNK)
+            elif tarinfo.islnk():
+                # tar uses the same hardlink model as borg (or rather, vice versa): the first
+                # instance of a hardlink is stored as a regular file; later instances are
+                # special entries referencing back to the first instance.
+                status = tfo.process_link(tarinfo=tarinfo, status='h', type=stat.S_IFREG)
+            elif tarinfo.isblk():
+                status = tfo.process_dev(tarinfo=tarinfo, status='b', type=stat.S_IFBLK)
+            elif tarinfo.ischr():
+                status = tfo.process_dev(tarinfo=tarinfo, status='c', type=stat.S_IFCHR)
+            elif tarinfo.isfifo():
+                status = tfo.process_fifo(tarinfo=tarinfo, status='f', type=stat.S_IFIFO)
+            else:
+                status = 'E'
+                self.print_warning('%s: Unsupported tarinfo type %s', tarinfo.name, tarinfo.type)
+            self.print_file_status(status, tarinfo.name)
+
+        # This does not close the fileobj (tarstream) we passed to it -- a side effect of the | mode.
+        tar.close()
+
+        if args.progress:
+            archive.stats.show_progress(final=True)
+        archive.stats += tfo.stats
+        archive.save(comment=args.comment, timestamp=args.timestamp)
+        args.stats |= args.json
+        if args.stats:
+            if args.json:
+                json_print(basic_json_data(archive.manifest, cache=archive.cache, extra={
+                    'archive': archive,
+                }))
+            else:
+                log_multi(DASHES,
+                          str(archive),
+                          DASHES,
+                          STATS_HEADER,
+                          str(archive.stats),
+                          str(archive.cache),
+                          DASHES, logger=logging.getLogger('borg.output.stats'))
+
     @with_repository(manifest=False, exclusive=True)
     def do_with_lock(self, args, repository):
         """run a user specified command with the repository lock held"""
@@ -3749,11 +3791,13 @@ class Archiver:
         based on its file extension and pipe the tarball through an appropriate filter
         before writing it to FILE:
 
-        - .tar.gz: gzip
-        - .tar.bz2: bzip2
-        - .tar.xz: xz
+        - .tar.gz or .tgz: gzip
+        - .tar.bz2 or .tbz: bzip2
+        - .tar.xz or .txz: xz
+        - .tar.zstd: zstd
+        - .tar.lz4: lz4
 
-        Alternatively a ``--tar-filter`` program may be explicitly specified. It should
+        Alternatively, a ``--tar-filter`` program may be explicitly specified. It should
         read the uncompressed tar stream from stdin and write a compressed/filtered
         tar stream to stdout.
 
@@ -4612,6 +4656,83 @@ class Archiver:
         subparser.add_argument('args', metavar='ARGS', nargs=argparse.REMAINDER,
                                help='command arguments')
 
+        # borg import-tar
+        import_tar_epilog = process_epilog("""
+        This command creates a backup archive from a tarball.
+
+        When giving '-' as path, Borg will read a tar stream from standard input.
+
+        By default (``--tar-filter=auto``) Borg will detect whether the file is compressed
+        based on its file extension and pipe the file through an appropriate filter:
+
+        - .tar.gz or .tgz: gzip -d
+        - .tar.bz2 or .tbz: bzip2 -d
+        - .tar.xz or .txz: xz -d
+        - .tar.zstd: zstd -d
+        - .tar.lz4: lz4 -d
+
+        Alternatively, a ``--tar-filter`` program may be explicitly specified. It should
+        read compressed data from stdin and output an uncompressed tar stream on
+        stdout.
+
+        Most documentation of ``borg create`` applies. Note that this command does not
+        support excluding files.
+
+        import-tar is a lossy conversion:
+        BSD flags, ACLs, extended attributes (xattrs), atime and ctime are not imported.
+        Timestamp resolution is limited to whole seconds, not the nanosecond resolution
+        otherwise supported by Borg.
+
+        A ``--sparse`` option (as found in borg create) is not supported.
+
+        import-tar reads POSIX.1-1988 (ustar), POSIX.1-2001 (pax), GNU tar, UNIX V7 tar
+        and SunOS tar with extended attributes.
+        """)
+        subparser = subparsers.add_parser('import-tar', parents=[common_parser], add_help=False,
+                                          description=self.do_import_tar.__doc__,
+                                          epilog=import_tar_epilog,
+                                          formatter_class=argparse.RawDescriptionHelpFormatter,
+                                          help=self.do_import_tar.__doc__)
+        subparser.set_defaults(func=self.do_import_tar)
+        subparser.add_argument('--tar-filter', dest='tar_filter', default='auto',
+                               help='filter program to pipe data through')
+        subparser.add_argument('-s', '--stats', dest='stats',
+                               action='store_true', default=False,
+                               help='print statistics for the created archive')
+        subparser.add_argument('--list', dest='output_list',
+                               action='store_true', default=False,
+                               help='output verbose list of items (files, dirs, ...)')
+        subparser.add_argument('--filter', dest='output_filter', metavar='STATUSCHARS',
+                               help='only display items with the given status characters')
+        subparser.add_argument('--json', action='store_true',
+                               help='output stats as JSON (implies --stats)')
+
+        archive_group = subparser.add_argument_group('Archive options')
+        archive_group.add_argument('--comment', dest='comment', metavar='COMMENT', default='',
+                                   help='add a comment text to the archive')
+        archive_group.add_argument('--timestamp', dest='timestamp',
+                                   type=timestamp, default=None,
+                                   metavar='TIMESTAMP',
+                                   help='manually specify the archive creation date/time (UTC, yyyy-mm-ddThh:mm:ss format). '
+                                        'Alternatively, give a reference file/directory.')
+        archive_group.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval',
+                                   type=int, default=1800, metavar='SECONDS',
+                                   help='write checkpoint every SECONDS seconds (Default: 1800)')
+        archive_group.add_argument('--chunker-params', dest='chunker_params',
+                                   type=ChunkerParams, default=CHUNKER_PARAMS,
+                                   metavar='PARAMS',
+                                   help='specify the chunker parameters (ALGO, CHUNK_MIN_EXP, CHUNK_MAX_EXP, '
+                                        'HASH_MASK_BITS, HASH_WINDOW_SIZE). default: %s,%d,%d,%d,%d' % CHUNKER_PARAMS)
+        archive_group.add_argument('-C', '--compression', metavar='COMPRESSION', dest='compression',
+                                   type=CompressionSpec, default=CompressionSpec('lz4'),
+                                   help='select compression algorithm, see the output of the '
+                                        '"borg help compression" command for details.')
+
+        subparser.add_argument('location', metavar='ARCHIVE',
+                               type=location_validator(archive=True),
+                               help='name of archive to create (must also be a valid directory name)')
+        subparser.add_argument('tarfile', metavar='TARFILE',
+                               help='input tar file. "-" to read from stdin instead.')
         return parser
 
     def get_args(self, argv, cmd):

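Two notes on the archiver changes above. First, the type dispatch in _import_tar
maps each tar member onto a status character and a stat file-type flag; a
condensed, standalone sketch of the same mapping ('example.tar' is a placeholder
path):

    import stat
    import tarfile

    def classify(tarinfo):
        # same mapping as _import_tar: (status char, file-type flag)
        if tarinfo.isreg():
            return 'A', stat.S_IFREG
        if tarinfo.isdir():
            return 'd', stat.S_IFDIR
        if tarinfo.issym():
            return 's', stat.S_IFLNK
        if tarinfo.islnk():
            return 'h', stat.S_IFREG  # hardlinks reference a regular file
        if tarinfo.isblk():
            return 'b', stat.S_IFBLK
        if tarinfo.ischr():
            return 'c', stat.S_IFCHR
        if tarinfo.isfifo():
            return 'f', stat.S_IFIFO
        return 'E', None  # unsupported member type

    with tarfile.open('example.tar', mode='r|') as tar:
        while True:
            tarinfo = tar.next()
            if tarinfo is None:
                break
            status, type_flag = classify(tarinfo)
            print(status, tarinfo.name)

Second, typical invocations of the new subcommand might look like this
(repository paths are placeholders; compare the parser definition above):

    $ borg import-tar --stats /path/to/repo::dst backup.tar.gz
    $ xz -d < backup.tar.xz | borg import-tar /path/to/repo::dst -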
+ 18 - 0
src/borg/helpers/misc.py

@@ -238,3 +238,21 @@ def iter_separated(fd, sep=None, read_size=4096):
     # or if there was no data before EOF
     if len(part) > 0:
         yield part
+
+
+def get_tar_filter(fname, decompress):
+    # Note that filter is None if fname is '-'.
+    if fname.endswith(('.tar.gz', '.tgz')):
+        filter = 'gzip -d' if decompress else 'gzip'
+    elif fname.endswith(('.tar.bz2', '.tbz')):
+        filter = 'bzip2 -d' if decompress else 'bzip2'
+    elif fname.endswith(('.tar.xz', '.txz')):
+        filter = 'xz -d' if decompress else 'xz'
+    elif fname.endswith(('.tar.lz4', )):
+        filter = 'lz4 -d' if decompress else 'lz4'
+    elif fname.endswith(('.tar.zstd', )):
+        filter = 'zstd -d' if decompress else 'zstd'
+    else:
+        filter = None
+    logger.debug('Automatically determined tar filter: %s', filter)
+    return filter

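A quick sanity check of the helper's mapping (interpreter transcript; the import
path follows the re-export used in archiver.py above):

    >>> from borg.helpers import get_tar_filter
    >>> get_tar_filter('backup.tgz', decompress=True)
    'gzip -d'
    >>> get_tar_filter('backup.tar.xz', decompress=False)
    'xz'
    >>> get_tar_filter('-', decompress=True) is None
    True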
+ 43 - 1
src/borg/helpers/process.py

@@ -15,7 +15,7 @@ from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin
 from ..logger import create_logger
 logger = create_logger()
 
-from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE
+from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE, Error
 
 
 @contextlib.contextmanager
@@ -300,3 +300,45 @@ def prepare_subprocess_env(system, env=None):
     # for information, give borg version to the subprocess
     env['BORG_VERSION'] = __version__
     return env
+
+
+@contextlib.contextmanager
+def create_filter_process(cmd, stream, stream_close, inbound=True):
+    if cmd:
+        # put a filter process between stream and us (e.g. a [de]compression command)
+        # inbound: <stream> --> filter --> us
+        # outbound: us --> filter --> <stream>
+        filter_stream = stream
+        filter_stream_close = stream_close
+        env = prepare_subprocess_env(system=True)
+        # There is no deadlock potential here (the subprocess docs warn about this), because
+        # communication with the process is a one-way road, i.e. the process can never block
+        # for us to do something while we block on the process for something different.
+        if inbound:
+            proc = popen_with_error_handling(cmd, stdout=subprocess.PIPE, stdin=filter_stream,
+                                             log_prefix='filter-process: ', env=env)
+        else:
+            proc = popen_with_error_handling(cmd, stdin=subprocess.PIPE, stdout=filter_stream,
+                                             log_prefix='filter-process: ', env=env)
+        if not proc:
+            raise Error('filter %s: process creation failed' % (cmd, ))
+        stream = proc.stdout if inbound else proc.stdin
+        # inbound: do not close the pipe (this is the task of the filter process [== writer])
+        # outbound: close the pipe, otherwise the filter process would not notice when we are done.
+        stream_close = not inbound
+
+    try:
+        yield stream
+
+    finally:
+        if stream_close:
+            stream.close()
+
+        if cmd:
+            logger.debug('Done, waiting for filter to die...')
+            rc = proc.wait()
+            logger.debug('filter cmd exited with code %d', rc)
+            if filter_stream_close:
+                filter_stream.close()
+            if rc:
+                raise Error('filter %s failed, rc=%d' % (cmd, rc))

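Usage mirrors the two call sites in archiver.py; a sketch of the inbound
direction (the file name is a placeholder, and gzip must be installed):

    from borg.helpers import create_filter_process

    with open('backup.tar.gz', 'rb') as f:
        # inbound: f --> gzip -d --> us; the context manager wires up the
        # process, yields its stdout, and on exit waits for it and raises
        # Error on a non-zero return code
        with create_filter_process('gzip -d', stream=f, stream_close=False,
                                   inbound=True) as stream:
            first_block = stream.read(512)  # first decompressed tar block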
+ 24 - 0
src/borg/testsuite/archiver.py

@@ -3413,6 +3413,30 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
             assert os.stat('input/dir1/aaaa').st_nlink == 2
             assert os.stat('input/dir1/source2').st_nlink == 2
 
+    def test_import_tar(self):
+        self.create_test_files()
+        os.unlink('input/flagfile')
+        self.cmd('init', '--encryption=none', self.repository_location)
+        self.cmd('create', self.repository_location + '::src', 'input')
+        self.cmd('export-tar', self.repository_location + '::src', 'simple.tar')
+        self.cmd('import-tar', self.repository_location + '::dst', 'simple.tar')
+        with changedir(self.output_path):
+            self.cmd('extract', self.repository_location + '::dst')
+        self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True)
+
+    def test_import_tar_gz(self):
+        if not shutil.which('gzip'):
+            pytest.skip('gzip is not installed')
+        self.create_test_files()
+        os.unlink('input/flagfile')
+        self.cmd('init', '--encryption=none', self.repository_location)
+        self.cmd('create', self.repository_location + '::src', 'input')
+        self.cmd('export-tar', self.repository_location + '::src', 'simple.tgz')
+        self.cmd('import-tar', self.repository_location + '::dst', 'simple.tgz')
+        with changedir(self.output_path):
+            self.cmd('extract', self.repository_location + '::dst')
+        self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True)
+
     def test_detect_attic_repo(self):
         path = make_attic_repo(self.repository_path)
         cmds = [