Pārlūkot izejas kodu

Merge pull request #985 from enkore/feature/fastlio

Improved LoggedIO write performance
enkore 9 gadi atpakaļ
vecāks
revīzija
be555d423e

+ 8 - 2
borg/constants.py

@@ -12,8 +12,14 @@ UMASK_DEFAULT = 0o077
 CACHE_TAG_NAME = 'CACHEDIR.TAG'
 CACHE_TAG_NAME = 'CACHEDIR.TAG'
 CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55'
 CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55'
 
 
-DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
-DEFAULT_SEGMENTS_PER_DIR = 10000
+# A large, but not unreasonably large segment size. Always less than 2 GiB (for legacy file systems). We choose
+# 500 MiB which means that no indirection from the inode is needed for typical Linux file systems.
+# Note that this is a soft-limit and can be exceeded (worst case) by a full maximum chunk size and some metadata
+# bytes. That's why it's 500 MiB instead of 512 MiB.
+DEFAULT_MAX_SEGMENT_SIZE = 500 * 1024 * 1024
+
+# A few hundred files per directory to go easy on filesystems which don't like too many files per dir (NTFS)
+DEFAULT_SEGMENTS_PER_DIR = 500
 
 
 CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
 CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
 CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
 CHUNK_MAX_EXP = 23  # 2**23 == 8MiB

+ 1 - 1
borg/helpers.py

@@ -82,7 +82,7 @@ def check_extension_modules():
         raise ExtensionModuleError
         raise ExtensionModuleError
     if crypto.API_VERSION != 3:
     if crypto.API_VERSION != 3:
         raise ExtensionModuleError
         raise ExtensionModuleError
-    if platform.API_VERSION != 2:
+    if platform.API_VERSION != 3:
         raise ExtensionModuleError
         raise ExtensionModuleError
 
 
 
 

+ 3 - 9
borg/platform.py

@@ -1,16 +1,10 @@
 import sys
 import sys
 
 
+from .platform_base import acl_get, acl_set, SyncFile, sync_dir, API_VERSION
+
 if sys.platform.startswith('linux'):  # pragma: linux only
 if sys.platform.startswith('linux'):  # pragma: linux only
-    from .platform_linux import acl_get, acl_set, API_VERSION
+    from .platform_linux import acl_get, acl_set, SyncFile, API_VERSION
 elif sys.platform.startswith('freebsd'):  # pragma: freebsd only
 elif sys.platform.startswith('freebsd'):  # pragma: freebsd only
     from .platform_freebsd import acl_get, acl_set, API_VERSION
     from .platform_freebsd import acl_get, acl_set, API_VERSION
 elif sys.platform == 'darwin':  # pragma: darwin only
 elif sys.platform == 'darwin':  # pragma: darwin only
     from .platform_darwin import acl_get, acl_set, API_VERSION
     from .platform_darwin import acl_get, acl_set, API_VERSION
-else:  # pragma: unknown platform only
-    API_VERSION = 2
-
-    def acl_get(path, item, st, numeric_owner=False):
-        pass
-
-    def acl_set(path, item, numeric_owner=False):
-        pass

+ 78 - 0
borg/platform_base.py

@@ -0,0 +1,78 @@
+import os
+
+API_VERSION = 3
+
+fdatasync = getattr(os, 'fdatasync', os.fsync)
+
+
+def acl_get(path, item, st, numeric_owner=False):
+    """
+    Saves ACL Entries
+
+    If `numeric_owner` is True the user/group field is not preserved, only uid/gid
+    """
+
+
+def acl_set(path, item, numeric_owner=False):
+    """
+    Restore ACL Entries
+
+    If `numeric_owner` is True the stored uid/gid is used instead
+    of the user/group names
+    """
+
+
+def sync_dir(path):
+    fd = os.open(path, os.O_RDONLY)
+    try:
+        os.fsync(fd)
+    finally:
+        os.close(fd)
+
+
+class SyncFile:
+    """
+    A file class that is supposed to enable write ordering (one way or another) and data durability after close().
+
+    The degree to which either is possible varies with operating system, file system and hardware.
+
+    This fallback implements a naive and slow way of doing this. On some operating systems it can't actually
+    guarantee any of the above, since fsync() doesn't guarantee it. Furthermore it may not be possible at all
+    to satisfy the above guarantees on some hardware or operating systems. In these cases we hope that the thorough
+    checksumming implemented catches any corrupted data due to misordered, delayed or partial writes.
+
+    Note that POSIX doesn't specify *anything* about power failures (or similar failures). A system that
+    routinely loses files or corrupts files on power loss is POSIX compliant.
+
+    TODO: Use F_FULLSYNC on OSX.
+    TODO: A Windows implementation should use CreateFile with FILE_FLAG_WRITE_THROUGH.
+    """
+
+    def __init__(self, path):
+        self.fd = open(path, 'wb')
+        self.fileno = self.fd.fileno()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def write(self, data):
+        self.fd.write(data)
+
+    def sync(self):
+        """
+        Synchronize file contents. Everything written prior to sync() must become durable before anything written
+        after sync().
+        """
+        self.fd.flush()
+        fdatasync(self.fileno)
+        if hasattr(os, 'posix_fadvise'):
+            os.posix_fadvise(self.fileno, 0, 0, os.POSIX_FADV_DONTNEED)
+
+    def close(self):
+        """sync() and close."""
+        self.sync()
+        self.fd.close()
+        sync_dir(os.path.dirname(self.fd.name))

+ 1 - 1
borg/platform_darwin.pyx

@@ -1,7 +1,7 @@
 import os
 import os
 from .helpers import user2uid, group2gid, safe_decode, safe_encode
 from .helpers import user2uid, group2gid, safe_decode, safe_encode
 
 
-API_VERSION = 2
+API_VERSION = 3
 
 
 cdef extern from "sys/acl.h":
 cdef extern from "sys/acl.h":
     ctypedef struct _acl_t:
     ctypedef struct _acl_t:

+ 1 - 1
borg/platform_freebsd.pyx

@@ -1,7 +1,7 @@
 import os
 import os
 from .helpers import posix_acl_use_stored_uid_gid, safe_encode, safe_decode
 from .helpers import posix_acl_use_stored_uid_gid, safe_encode, safe_decode
 
 
-API_VERSION = 2
+API_VERSION = 3
 
 
 cdef extern from "errno.h":
 cdef extern from "errno.h":
     int errno
     int errno

+ 53 - 10
borg/platform_linux.pyx

@@ -1,13 +1,17 @@
 import os
 import os
 import re
 import re
+import resource
 from stat import S_ISLNK
 from stat import S_ISLNK
 from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode
 from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode
+from .platform_base import SyncFile as BaseSyncFile
+from libc cimport errno
 
 
-API_VERSION = 2
+API_VERSION = 3
 
 
 cdef extern from "sys/types.h":
 cdef extern from "sys/types.h":
     int ACL_TYPE_ACCESS
     int ACL_TYPE_ACCESS
     int ACL_TYPE_DEFAULT
     int ACL_TYPE_DEFAULT
+    ctypedef off64_t
 
 
 cdef extern from "sys/acl.h":
 cdef extern from "sys/acl.h":
     ctypedef struct _acl_t:
     ctypedef struct _acl_t:
@@ -23,6 +27,12 @@ cdef extern from "sys/acl.h":
 cdef extern from "acl/libacl.h":
 cdef extern from "acl/libacl.h":
     int acl_extended_file(const char *path)
     int acl_extended_file(const char *path)
 
 
+cdef extern from "fcntl.h":
+    int sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags)
+    unsigned int SYNC_FILE_RANGE_WRITE
+    unsigned int SYNC_FILE_RANGE_WAIT_BEFORE
+    unsigned int SYNC_FILE_RANGE_WAIT_AFTER
+
 
 
 _comment_re = re.compile(' *#.*', re.M)
 _comment_re = re.compile(' *#.*', re.M)
 
 
@@ -77,10 +87,6 @@ cdef acl_numeric_ids(acl):
 
 
 
 
 def acl_get(path, item, st, numeric_owner=False):
 def acl_get(path, item, st, numeric_owner=False):
-    """Saves ACL Entries
-
-    If `numeric_owner` is True the user/group field is not preserved only uid/gid
-    """
     cdef acl_t default_acl = NULL
     cdef acl_t default_acl = NULL
     cdef acl_t access_acl = NULL
     cdef acl_t access_acl = NULL
     cdef char *default_text = NULL
     cdef char *default_text = NULL
@@ -112,11 +118,6 @@ def acl_get(path, item, st, numeric_owner=False):
 
 
 
 
 def acl_set(path, item, numeric_owner=False):
 def acl_set(path, item, numeric_owner=False):
-    """Restore ACL Entries
-
-    If `numeric_owner` is True the stored uid/gid is used instead
-    of the user/group names
-    """
     cdef acl_t access_acl = NULL
     cdef acl_t access_acl = NULL
     cdef acl_t default_acl = NULL
     cdef acl_t default_acl = NULL
 
 
@@ -141,3 +142,45 @@ def acl_set(path, item, numeric_owner=False):
                 acl_set_file(p, ACL_TYPE_DEFAULT, default_acl)
                 acl_set_file(p, ACL_TYPE_DEFAULT, default_acl)
         finally:
         finally:
             acl_free(default_acl)
             acl_free(default_acl)
+
+cdef _sync_file_range(fd, offset, length, flags):
+    assert offset & PAGE_MASK == 0, "offset %d not page-aligned" % offset
+    assert length & PAGE_MASK == 0, "length %d not page-aligned" % length
+    if sync_file_range(fd, offset, length, flags) != 0:
+        raise OSError(errno, os.strerror(errno))
+    os.posix_fadvise(fd, offset, length, os.POSIX_FADV_DONTNEED)
+
+cdef unsigned PAGE_MASK = resource.getpagesize() - 1
+
+
+class SyncFile(BaseSyncFile):
+    """
+    Implemented using sync_file_range for asynchronous write-out and fdatasync for actual durability.
+
+    "write-out" means that dirty pages (= data that was written) are submitted to an I/O queue and will be sent to
+    disk in the immediate future.
+    """
+
+    def __init__(self, path):
+        super().__init__(path)
+        self.offset = 0
+        self.write_window = (16 * 1024 ** 2) & ~PAGE_MASK
+        self.last_sync = 0
+        self.pending_sync = None
+
+    def write(self, data):
+        self.offset += self.fd.write(data)
+        offset = self.offset & ~PAGE_MASK
+        if offset >= self.last_sync + self.write_window:
+            self.fd.flush()
+            _sync_file_range(self.fileno, self.last_sync, offset - self.last_sync, SYNC_FILE_RANGE_WRITE)
+            if self.pending_sync is not None:
+                _sync_file_range(self.fileno, self.pending_sync, self.last_sync - self.pending_sync,
+                                 SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WAIT_AFTER)
+            self.pending_sync = self.last_sync
+            self.last_sync = offset
+
+    def sync(self):
+        self.fd.flush()
+        os.fdatasync(self.fileno)
+        os.posix_fadvise(self.fileno, 0, 0, os.POSIX_FADV_DONTNEED)

+ 20 - 18
borg/repository.py

@@ -17,6 +17,7 @@ from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, Progre
 from .hashindex import NSIndex
 from .hashindex import NSIndex
 from .locking import UpgradableLock, LockError, LockErrorT
 from .locking import UpgradableLock, LockError, LockErrorT
 from .lrucache import LRUCache
 from .lrucache import LRUCache
+from .platform import SyncFile, sync_dir
 
 
 MAX_OBJECT_SIZE = 20 * 1024 * 1024
 MAX_OBJECT_SIZE = 20 * 1024 * 1024
 MAGIC = b'BORG_SEG'
 MAGIC = b'BORG_SEG'
@@ -32,7 +33,7 @@ class Repository:
     On disk layout:
     On disk layout:
     dir/README
     dir/README
     dir/config
     dir/config
-    dir/data/<X / SEGMENTS_PER_DIR>/<X>
+    dir/data/<X // SEGMENTS_PER_DIR>/<X>
     dir/index.X
     dir/index.X
     dir/hints.X
     dir/hints.X
     """
     """
@@ -507,7 +508,7 @@ class LoggedIO:
     def __init__(self, path, limit, segments_per_dir, capacity=90):
     def __init__(self, path, limit, segments_per_dir, capacity=90):
         self.path = path
         self.path = path
         self.fds = LRUCache(capacity,
         self.fds = LRUCache(capacity,
-                            dispose=lambda fd: fd.close())
+                            dispose=self.close_fd)
         self.segment = 0
         self.segment = 0
         self.limit = limit
         self.limit = limit
         self.segments_per_dir = segments_per_dir
         self.segments_per_dir = segments_per_dir
@@ -519,6 +520,11 @@ class LoggedIO:
         self.fds.clear()
         self.fds.clear()
         self.fds = None  # Just to make sure we're disabled
         self.fds = None  # Just to make sure we're disabled
 
 
+    def close_fd(self, fd):
+        if hasattr(os, 'posix_fadvise'):  # only on UNIX
+            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
+        fd.close()
+
     def segment_iterator(self, reverse=False):
     def segment_iterator(self, reverse=False):
         data_path = os.path.join(self.path, 'data')
         data_path = os.path.join(self.path, 'data')
         dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
         dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
@@ -535,7 +541,7 @@ class LoggedIO:
         return None
         return None
 
 
     def get_segments_transaction_id(self):
     def get_segments_transaction_id(self):
-        """Verify that the transaction id is consistent with the index transaction id
+        """Return the last committed segment.
         """
         """
         for segment, filename in self.segment_iterator(reverse=True):
         for segment, filename in self.segment_iterator(reverse=True):
             if self.is_committed_segment(filename):
             if self.is_committed_segment(filename):
@@ -578,7 +584,8 @@ class LoggedIO:
                 dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                 dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                 if not os.path.exists(dirname):
                 if not os.path.exists(dirname):
                     os.mkdir(dirname)
                     os.mkdir(dirname)
-            self._write_fd = open(self.segment_filename(self.segment), 'ab')
+                    sync_dir(os.path.join(self.path, 'data'))
+            self._write_fd = SyncFile(self.segment_filename(self.segment))
             self._write_fd.write(MAGIC)
             self._write_fd.write(MAGIC)
             self.offset = MAGIC_LEN
             self.offset = MAGIC_LEN
         return self._write_fd
         return self._write_fd
@@ -591,6 +598,13 @@ class LoggedIO:
             self.fds[segment] = fd
             self.fds[segment] = fd
             return fd
             return fd
 
 
+    def close_segment(self):
+        if self._write_fd:
+            self.segment += 1
+            self.offset = 0
+            self._write_fd.close()
+            self._write_fd = None
+
     def delete_segment(self, segment):
     def delete_segment(self, segment):
         if segment in self.fds:
         if segment in self.fds:
             del self.fds[segment]
             del self.fds[segment]
@@ -641,7 +655,7 @@ class LoggedIO:
 
 
     def read(self, segment, offset, id):
     def read(self, segment, offset, id):
         if segment == self.segment and self._write_fd:
         if segment == self.segment and self._write_fd:
-            self._write_fd.flush()
+            self._write_fd.sync()
         fd = self.get_fd(segment)
         fd = self.get_fd(segment)
         fd.seek(offset)
         fd.seek(offset)
         header = fd.read(self.put_header_fmt.size)
         header = fd.read(self.put_header_fmt.size)
@@ -703,20 +717,8 @@ class LoggedIO:
 
 
     def write_commit(self):
     def write_commit(self):
         fd = self.get_write_fd(no_new=True)
         fd = self.get_write_fd(no_new=True)
+        fd.sync()
         header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
         header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
         crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
         crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
         fd.write(b''.join((crc, header)))
         fd.write(b''.join((crc, header)))
         self.close_segment()
         self.close_segment()
-
-    def close_segment(self):
-        if self._write_fd:
-            self.segment += 1
-            self.offset = 0
-            self._write_fd.flush()
-            os.fsync(self._write_fd.fileno())
-            if hasattr(os, 'posix_fadvise'):  # only on UNIX
-                # tell the OS that it does not need to cache what we just wrote,
-                # avoids spoiling the cache for the OS and other processes.
-                os.posix_fadvise(self._write_fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
-            self._write_fd.close()
-            self._write_fd = None

+ 16 - 0
docs/usage.rst

@@ -170,6 +170,22 @@ Network:
 
 
 In case you are interested in more details, please read the internals documentation.
 In case you are interested in more details, please read the internals documentation.
 
 
+File systems
+~~~~~~~~~~~~
+
+We strongly recommend against using Borg (or any other database-like
+software) on non-journaling file systems like FAT, since it is not
+possible to assume any consistency in case of power failures (or a
+sudden disconnect of an external drive or similar failures).
+
+While Borg uses a data store that is resilient against these failures
+when used on journaling file systems, it is not possible to guarantee
+this with some hardware -- independent of the software used. We don't
+know a list of affected hardware.
+
+If you are unsure whether your Borg repository is still consistent
+and readable after one of the failures mentioned above occurred, run
+``borg check --verify-data`` to make sure it is consistent.
 
 
 Units
 Units
 ~~~~~
 ~~~~~