Sfoglia il codice sorgente

PR #279 - Merge branch 'fadvise' into merge

Thomas Waldmann 10 anni fa
parent
commit
3653acd259
4 file modificati con 67 aggiunte e 24 eliminazioni
  1. 46 13
      attic/_chunker.c
  2. 6 7
      attic/archive.py
  3. 11 4
      attic/chunker.pyx
  4. 4 0
      attic/repository.py

+ 46 - 13
attic/_chunker.c

@@ -1,4 +1,5 @@
 #include <Python.h>
+#include <fcntl.h>
 
 /* Cyclic polynomial / buzhash: https://en.wikipedia.org/wiki/Rolling_hash */
 
@@ -78,8 +79,9 @@ typedef struct {
     int window_size, chunk_mask, min_size;
     size_t buf_size;
     uint32_t *table;
-    uint8_t *data;
+    uint8_t *data, *read_buf;
     PyObject *fd;
+    int fh;
     int done, eof;
     size_t remaining, bytes_read, bytes_yielded, position, last;
 } Chunker;
@@ -94,15 +96,17 @@ chunker_init(int window_size, int chunk_mask, int min_size, uint32_t seed)
     c->table = buzhash_init_table(seed);
     c->buf_size = 10 * 1024 * 1024;
     c->data = malloc(c->buf_size);
+    c->read_buf = malloc(c->buf_size);
     return c;
 }
 
 static void
-chunker_set_fd(Chunker *c, PyObject *fd)
+chunker_set_fd(Chunker *c, PyObject *fd, int fh)
 {
     Py_XDECREF(c->fd);
     c->fd = fd;
     Py_INCREF(fd);
+    c->fh = fh;
     c->done = 0;
     c->remaining = 0;
     c->bytes_read = 0;
@@ -118,6 +122,7 @@ chunker_free(Chunker *c)
     Py_XDECREF(c->fd);
     free(c->table);
     free(c->data);
+    free(c->read_buf);
     free(c);
 }
 
@@ -133,20 +138,48 @@ chunker_fill(Chunker *c)
     if(c->eof || n == 0) {
         return 1;
     }
-    data = PyObject_CallMethod(c->fd, "read", "i", n);
-    if(!data) {
-        return 0;
-    }
-    n = PyBytes_Size(data);
-    if(n) {
-        memcpy(c->data + c->position + c->remaining, PyBytes_AsString(data), n);
-        c->remaining += n;
-        c->bytes_read += n;
+    if(c->fh >= 0) {
+        // if we have a os-level file descriptor, use os-level API
+        n = read(c->fh, c->read_buf, n);
+        if(n > 0) {
+            memcpy(c->data + c->position + c->remaining, c->read_buf, n);
+            c->remaining += n;
+            c->bytes_read += n;
+        }
+        else
+        if(n == 0) {
+            c->eof = 1;
+        }
+        else {
+            // some error happened
+            return 0;
+        }
+        #if ( _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L )
+        // We tell the OS that we do not need the data of this file any more
+        // that it maybe has in the cache. This avoids that we spoil the
+        // complete cache with data that we only read once and (due to cache
+        // size limit) kick out data from the cache that might be still useful
+        // for the OS or other processes.
+        posix_fadvise(c->fh, (off_t) 0, (off_t) 0, POSIX_FADV_DONTNEED);
+        #endif
     }
     else {
-        c->eof = 1;
+        // no os-level file descriptor, use Python file object API
+        data = PyObject_CallMethod(c->fd, "read", "i", n);
+        if(!data) {
+            return 0;
+        }
+        n = PyBytes_Size(data);
+        if(n) {
+            memcpy(c->data + c->position + c->remaining, PyBytes_AsString(data), n);
+            c->remaining += n;
+            c->bytes_read += n;
+        }
+        else {
+            c->eof = 1;
+        }
+        Py_DECREF(data);
     }
-    Py_DECREF(data);
     return 1;
 }
 

+ 6 - 7
attic/archive.py

@@ -464,9 +464,10 @@ class Archive:
             status = 'A'  # regular file, added
         # Only chunkify the file if needed
         if chunks is None:
-            with Archive._open_rb(path, st) as fd:
+            fh = Archive._open_rb(path, st)
+            with os.fdopen(fh, 'rb') as fd:
                 chunks = []
-                for chunk in self.chunker.chunkify(fd):
+                for chunk in self.chunker.chunkify(fd, fh):
                     chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
             cache.memorize_file(path_hash, st, [c[0] for c in chunks])
             status = status or 'M'  # regular file, modified (if not 'A' already)
@@ -488,12 +489,10 @@ class Archive:
         euid = None
 
         def open_simple(p, s):
-            fd = os.open(p, flags_normal)
-            return os.fdopen(fd, 'rb')
+            return os.open(p, flags_normal)
 
         def open_noatime(p, s):
-            fd = os.open(p, flags_noatime)
-            return os.fdopen(fd, 'rb')
+            return os.open(p, flags_noatime)
 
         def open_noatime_if_owner(p, s):
             if euid == 0 or s.st_uid == euid:
@@ -515,7 +514,7 @@ class Archive:
                 # So in future, let's check whether the file is owned by us
                 # before attempting to use O_NOATIME.
                 Archive._open_rb = open_noatime_if_owner
-            return os.fdopen(fd, 'rb')
+            return fd
 
         if flags_noatime != flags_normal:
             # Always use O_NOATIME version.

+ 11 - 4
attic/chunker.pyx

@@ -9,7 +9,7 @@ cdef extern from "_chunker.c":
     ctypedef struct _Chunker "Chunker":
         pass
     _Chunker *chunker_init(int window_size, int chunk_mask, int min_size, uint32_t seed)
-    void chunker_set_fd(_Chunker *chunker, object fd)
+    void chunker_set_fd(_Chunker *chunker, object f, int fd)
     void chunker_free(_Chunker *chunker)
     object chunker_process(_Chunker *chunker)
     uint32_t *buzhash_init_table(uint32_t seed)
@@ -23,8 +23,15 @@ cdef class Chunker:
     def __cinit__(self, window_size, chunk_mask, min_size, seed):
         self.chunker = chunker_init(window_size, chunk_mask, min_size, seed & 0xffffffff)
 
-    def chunkify(self, fd):
-        chunker_set_fd(self.chunker, fd)
+    def chunkify(self, fd, fh=-1):
+        """
+        Cut a file into chunks.
+
+        :param fd: Python file object
+        :param fh: OS-level file handle (if available),
+                   defaults to -1 which means not to use OS-level fd.
+        """
+        chunker_set_fd(self.chunker, fd, fh)
         return self
 
     def __dealloc__(self):
@@ -52,4 +59,4 @@ def buzhash_update(uint32_t sum, unsigned char remove, unsigned char add, size_t
     table = buzhash_init_table(seed & 0xffffffff)
     sum = c_buzhash_update(sum, remove, add, len, table)
     free(table)
-    return sum
+    return sum

+ 4 - 0
attic/repository.py

@@ -555,6 +555,10 @@ class LoggedIO:
         header = self.header_no_crc_fmt.pack(size, TAG_PUT)
         crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
         fd.write(b''.join((crc, header, id, data)))
+        if hasattr(os, 'posix_fadvise'):  # python >= 3.3, only on UNIX
+            # tell the OS that it does not need to cache what we just wrote,
+            # avoids spoiling the cache for the OS and other processes.
+            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
         self.offset += size
         return self.segment, offset