Bläddra i källkod

use context manager for filter process setup/teardown

Thomas Waldmann 4 år sedan
förälder
incheckning
de84440c18
4 ändrade filer med 65 tillägg och 61 borttagningar
  1. +7 −59
      src/borg/archiver.py
  2. +1 −1
      src/borg/helpers/misc.py
  3. +43 −1
      src/borg/helpers/process.py
  4. +14 −0
      src/borg/testsuite/archiver.py

+ 7 - 59
src/borg/archiver.py

@@ -68,7 +68,7 @@ try:
     from .helpers import basic_json_data, json_print
     from .helpers import replace_placeholders
     from .helpers import ChunkIteratorFileWrapper
-    from .helpers import popen_with_error_handling, prepare_subprocess_env
+    from .helpers import popen_with_error_handling, prepare_subprocess_env, create_filter_process
     from .helpers import dash_open
     from .helpers import umount
     from .helpers import flags_root, flags_dir, flags_special_follow, flags_special
@@ -952,45 +952,13 @@ class Archiver:
         # that it has to be installed -- hardly a problem, considering that
         # the decompressor must be installed as well to make use of the exported tarball!
 
-        filter = get_tar_filter(args.tarfile) if args.tar_filter == 'auto' else args.tar_filter
+        filter = get_tar_filter(args.tarfile, decompress=False) if args.tar_filter == 'auto' else args.tar_filter
 
         tarstream = dash_open(args.tarfile, 'wb')
         tarstream_close = args.tarfile != '-'
 
-        if filter:
-            # When we put a filter between us and the final destination,
-            # the selected output (tarstream until now) becomes the output of the filter (=filterout).
-            # The decision whether to close that or not remains the same.
-            filterout = tarstream
-            filterout_close = tarstream_close
-            env = prepare_subprocess_env(system=True)
-            # There is no deadlock potential here (the subprocess docs warn about this), because
-            # communication with the process is a one-way road, i.e. the process can never block
-            # for us to do something while we block on the process for something different.
-            filterproc = popen_with_error_handling(filter, stdin=subprocess.PIPE, stdout=filterout,
-                                                   log_prefix='--tar-filter: ', env=env)
-            if not filterproc:
-                return EXIT_ERROR
-            # Always close the pipe, otherwise the filter process would not notice when we are done.
-            tarstream = filterproc.stdin
-            tarstream_close = True
-
-        self._export_tar(args, archive, tarstream)
-
-        if tarstream_close:
-            tarstream.close()
-
-        if filter:
-            logger.debug('Done creating tar, waiting for filter to die...')
-            rc = filterproc.wait()
-            if rc:
-                logger.error('--tar-filter exited with code %d, output file is likely unusable!', rc)
-                self.exit_code = EXIT_ERROR
-            else:
-                logger.debug('filter exited with code %d', rc)
-
-            if filterout_close:
-                filterout.close()
+        with create_filter_process(filter, stream=tarstream, stream_close=tarstream_close, inbound=False) as _stream:
+            self._export_tar(args, archive, _stream)
 
         return self.exit_code
 
 
@@ -1695,33 +1663,13 @@ class Archiver:
         self.output_filter = args.output_filter
         self.output_list = args.output_list
 
-        filter = get_tar_filter(args.tarfile) if args.tar_filter == 'auto' else args.tar_filter
+        filter = get_tar_filter(args.tarfile, decompress=True) if args.tar_filter == 'auto' else args.tar_filter
 
         tarstream = dash_open(args.tarfile, 'rb')
         tarstream_close = args.tarfile != '-'
 
-        if filter:
-            filterin = tarstream
-            filterin_close = tarstream_close
-            filterproc = subprocess.Popen(shlex.split(filter), stdin=filterin.fileno(), stdout=subprocess.PIPE)
-            # We can't deadlock. Why? Because we just give the FD of the stream to the filter process,
-            # it can read as much data as it wants!
-            tarstream = filterproc.stdout
-            tarstream_close = False
-
-        self._import_tar(args, repository, manifest, key, cache, tarstream)
-
-        if filter:
-            logger.debug('Done creating archive, waiting for filter to die...')
-            rc = filterproc.wait()
-            logger.debug('filter exited with code %d', rc)
-            self.exit_code = max(self.exit_code, rc)
-
-            if filterin_close:
-                filterin.close()
-
-        if tarstream_close:
-            tarstream.close()
+        with create_filter_process(filter, stream=tarstream, stream_close=tarstream_close, inbound=True) as _stream:
+            self._import_tar(args, repository, manifest, key, cache, _stream)
 
         return self.exit_code
 
 

+ 1 - 1
src/borg/helpers/misc.py

@@ -240,7 +240,7 @@ def iter_separated(fd, sep=None, read_size=4096):
         yield part
 
 
-def get_tar_filter(fname):
+def get_tar_filter(fname, decompress):
     # Note that filter is None if fname is '-'.
     if fname.endswith(('.tar.gz', '.tgz')):
         filter = 'gzip -d' if decompress else 'gzip'

+ 43 - 1
src/borg/helpers/process.py

@@ -15,7 +15,7 @@ from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin
 from ..logger import create_logger
 logger = create_logger()
 
-from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE
+from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE, Error
 
 
 @contextlib.contextmanager
@@ -300,3 +300,45 @@ def prepare_subprocess_env(system, env=None):
     # for information, give borg version to the subprocess
     env['BORG_VERSION'] = __version__
     return env
+
+
+@contextlib.contextmanager
+def create_filter_process(cmd, stream, stream_close, inbound=True):
+    if cmd:
+        # put a filter process between stream and us (e.g. a [de]compression command)
+        # inbound: <stream> --> filter --> us
+        # outbound: us --> filter --> <stream>
+        filter_stream = stream
+        filter_stream_close = stream_close
+        env = prepare_subprocess_env(system=True)
+        # There is no deadlock potential here (the subprocess docs warn about this), because
+        # communication with the process is a one-way road, i.e. the process can never block
+        # for us to do something while we block on the process for something different.
+        if inbound:
+            proc = popen_with_error_handling(cmd, stdout=subprocess.PIPE, stdin=filter_stream,
+                                             log_prefix='filter-process: ', env=env)
+        else:
+            proc = popen_with_error_handling(cmd, stdin=subprocess.PIPE, stdout=filter_stream,
+                                             log_prefix='filter-process: ', env=env)
+        if not proc:
+            raise Error('filter %s: process creation failed' % (cmd, ))
+        stream = proc.stdout if inbound else proc.stdin
+        # inbound: do not close the pipe (this is the task of the filter process [== writer])
+        # outbound: close the pipe, otherwise the filter process would not notice when we are done.
+        stream_close = not inbound
+
+    try:
+        yield stream
+
+    finally:
+        if stream_close:
+            stream.close()
+
+        if cmd:
+            logger.debug('Done, waiting for filter to die...')
+            rc = proc.wait()
+            logger.debug('filter cmd exited with code %d', rc)
+            if filter_stream_close:
+                filter_stream.close()
+            if rc:
+                raise Error('filter %s failed, rc=%d' % (cmd, rc))

+ 14 - 0
src/borg/testsuite/archiver.py

@@ -3424,6 +3424,20 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
             self.cmd('extract', self.repository_location + '::dst')
         self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True)
 
+    @requires_gzip
+    def test_import_tar_gz(self):
+        if not shutil.which('gzip'):
+            pytest.skip('gzip is not installed')
+        self.create_test_files()
+        os.unlink('input/flagfile')
+        self.cmd('init', '--encryption=none', self.repository_location)
+        self.cmd('create', self.repository_location + '::src', 'input')
+        self.cmd('export-tar', self.repository_location + '::src', 'simple.tgz')
+        self.cmd('import-tar', self.repository_location + '::dst', 'simple.tgz')
+        with changedir(self.output_path):
+            self.cmd('extract', self.repository_location + '::dst')
+        self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True)
+
     def test_detect_attic_repo(self):
         path = make_attic_repo(self.repository_path)
         cmds = [
         cmds = [