
Merge pull request #2890 from enkore/f/mt-1c

archive: create FilesystemObjectProcessors class
enkore 7 years ago
commit 4f57e3a7c4
2 changed files with 138 additions and 94 deletions
  1. src/borg/archive.py (+109, -73)
  2. src/borg/archiver.py (+29, -21)

src/borg/archive.py (+109, -73)

@@ -314,10 +314,8 @@ class Archive:
         self.create = create
         if self.create:
             self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
-            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
             if name in manifest.archives:
                 raise self.AlreadyExists(name)
-            self.last_checkpoint = time.monotonic()
             i = 0
             while True:
                 self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
@@ -809,6 +807,25 @@ Utilization of max. archive size: {csize_max:.0%}
             logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
             logger.warning('borg check --repair is required to free all space.')
 
+    @staticmethod
+    def _open_rb(path):
+        try:
+            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
+            return os.open(path, flags_noatime)
+        except PermissionError:
+            if flags_noatime == flags_normal:
+                # we do not have O_NOATIME, no need to try again:
+                raise
+            # Was this EPERM due to the O_NOATIME flag? Try again without it:
+            return os.open(path, flags_normal)
+
+
+class MetadataCollector:
+    def __init__(self, *, noatime, noctime, numeric_owner):
+        self.noatime = noatime
+        self.noctime = noctime
+        self.numeric_owner = numeric_owner
+
     def stat_simple_attrs(self, st):
         attrs = dict(
             mode=st.st_mode,
@@ -847,53 +864,19 @@ Utilization of max. archive size: {csize_max:.0%}
         attrs.update(self.stat_ext_attrs(st, path))
         return attrs
 
-    @contextmanager
-    def create_helper(self, path, st, status=None, hardlinkable=True):
-        safe_path = make_path_safe(path)
-        item = Item(path=safe_path)
-        hardlink_master = False
-        hardlinked = hardlinkable and st.st_nlink > 1
-        if hardlinked:
-            source = self.hard_links.get((st.st_ino, st.st_dev))
-            if source is not None:
-                item.source = source
-                status = 'h'  # hardlink (to already seen inodes)
-            else:
-                hardlink_master = True
-        yield item, status, hardlinked, hardlink_master
-        # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
-        self.add_item(item)
-        # ... and added to the archive, so we can remember it to refer to it later in the archive:
-        if hardlink_master:
-            self.hard_links[(st.st_ino, st.st_dev)] = safe_path
-
-    def process_dir(self, path, st):
-        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
-            item.update(self.stat_attrs(st, path))
-            return status
-
-    def process_fifo(self, path, st):
-        with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master):  # fifo
-            item.update(self.stat_attrs(st, path))
-            return status
 
-    def process_dev(self, path, st, dev_type):
-        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master):  # char/block device
-            item.rdev = st.st_rdev
-            item.update(self.stat_attrs(st, path))
-            return status
+class ChunksProcessor:
+    # Processes an iterator of chunks for an Item
 
-    def process_symlink(self, path, st):
-        # note: using hardlinkable=False because we can not support hardlinked symlinks,
-        #       due to the dual-use of item.source, see issue #2343:
-        with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
-            with backup_io('readlink'):
-                source = os.readlink(path)
-            item.source = source
-            if st.st_nlink > 1:
-                logger.warning('hardlinked symlinks will be archived as non-hardlinked symlinks!')
-            item.update(self.stat_attrs(st, path))
-            return status
+    def __init__(self, *, key, cache,
+                 add_item, write_checkpoint,
+                 checkpoint_interval):
+        self.key = key
+        self.cache = cache
+        self.add_item = add_item
+        self.write_checkpoint = write_checkpoint
+        self.checkpoint_interval = checkpoint_interval
+        self.last_checkpoint = time.monotonic()
 
     def write_part_file(self, item, from_chunk, number):
         item = Item(internal_dict=item.as_dict())
@@ -908,7 +891,7 @@ Utilization of max. archive size: {csize_max:.0%}
         self.write_checkpoint()
         return length, number
 
-    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None):
+    def process_file_chunks(self, item, cache, stats, chunk_iter, chunk_processor=None):
         if not chunk_processor:
             def chunk_processor(data):
                 chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
@@ -920,8 +903,6 @@ Utilization of max. archive size: {csize_max:.0%}
         part_number = 1
         for data in chunk_iter:
             item.chunks.append(chunk_processor(data))
-            if self.show_progress:
-                self.stats.show_progress(item=item, dt=0.2)
             if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
                 from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                 self.last_checkpoint = time.monotonic()
@@ -939,6 +920,74 @@ Utilization of max. archive size: {csize_max:.0%}
                 for chunk in item.chunks:
                     cache.chunk_incref(chunk.id, stats, size=chunk.size)
 
+
+class FilesystemObjectProcessors:
+    # When ported to threading, then this doesn't need chunker, cache, key any more.
+    # write_checkpoint should then be in the item buffer,
+    # and process_file becomes a callback passed to __init__.
+
+    def __init__(self, *, metadata_collector, cache, key,
+                 add_item, process_file_chunks,
+                 chunker_params):
+        self.metadata_collector = metadata_collector
+        self.cache = cache
+        self.key = key
+        self.add_item = add_item
+        self.process_file_chunks = process_file_chunks
+
+        self.hard_links = {}
+        self.stats = Statistics()  # threading: done by cache (including progress)
+        self.cwd = os.getcwd()
+        self.chunker = Chunker(key.chunk_seed, *chunker_params)
+
+    @contextmanager
+    def create_helper(self, path, st, status=None, hardlinkable=True):
+        safe_path = make_path_safe(path)
+        item = Item(path=safe_path)
+        hardlink_master = False
+        hardlinked = hardlinkable and st.st_nlink > 1
+        if hardlinked:
+            source = self.hard_links.get((st.st_ino, st.st_dev))
+            if source is not None:
+                item.source = source
+                status = 'h'  # hardlink (to already seen inodes)
+            else:
+                hardlink_master = True
+        yield item, status, hardlinked, hardlink_master
+        # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
+        self.add_item(item)
+        # ... and added to the archive, so we can remember it to refer to it later in the archive:
+        if hardlink_master:
+            self.hard_links[(st.st_ino, st.st_dev)] = safe_path
+
+    def process_dir(self, path, st):
+        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
+            item.update(self.metadata_collector.stat_attrs(st, path))
+            return status
+
+    def process_fifo(self, path, st):
+        with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master):  # fifo
+            item.update(self.metadata_collector.stat_attrs(st, path))
+            return status
+
+    def process_dev(self, path, st, dev_type):
+        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master):  # char/block device
+            item.rdev = st.st_rdev
+            item.update(self.metadata_collector.stat_attrs(st, path))
+            return status
+
+    def process_symlink(self, path, st):
+        # note: using hardlinkable=False because we can not support hardlinked symlinks,
+        #       due to the dual-use of item.source, see issue #2343:
+        with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
+            with backup_io('readlink'):
+                source = os.readlink(path)
+            item.source = source
+            if st.st_nlink > 1:
+                logger.warning('hardlinked symlinks will be archived as non-hardlinked symlinks!')
+            item.update(self.metadata_collector.stat_attrs(st, path))
+            return status
+
     def process_stdin(self, path, cache):
         uid, gid = 0, 0
         t = int(time.time()) * 1000000000
@@ -950,7 +999,7 @@ Utilization of max. archive size: {csize_max:.0%}
             mtime=t, atime=t, ctime=t,
         )
         fd = sys.stdin.buffer  # binary
-        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
+        self.process_file_chunks(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
         item.get_size(memorize=True)
         self.stats.nfiles += 1
         self.add_item(item)
@@ -983,7 +1032,7 @@ Utilization of max. archive size: {csize_max:.0%}
                 else:
                     status = 'A'  # regular file, added
                 item.hardlink_master = hardlinked
-                item.update(self.stat_simple_attrs(st))
+                item.update(self.metadata_collector.stat_simple_attrs(st))
                 # Only chunkify the file if needed
                 if chunks is not None:
                     item.chunks = chunks
@@ -991,14 +1040,14 @@ Utilization of max. archive size: {csize_max:.0%}
                     with backup_io('open'):
                         fh = Archive._open_rb(path)
                     with os.fdopen(fh, 'rb') as fd:
-                        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
+                        self.process_file_chunks(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
                     if not is_special_file:
                         # we must not memorize special files, because the contents of e.g. a
                         # block or char device will change without its mtime/size/inode changing.
                         cache.memorize_file(path_hash, st, [c.id for c in item.chunks])
                     status = status or 'M'  # regular file, modified (if not 'A' already)
                 self.stats.nfiles += 1
-            item.update(self.stat_attrs(st, path))
+            item.update(self.metadata_collector.stat_attrs(st, path))
             item.get_size(memorize=True)
             if is_special_file:
                 # we processed a special file like a regular file. reflect that in mode,
@@ -1006,24 +1055,6 @@ Utilization of max. archive size: {csize_max:.0%}
                 item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
             return status
 
-    @staticmethod
-    def list_archives(repository, key, manifest, cache=None):
-        # expensive! see also Manifest.list_archive_infos.
-        for name in manifest.archives:
-            yield Archive(repository, key, manifest, name, cache=cache)
-
-    @staticmethod
-    def _open_rb(path):
-        try:
-            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
-            return os.open(path, flags_noatime)
-        except PermissionError:
-            if flags_noatime == flags_normal:
-                # we do not have O_NOATIME, no need to try again:
-                raise
-            # Was this EPERM due to the O_NOATIME flag? Try again without it:
-            return os.open(path, flags_normal)
-
 
 def valid_msgpacked_dict(d, keys_serialized):
     """check if the data <d> looks like a msgpacked dict"""
@@ -1663,7 +1694,7 @@ class ArchiveRecreater:
             return item.chunks
         chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
         chunk_processor = partial(self.chunk_processor, target)
-        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
+        target.process_file_chunks(item, self.cache, target.stats, chunk_iterator, chunk_processor)
 
     def chunk_processor(self, target, data):
         chunk_id = self.key.id_hash(data)
@@ -1759,6 +1790,11 @@ class ArchiveRecreater:
         target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
         if target.recreate_rechunkify:
             logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params)
+        target.process_file_chunks = ChunksProcessor(
+            cache=self.cache, key=self.key,
+            add_item=target.add_item, write_checkpoint=target.write_checkpoint,
+            checkpoint_interval=self.checkpoint_interval).process_file_chunks
+        target.chunker = Chunker(self.key.chunk_seed, *target.chunker_params)
         return target
 
     def create_target_archive(self, name):
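
Review note: the hardlink bookkeeping moves into FilesystemObjectProcessors.create_helper unchanged, but the contextmanager pattern it relies on is easy to miss: the code after the yield runs only when the with-block exits without raising, so an item is added to the archive (and remembered as a hardlink master) only after it was processed successfully. A minimal standalone sketch of that idea, using hypothetical names (commit_on_success, archive_items) that are not part of borg:

from contextlib import contextmanager

archive_items = []

@contextmanager
def commit_on_success(item):
    yield item  # the caller fills in the item inside the with-block
    # only reached if the with-block raised no exception:
    archive_items.append(item)

with commit_on_success({'path': 'some/file'}) as item:
    item['status'] = 'A'  # simulate successful processing

assert archive_items == [{'path': 'some/file', 'status': 'A'}]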

src/borg/archiver.py (+29, -21)

@@ -33,10 +33,10 @@ import msgpack
 import borg
 from . import __version__
 from . import helpers
-from . import shellpattern
 from .algorithms.checksums import crc32
 from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special
 from .archive import BackupOSError, backup_io
+from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor
 from .cache import Cache, assert_secure
 from .constants import *  # NOQA
 from .compress import CompressionSpec
@@ -448,7 +448,7 @@ class Archiver:
         matcher = PatternMatcher(fallback=True)
         matcher.add_inclexcl(args.patterns)
 
-        def create_inner(archive, cache):
+        def create_inner(archive, cache, fso):
             # Add cache dir to inode_skip list
             skip_inodes = set()
             try:
@@ -468,7 +468,7 @@ class Archiver:
                     path = 'stdin'
                     if not dry_run:
                         try:
-                            status = archive.process_stdin(path, cache)
+                            status = fso.process_stdin(path, cache)
                         except BackupOSError as e:
                             status = 'E'
                             self.print_warning('%s: %s', path, e)
@@ -486,7 +486,7 @@ class Archiver:
                     restrict_dev = st.st_dev
                 else:
                     restrict_dev = None
-                self._process(archive, cache, matcher, args.exclude_caches, args.exclude_if_present,
+                self._process(fso, cache, matcher, args.exclude_caches, args.exclude_if_present,
                               args.keep_exclude_tags, skip_inodes, path, restrict_dev,
                               read_special=args.read_special, dry_run=dry_run, st=st)
             if not dry_run:
@@ -523,12 +523,20 @@ class Archiver:
                                   progress=args.progress,
                                   chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic,
                                   log_json=args.log_json)
-                create_inner(archive, cache)
+                metadata_collector = MetadataCollector(noatime=args.noatime, noctime=args.noctime,
+                    numeric_owner=args.numeric_owner)
+                cp = ChunksProcessor(cache=cache, key=key,
+                    add_item=archive.add_item, write_checkpoint=archive.write_checkpoint,
+                    checkpoint_interval=args.checkpoint_interval)
+                fso = FilesystemObjectProcessors(metadata_collector=metadata_collector, cache=cache, key=key,
+                    process_file_chunks=cp.process_file_chunks, add_item=archive.add_item,
+                    chunker_params=args.chunker_params)
+                create_inner(archive, cache, fso)
         else:
-            create_inner(None, None)
+            create_inner(None, None, None)
         return self.exit_code
 
-    def _process(self, archive, cache, matcher, exclude_caches, exclude_if_present,
+    def _process(self, fso, cache, matcher, exclude_caches, exclude_if_present,
                  keep_exclude_tags, skip_inodes, path, restrict_dev,
                  read_special=False, dry_run=False, st=None):
         """
@@ -566,33 +574,33 @@ class Archiver:
                     return
             if stat.S_ISREG(st.st_mode):
                 if not dry_run:
-                    status = archive.process_file(path, st, cache, self.ignore_inode)
+                    status = fso.process_file(path, st, cache, self.ignore_inode)
             elif stat.S_ISDIR(st.st_mode):
                 if recurse:
                     tag_paths = dir_is_tagged(path, exclude_caches, exclude_if_present)
                     if tag_paths:
                         if keep_exclude_tags and not dry_run:
-                            archive.process_dir(path, st)
+                            fso.process_dir(path, st)
                             for tag_path in tag_paths:
-                                self._process(archive, cache, matcher, exclude_caches, exclude_if_present,
+                                self._process(fso, cache, matcher, exclude_caches, exclude_if_present,
                                               keep_exclude_tags, skip_inodes, tag_path, restrict_dev,
                                               read_special=read_special, dry_run=dry_run)
                         return
                 if not dry_run:
                     if not recurse_excluded_dir:
-                        status = archive.process_dir(path, st)
+                        status = fso.process_dir(path, st)
                 if recurse:
                     with backup_io('scandir'):
                         entries = helpers.scandir_inorder(path)
                     for dirent in entries:
                         normpath = os.path.normpath(dirent.path)
-                        self._process(archive, cache, matcher, exclude_caches, exclude_if_present,
+                        self._process(fso, cache, matcher, exclude_caches, exclude_if_present,
                                       keep_exclude_tags, skip_inodes, normpath, restrict_dev,
                                       read_special=read_special, dry_run=dry_run)
             elif stat.S_ISLNK(st.st_mode):
                 if not dry_run:
                     if not read_special:
-                        status = archive.process_symlink(path, st)
+                        status = fso.process_symlink(path, st)
                     else:
                         try:
                             st_target = os.stat(path)
@@ -601,27 +609,27 @@ class Archiver:
                         else:
                             special = is_special(st_target.st_mode)
                         if special:
-                            status = archive.process_file(path, st_target, cache)
+                            status = fso.process_file(path, st_target, cache)
                         else:
-                            status = archive.process_symlink(path, st)
+                            status = fso.process_symlink(path, st)
             elif stat.S_ISFIFO(st.st_mode):
                 if not dry_run:
                     if not read_special:
-                        status = archive.process_fifo(path, st)
+                        status = fso.process_fifo(path, st)
                     else:
-                        status = archive.process_file(path, st, cache)
+                        status = fso.process_file(path, st, cache)
             elif stat.S_ISCHR(st.st_mode):
                 if not dry_run:
                     if not read_special:
-                        status = archive.process_dev(path, st, 'c')
+                        status = fso.process_dev(path, st, 'c')
                     else:
-                        status = archive.process_file(path, st, cache)
+                        status = fso.process_file(path, st, cache)
             elif stat.S_ISBLK(st.st_mode):
                 if not dry_run:
                     if not read_special:
-                        status = archive.process_dev(path, st, 'b')
+                        status = fso.process_dev(path, st, 'b')
                     else:
-                        status = archive.process_file(path, st, cache)
+                        status = fso.process_file(path, st, cache)
             elif stat.S_ISSOCK(st.st_mode):
                 # Ignore unix sockets
                 return
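
For orientation, the three extracted collaborators compose exactly as the create_inner hunk above wires them. A condensed sketch of that wiring (assuming a post-merge checkout; archive, cache and key stand in for the live objects that borg create builds, and the checkpoint interval and chunker params are placeholder values, not defaults this PR prescribes):

from borg.archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor

metadata_collector = MetadataCollector(noatime=False, noctime=False, numeric_owner=False)
cp = ChunksProcessor(cache=cache, key=key,
                     add_item=archive.add_item,
                     write_checkpoint=archive.write_checkpoint,
                     checkpoint_interval=300)
fso = FilesystemObjectProcessors(metadata_collector=metadata_collector, cache=cache, key=key,
                                 add_item=archive.add_item,
                                 process_file_chunks=cp.process_file_chunks,
                                 chunker_params=(19, 23, 21, 4095))
# _process() then dispatches on the stat result to
# fso.process_file / process_dir / process_symlink / process_fifo / process_dev.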