ソースを参照

Merge pull request #1234 from enkore/issue/1231

Implement fail-safe error handling for borg-extract
TW 9 年 前
コミット
eea46928ff
3 ファイル変更95 行追加65 行削除
  1. 73 51
      borg/archive.py
  2. 14 6
      borg/archiver.py
  3. 8 8
      borg/testsuite/archive.py

+ 73 - 51
borg/archive.py

@@ -46,8 +46,16 @@ flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
 flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
 
 
-class InputOSError(Exception):
-    """Wrapper for OSError raised while accessing input files."""
+class BackupOSError(Exception):
+    """
+    Wrapper for OSError raised while accessing backup files.
+
+    Borg does different kinds of IO, and IO failures have different consequences.
+    This wrapper represents failures of input file or extraction IO.
+    These are non-critical and are only reported (exit code = 1, warning).
+
+    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
+    """
     def __init__(self, os_error):
         self.os_error = os_error
         self.errno = os_error.errno
@@ -59,18 +67,18 @@ class InputOSError(Exception):
 
 
 @contextmanager
-def input_io():
-    """Context manager changing OSError to InputOSError."""
+def backup_io():
+    """Context manager changing OSError to BackupOSError."""
     try:
         yield
     except OSError as os_error:
-        raise InputOSError(os_error) from os_error
+        raise BackupOSError(os_error) from os_error
 
 
-def input_io_iter(iterator):
+def backup_io_iter(iterator):
     while True:
         try:
-            with input_io():
+            with backup_io():
                 item = next(iterator)
         except StopIteration:
             return
@@ -356,54 +364,68 @@ Number of files: {0.stats.nfiles}'''.format(
         mode = item[b'mode']
         if stat.S_ISREG(mode):
             if not os.path.exists(os.path.dirname(path)):
-                os.makedirs(os.path.dirname(path))
+                with backup_io():
+                    os.makedirs(os.path.dirname(path))
             # Hard link?
             if b'source' in item:
                 source = os.path.join(dest, item[b'source'])
-                if os.path.exists(path):
-                    os.unlink(path)
-                os.link(source, path)
+                with backup_io():
+                    if os.path.exists(path):
+                        os.unlink(path)
+                    os.link(source, path)
             else:
-                with open(path, 'wb') as fd:
+                with backup_io():
+                    fd = open(path, 'wb')
+                with fd:
                     ids = [c[0] for c in item[b'chunks']]
                     for data in self.pipeline.fetch_many(ids, is_preloaded=True):
-                        if sparse and self.zeros.startswith(data):
-                            # all-zero chunk: create a hole in a sparse file
-                            fd.seek(len(data), 1)
-                        else:
-                            fd.write(data)
-                    pos = fd.tell()
-                    fd.truncate(pos)
-                    fd.flush()
-                    self.restore_attrs(path, item, fd=fd.fileno())
-        elif stat.S_ISDIR(mode):
-            if not os.path.exists(path):
-                os.makedirs(path)
-            if restore_attrs:
+                        with backup_io():
+                            if sparse and self.zeros.startswith(data):
+                                # all-zero chunk: create a hole in a sparse file
+                                fd.seek(len(data), 1)
+                            else:
+                                fd.write(data)
+                    with backup_io():
+                        pos = fd.tell()
+                        fd.truncate(pos)
+                        fd.flush()
+                        self.restore_attrs(path, item, fd=fd.fileno())
+            return
+        with backup_io():
+            # No repository access beyond this point.
+            if stat.S_ISDIR(mode):
+                if not os.path.exists(path):
+                    os.makedirs(path)
+                if restore_attrs:
+                    self.restore_attrs(path, item)
+            elif stat.S_ISLNK(mode):
+                if not os.path.exists(os.path.dirname(path)):
+                    os.makedirs(os.path.dirname(path))
+                source = item[b'source']
+                if os.path.exists(path):
+                    os.unlink(path)
+                try:
+                    os.symlink(source, path)
+                except UnicodeEncodeError:
+                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
+                self.restore_attrs(path, item, symlink=True)
+            elif stat.S_ISFIFO(mode):
+                if not os.path.exists(os.path.dirname(path)):
+                    os.makedirs(os.path.dirname(path))
+                os.mkfifo(path)
                 self.restore_attrs(path, item)
-        elif stat.S_ISLNK(mode):
-            if not os.path.exists(os.path.dirname(path)):
-                os.makedirs(os.path.dirname(path))
-            source = item[b'source']
-            if os.path.exists(path):
-                os.unlink(path)
-            try:
-                os.symlink(source, path)
-            except UnicodeEncodeError:
-                raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
-            self.restore_attrs(path, item, symlink=True)
-        elif stat.S_ISFIFO(mode):
-            if not os.path.exists(os.path.dirname(path)):
-                os.makedirs(os.path.dirname(path))
-            os.mkfifo(path)
-            self.restore_attrs(path, item)
-        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
-            os.mknod(path, item[b'mode'], item[b'rdev'])
-            self.restore_attrs(path, item)
-        else:
-            raise Exception('Unknown archive item type %r' % item[b'mode'])
+            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
+                os.mknod(path, item[b'mode'], item[b'rdev'])
+                self.restore_attrs(path, item)
+            else:
+                raise Exception('Unknown archive item type %r' % item[b'mode'])
 
     def restore_attrs(self, path, item, symlink=False, fd=None):
+        """
+        Restore filesystem attributes on *path* (*fd*) from *item*.
+
+        Does not access the repository.
+        """
         uid = gid = None
         if not self.numeric_owner:
             uid = user2uid(item[b'user'])
@@ -496,13 +518,13 @@ Number of files: {0.stats.nfiles}'''.format(
         }
         if self.numeric_owner:
             item[b'user'] = item[b'group'] = None
-        with input_io():
+        with backup_io():
             xattrs = xattr.get_all(path, follow_symlinks=False)
         if xattrs:
             item[b'xattrs'] = StableDict(xattrs)
         if has_lchflags and st.st_flags:
             item[b'bsdflags'] = st.st_flags
-        with input_io():
+        with backup_io():
             acl_get(path, item, st, self.numeric_owner)
         return item
 
@@ -538,7 +560,7 @@ Number of files: {0.stats.nfiles}'''.format(
         uid, gid = 0, 0
         fd = sys.stdin.buffer  # binary
         chunks = []
-        for chunk in input_io_iter(self.chunker.chunkify(fd)):
+        for chunk in backup_io_iter(self.chunker.chunkify(fd)):
             chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
         self.stats.nfiles += 1
         t = int_to_bigint(int(time.time()) * 1000000000)
@@ -586,11 +608,11 @@ Number of files: {0.stats.nfiles}'''.format(
         item = {b'path': safe_path}
         # Only chunkify the file if needed
         if chunks is None:
-            with input_io():
+            with backup_io():
                 fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
                 chunks = []
-                for chunk in input_io_iter(self.chunker.chunkify(fd, fh)):
+                for chunk in backup_io_iter(self.chunker.chunkify(fd, fh)):
                     chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                     if self.show_progress:
                         self.stats.show_progress(item=item, dt=0.2)

+ 14 - 6
borg/archiver.py

@@ -29,7 +29,7 @@ from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader
 from .repository import Repository
 from .cache import Cache
 from .key import key_creator, RepoKey, PassphraseKey
-from .archive import input_io, InputOSError, Archive, ArchiveChecker, CHUNKER_PARAMS
+from .archive import backup_io, BackupOSError, Archive, ArchiveChecker, CHUNKER_PARAMS
 from .remote import RepositoryServer, RemoteRepository, cache_if_remote
 
 has_lchflags = hasattr(os, 'lchflags')
@@ -198,7 +198,7 @@ class Archiver:
                     if not dry_run:
                         try:
                             status = archive.process_stdin(path, cache)
-                        except InputOSError as e:
+                        except BackupOSError as e:
                             status = 'E'
                             self.print_warning('%s: %s', path, e)
                     else:
@@ -281,7 +281,7 @@ class Archiver:
             if not dry_run:
                 try:
                     status = archive.process_file(path, st, cache, self.ignore_inode)
-                except InputOSError as e:
+                except BackupOSError as e:
                     status = 'E'
                     self.print_warning('%s: %s', path, e)
         elif stat.S_ISDIR(st.st_mode):
@@ -372,7 +372,11 @@ class Archiver:
                     continue
             if not args.dry_run:
                 while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
-                    archive.extract_item(dirs.pop(-1), stdout=stdout)
+                    dir_item = dirs.pop(-1)
+                    try:
+                        archive.extract_item(dir_item, stdout=stdout)
+                    except BackupOSError as e:
+                        self.print_warning('%s: %s', remove_surrogates(dir_item[b'path']), e)
             if output_list:
                 logger.info(remove_surrogates(orig_path))
             try:
@@ -384,12 +388,16 @@ class Archiver:
                         archive.extract_item(item, restore_attrs=False)
                     else:
                         archive.extract_item(item, stdout=stdout, sparse=sparse)
-            except OSError as e:
+            except BackupOSError as e:
                 self.print_warning('%s: %s', remove_surrogates(orig_path), e)
 
         if not args.dry_run:
             while dirs:
-                archive.extract_item(dirs.pop(-1))
+                dir_item = dirs.pop(-1)
+                try:
+                    archive.extract_item(dir_item)
+                except BackupOSError as e:
+                    self.print_warning('%s: %s', remove_surrogates(dir_item[b'path']), e)
         for pattern in include_patterns:
             if pattern.match_count == 0:
                 self.print_warning("Include pattern '%s' never matched.", pattern)

+ 8 - 8
borg/testsuite/archive.py

@@ -5,7 +5,7 @@ import msgpack
 import pytest
 
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS
-from ..archive import InputOSError, input_io, input_io_iter
+from ..archive import BackupOSError, backup_io, backup_io_iter
 from ..key import PlaintextKey
 from ..helpers import Manifest
 from . import BaseTestCase
@@ -148,13 +148,13 @@ def test_key_length_msgpacked_items():
     assert valid_msgpacked_dict(msgpack.packb(data), item_keys_serialized)
 
 
-def test_input_io():
-    with pytest.raises(InputOSError):
-        with input_io():
+def test_backup_io():
+    with pytest.raises(BackupOSError):
+        with backup_io():
             raise OSError(123)
 
 
-def test_input_io_iter():
+def test_backup_io_iter():
     class Iterator:
         def __init__(self, exc):
             self.exc = exc
@@ -163,10 +163,10 @@ def test_input_io_iter():
             raise self.exc()
 
     oserror_iterator = Iterator(OSError)
-    with pytest.raises(InputOSError):
-        for _ in input_io_iter(oserror_iterator):
+    with pytest.raises(BackupOSError):
+        for _ in backup_io_iter(oserror_iterator):
             pass
 
     normal_iterator = Iterator(StopIteration)
-    for _ in input_io_iter(normal_iterator):
+    for _ in backup_io_iter(normal_iterator):
         assert False, 'StopIteration handled incorrectly'