|
@@ -4,6 +4,7 @@ import socket
|
|
import stat
|
|
import stat
|
|
import sys
|
|
import sys
|
|
import time
|
|
import time
|
|
|
|
+from contextlib import contextmanager
|
|
from datetime import datetime, timezone
|
|
from datetime import datetime, timezone
|
|
from getpass import getuser
|
|
from getpass import getuser
|
|
from io import BytesIO
|
|
from io import BytesIO
|
|
@@ -97,6 +98,37 @@ class Statistics:
|
|
print(msg, file=stream or sys.stderr, end="\r", flush=True)
|
|
print(msg, file=stream or sys.stderr, end="\r", flush=True)
|
|
|
|
|
|
|
|
|
|
|
|
+class InputOSError(Exception):
|
|
|
|
+ """Wrapper for OSError raised while accessing input files."""
|
|
|
|
+ def __init__(self, os_error):
|
|
|
|
+ self.os_error = os_error
|
|
|
|
+ self.errno = os_error.errno
|
|
|
|
+ self.strerror = os_error.strerror
|
|
|
|
+ self.filename = os_error.filename
|
|
|
|
+
|
|
|
|
+ def __str__(self):
|
|
|
|
+ return str(self.os_error)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@contextmanager
|
|
|
|
+def input_io():
|
|
|
|
+ """Context manager changing OSError to InputOSError."""
|
|
|
|
+ try:
|
|
|
|
+ yield
|
|
|
|
+ except OSError as os_error:
|
|
|
|
+ raise InputOSError(os_error) from os_error
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def input_io_iter(iterator):
|
|
|
|
+ while True:
|
|
|
|
+ try:
|
|
|
|
+ with input_io():
|
|
|
|
+ item = next(iterator)
|
|
|
|
+ except StopIteration:
|
|
|
|
+ return
|
|
|
|
+ yield item
|
|
|
|
+
|
|
|
|
+
|
|
class DownloadPipeline:
|
|
class DownloadPipeline:
|
|
|
|
|
|
def __init__(self, repository, key):
|
|
def __init__(self, repository, key):
|
|
@@ -560,13 +592,15 @@ Number of files: {0.stats.nfiles}'''.format(
|
|
)
|
|
)
|
|
if self.numeric_owner:
|
|
if self.numeric_owner:
|
|
attrs['user'] = attrs['group'] = None
|
|
attrs['user'] = attrs['group'] = None
|
|
- xattrs = xattr.get_all(path, follow_symlinks=False)
|
|
|
|
|
|
+ with input_io():
|
|
|
|
+ xattrs = xattr.get_all(path, follow_symlinks=False)
|
|
if xattrs:
|
|
if xattrs:
|
|
attrs['xattrs'] = StableDict(xattrs)
|
|
attrs['xattrs'] = StableDict(xattrs)
|
|
bsdflags = get_flags(path, st)
|
|
bsdflags = get_flags(path, st)
|
|
if bsdflags:
|
|
if bsdflags:
|
|
attrs['bsdflags'] = bsdflags
|
|
attrs['bsdflags'] = bsdflags
|
|
- acl_get(path, attrs, st, self.numeric_owner)
|
|
|
|
|
|
+ with input_io():
|
|
|
|
+ acl_get(path, attrs, st, self.numeric_owner)
|
|
return attrs
|
|
return attrs
|
|
|
|
|
|
def process_dir(self, path, st):
|
|
def process_dir(self, path, st):
|
|
@@ -601,7 +635,7 @@ Number of files: {0.stats.nfiles}'''.format(
|
|
uid, gid = 0, 0
|
|
uid, gid = 0, 0
|
|
fd = sys.stdin.buffer # binary
|
|
fd = sys.stdin.buffer # binary
|
|
chunks = []
|
|
chunks = []
|
|
- for data in self.chunker.chunkify(fd):
|
|
|
|
|
|
+ for data in input_io_iter(self.chunker.chunkify(fd)):
|
|
chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
|
|
chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
|
|
self.stats.nfiles += 1
|
|
self.stats.nfiles += 1
|
|
t = int(time.time()) * 1000000000
|
|
t = int(time.time()) * 1000000000
|
|
@@ -654,10 +688,11 @@ Number of files: {0.stats.nfiles}'''.format(
|
|
if chunks is None:
|
|
if chunks is None:
|
|
compress = self.compression_decider1.decide(path)
|
|
compress = self.compression_decider1.decide(path)
|
|
logger.debug('%s -> compression %s', path, compress['name'])
|
|
logger.debug('%s -> compression %s', path, compress['name'])
|
|
- fh = Archive._open_rb(path)
|
|
|
|
|
|
+ with input_io():
|
|
|
|
+ fh = Archive._open_rb(path)
|
|
with os.fdopen(fh, 'rb') as fd:
|
|
with os.fdopen(fh, 'rb') as fd:
|
|
chunks = []
|
|
chunks = []
|
|
- for data in self.chunker.chunkify(fd, fh):
|
|
|
|
|
|
+ for data in input_io_iter(self.chunker.chunkify(fd, fh)):
|
|
chunks.append(cache.add_chunk(self.key.id_hash(data),
|
|
chunks.append(cache.add_chunk(self.key.id_hash(data),
|
|
Chunk(data, compress=compress),
|
|
Chunk(data, compress=compress),
|
|
self.stats))
|
|
self.stats))
|