remote.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. import errno
  2. import fcntl
  3. import logging
  4. import os
  5. import select
  6. import shlex
  7. import sys
  8. import tempfile
  9. import textwrap
  10. import time
  11. from subprocess import Popen, PIPE
  12. from . import __version__
  13. from .helpers import Error, IntegrityError, sysinfo
  14. from .helpers import replace_placeholders
  15. from .helpers import BUFSIZE
  16. from .helpers import get_limited_unpacker
  17. from .repository import Repository
  18. from .logger import create_logger
  19. import msgpack
# Module-level logger; used below to log the command line of the spawned server process.
logger = create_logger(__name__)

# Protocol version returned by RepositoryServer.negotiate() and required by
# RemoteRepository.__init__(), which refuses any other version.
RPC_PROTOCOL_VERSION = 2

# Upper bound on the number of requests RemoteRepository.call_many() keeps
# outstanding (sent, response not yet consumed) before it stops sending more.
MAX_INFLIGHT = 100
  23. def os_write(fd, data):
  24. """os.write wrapper so we do not lose data for partial writes."""
  25. # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this
  26. # wrapper / workaround when this version is considered ancient.
  27. # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
  28. # and also due to its different blocking pipe behaviour compared to Linux/*BSD.
  29. # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
  30. # signal, in which case serve() would terminate.
  31. amount = remaining = len(data)
  32. while remaining:
  33. count = os.write(fd, data)
  34. remaining -= count
  35. if not remaining:
  36. break
  37. data = data[count:]
  38. time.sleep(count * 1e-09)
  39. return amount
# Raised by RemoteRepository.call_many() when a read from the server process
# returns no data (EOF on its stdout or stderr pipe).
# NOTE(review): the docstring presumably doubles as the user-visible message
# of the Error base class - confirm before rewording it.
class ConnectionClosed(Error):
    """Connection closed by remote host"""
# ConnectionClosed variant carrying one extra argument (a hint for the user);
# raised by RemoteRepository.__init__() when the initial 'negotiate' call fails.
# NOTE(review): the '{}' in the docstring suggests it is used as a format
# template for the exception arguments - confirm before rewording it.
class ConnectionClosedWithHint(ConnectionClosed):
    """Connection closed by remote host. {}"""
# Raised by RepositoryServer.open() when the requested path is not below any
# of the configured restrict_to_paths; re-raised on the client side by
# RemoteRepository.call_many() when the server reports it.
# NOTE(review): the docstring presumably doubles as the user-visible message
# of the Error base class - confirm before rewording it.
class PathNotAllowed(Error):
    """Repository path not allowed"""
# Raised by RepositoryServer.serve() when a client requests a method name that
# is not listed in RepositoryServer.rpc_methods; the argument is that name.
# NOTE(review): the '{}' in the docstring suggests it is used as a format
# template for the exception arguments - confirm before rewording it.
class InvalidRPCMethod(Error):
    """RPC method {} is not valid"""
# Raised by RepositoryServer.serve() when an unpacked message is not a 4-tuple;
# the argument passed there is the server's borg version.
# NOTE(review): the '{}' in the docstring suggests it is used as a format
# template for the exception arguments - confirm before rewording it.
class UnexpectedRPCDataFormatFromClient(Error):
    """Borg {}: Got unexpected RPC data format from client."""
  50. class UnexpectedRPCDataFormatFromServer(Error):
  51. """Got unexpected RPC data format from server:\n{}"""
  52. def __init__(self, data):
  53. try:
  54. data = data.decode()[:128]
  55. except UnicodeDecodeError:
  56. data = data[:128]
  57. data = ['%02X' % byte for byte in data]
  58. data = textwrap.fill(' '.join(data), 16 * 3)
  59. super().__init__(data)
  60. class RepositoryServer: # pragma: no cover
  61. rpc_methods = (
  62. '__len__',
  63. 'check',
  64. 'commit',
  65. 'delete',
  66. 'destroy',
  67. 'get',
  68. 'list',
  69. 'negotiate',
  70. 'open',
  71. 'put',
  72. 'rollback',
  73. 'save_key',
  74. 'load_key',
  75. 'break_lock',
  76. )
  77. def __init__(self, restrict_to_paths, append_only):
  78. self.repository = None
  79. self.restrict_to_paths = restrict_to_paths
  80. self.append_only = append_only
  81. def serve(self):
  82. stdin_fd = sys.stdin.fileno()
  83. stdout_fd = sys.stdout.fileno()
  84. stderr_fd = sys.stdout.fileno()
  85. # Make stdin non-blocking
  86. fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
  87. fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
  88. # Make stdout blocking
  89. fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL)
  90. fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  91. # Make stderr blocking
  92. fl = fcntl.fcntl(stderr_fd, fcntl.F_GETFL)
  93. fcntl.fcntl(stderr_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  94. unpacker = get_limited_unpacker('server')
  95. while True:
  96. r, w, es = select.select([stdin_fd], [], [], 10)
  97. if r:
  98. data = os.read(stdin_fd, BUFSIZE)
  99. if not data:
  100. if self.repository is not None:
  101. self.repository.close()
  102. else:
  103. os_write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n"
  104. .format(__version__).encode())
  105. return
  106. unpacker.feed(data)
  107. for unpacked in unpacker:
  108. if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
  109. if self.repository is not None:
  110. self.repository.close()
  111. raise UnexpectedRPCDataFormatFromClient(__version__)
  112. type, msgid, method, args = unpacked
  113. method = method.decode('ascii')
  114. try:
  115. if method not in self.rpc_methods:
  116. raise InvalidRPCMethod(method)
  117. try:
  118. f = getattr(self, method)
  119. except AttributeError:
  120. f = getattr(self.repository, method)
  121. res = f(*args)
  122. except BaseException as e:
  123. # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
  124. # and will be handled just like locally raised exceptions. Suppress the remote traceback
  125. # for these, except ErrorWithTraceback, which should always display a traceback.
  126. if not isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
  127. logging.exception('Borg %s: exception in RPC call:', __version__)
  128. logging.error(sysinfo())
  129. exc = "Remote Exception (see remote log for the traceback)"
  130. os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
  131. else:
  132. os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
  133. if es:
  134. self.repository.close()
  135. return
  136. def negotiate(self, versions):
  137. return RPC_PROTOCOL_VERSION
  138. def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False):
  139. path = os.fsdecode(path)
  140. if path.startswith('/~'): # /~/x = path x relative to home dir, /~username/x = relative to "user" home dir
  141. path = path[1:]
  142. elif path.startswith('/./'): # /./x = path x relative to cwd
  143. path = path[3:]
  144. path = os.path.realpath(os.path.expanduser(path))
  145. if self.restrict_to_paths:
  146. # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
  147. # for the prefix check, it is important that the compared pathes both have trailing slashes,
  148. # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
  149. path_with_sep = os.path.join(path, '') # make sure there is a trailing slash (os.sep)
  150. for restrict_to_path in self.restrict_to_paths:
  151. restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), '') # trailing slash
  152. if path_with_sep.startswith(restrict_to_path_with_sep):
  153. break
  154. else:
  155. raise PathNotAllowed(path)
  156. self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
  157. append_only=self.append_only or append_only,
  158. exclusive=exclusive)
  159. self.repository.__enter__() # clean exit handled by serve() method
  160. return self.repository.id
  161. class RemoteRepository:
  162. extra_test_args = []
  163. class RPCError(Exception):
  164. def __init__(self, name, remote_type):
  165. self.name = name
  166. self.remote_type = remote_type
  167. class NoAppendOnlyOnServer(Error):
  168. """Server does not support --append-only."""
  169. def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None):
  170. self.location = self._location = location
  171. self.preload_ids = []
  172. self.msgid = 0
  173. self.to_send = b''
  174. self.cache = {}
  175. self.ignore_responses = set()
  176. self.responses = {}
  177. self.unpacker = get_limited_unpacker('client')
  178. self.p = None
  179. testing = location.host == '__testsuite__'
  180. borg_cmd = self.borg_cmd(args, testing)
  181. env = dict(os.environ)
  182. if not testing:
  183. borg_cmd = self.ssh_cmd(location) + borg_cmd
  184. # pyinstaller binary adds LD_LIBRARY_PATH=/tmp/_ME... but we do not want
  185. # that the system's ssh binary picks up (non-matching) libraries from there
  186. env.pop('LD_LIBRARY_PATH', None)
  187. env.pop('BORG_PASSPHRASE', None) # security: do not give secrets to subprocess
  188. env['BORG_VERSION'] = __version__
  189. logger.debug('SSH command line: %s', borg_cmd)
  190. self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
  191. self.stdin_fd = self.p.stdin.fileno()
  192. self.stdout_fd = self.p.stdout.fileno()
  193. self.stderr_fd = self.p.stderr.fileno()
  194. fcntl.fcntl(self.stdin_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdin_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  195. fcntl.fcntl(self.stdout_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdout_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  196. fcntl.fcntl(self.stderr_fd, fcntl.F_SETFL, fcntl.fcntl(self.stderr_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  197. self.r_fds = [self.stdout_fd, self.stderr_fd]
  198. self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
  199. try:
  200. try:
  201. version = self.call('negotiate', RPC_PROTOCOL_VERSION)
  202. except ConnectionClosed:
  203. raise ConnectionClosedWithHint('Is borg working on the server?') from None
  204. if version != RPC_PROTOCOL_VERSION:
  205. raise Exception('Server insisted on using unsupported protocol version %d' % version)
  206. try:
  207. self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only)
  208. except self.RPCError as err:
  209. if err.remote_type != 'TypeError':
  210. raise
  211. msg = """\
  212. Please note:
  213. If you see a TypeError complaining about the number of positional arguments
  214. given to open(), you can ignore it if it comes from a borg version < 1.0.7.
  215. This TypeError is a cosmetic side effect of the compatibility code borg
  216. clients >= 1.0.7 have to support older borg servers.
  217. This problem will go away as soon as the server has been upgraded to 1.0.7+.
  218. """
  219. # emit this msg in the same way as the "Remote: ..." lines that show the remote TypeError
  220. sys.stderr.write(msg)
  221. if append_only:
  222. raise self.NoAppendOnlyOnServer()
  223. self.id = self.call('open', self.location.path, create, lock_wait, lock)
  224. except Exception:
  225. self.close()
  226. raise
  227. def __del__(self):
  228. if len(self.responses):
  229. logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),))
  230. if self.p:
  231. self.close()
  232. assert False, "cleanup happened in Repository.__del__"
  233. def __repr__(self):
  234. return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path())
  235. def __enter__(self):
  236. return self
  237. def __exit__(self, exc_type, exc_val, exc_tb):
  238. try:
  239. if exc_type is not None:
  240. self.rollback()
  241. finally:
  242. # in any case, we want to cleanly close the repo, even if the
  243. # rollback can not succeed (e.g. because the connection was
  244. # already closed) and raised another exception:
  245. self.close()
  246. def borg_cmd(self, args, testing):
  247. """return a borg serve command line"""
  248. # give some args/options to "borg serve" process as they were given to us
  249. opts = []
  250. if args is not None:
  251. opts.append('--umask=%03o' % args.umask)
  252. root_logger = logging.getLogger()
  253. if root_logger.isEnabledFor(logging.DEBUG):
  254. opts.append('--debug')
  255. elif root_logger.isEnabledFor(logging.INFO):
  256. opts.append('--info')
  257. elif root_logger.isEnabledFor(logging.WARNING):
  258. pass # warning is default
  259. elif root_logger.isEnabledFor(logging.ERROR):
  260. opts.append('--error')
  261. elif root_logger.isEnabledFor(logging.CRITICAL):
  262. opts.append('--critical')
  263. else:
  264. raise ValueError('log level missing, fix this code')
  265. if testing:
  266. return [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
  267. else: # pragma: no cover
  268. remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
  269. remote_path = replace_placeholders(remote_path)
  270. return [remote_path, 'serve'] + opts
  271. def ssh_cmd(self, location):
  272. """return a ssh command line that can be prefixed to a borg command line"""
  273. args = shlex.split(os.environ.get('BORG_RSH', 'ssh'))
  274. if location.port:
  275. args += ['-p', str(location.port)]
  276. if location.user:
  277. args.append('%s@%s' % (location.user, location.host))
  278. else:
  279. args.append('%s' % location.host)
  280. return args
  281. def call(self, cmd, *args, **kw):
  282. for resp in self.call_many(cmd, [args], **kw):
  283. return resp
  284. def call_many(self, cmd, calls, wait=True, is_preloaded=False):
  285. if not calls:
  286. return
  287. def fetch_from_cache(args):
  288. msgid = self.cache[args].pop(0)
  289. if not self.cache[args]:
  290. del self.cache[args]
  291. return msgid
  292. def handle_error(error, res):
  293. if error == b'DoesNotExist':
  294. raise Repository.DoesNotExist(self.location.orig)
  295. elif error == b'AlreadyExists':
  296. raise Repository.AlreadyExists(self.location.orig)
  297. elif error == b'CheckNeeded':
  298. raise Repository.CheckNeeded(self.location.orig)
  299. elif error == b'IntegrityError':
  300. raise IntegrityError(res)
  301. elif error == b'PathNotAllowed':
  302. raise PathNotAllowed(*res)
  303. elif error == b'ObjectNotFound':
  304. raise Repository.ObjectNotFound(res[0], self.location.orig)
  305. elif error == b'InvalidRPCMethod':
  306. raise InvalidRPCMethod(*res)
  307. else:
  308. raise self.RPCError(res.decode('utf-8'), error.decode('utf-8'))
  309. calls = list(calls)
  310. waiting_for = []
  311. while wait or calls:
  312. while waiting_for:
  313. try:
  314. error, res = self.responses.pop(waiting_for[0])
  315. waiting_for.pop(0)
  316. if error:
  317. handle_error(error, res)
  318. else:
  319. yield res
  320. if not waiting_for and not calls:
  321. return
  322. except KeyError:
  323. break
  324. if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
  325. w_fds = [self.stdin_fd]
  326. else:
  327. w_fds = []
  328. r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
  329. if x:
  330. raise Exception('FD exception occurred')
  331. for fd in r:
  332. if fd is self.stdout_fd:
  333. data = os.read(fd, BUFSIZE)
  334. if not data:
  335. raise ConnectionClosed()
  336. self.unpacker.feed(data)
  337. for unpacked in self.unpacker:
  338. if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
  339. raise UnexpectedRPCDataFormatFromServer(data)
  340. type, msgid, error, res = unpacked
  341. if msgid in self.ignore_responses:
  342. self.ignore_responses.remove(msgid)
  343. if error:
  344. handle_error(error, res)
  345. else:
  346. self.responses[msgid] = error, res
  347. elif fd is self.stderr_fd:
  348. data = os.read(fd, 32768)
  349. if not data:
  350. raise ConnectionClosed()
  351. data = data.decode('utf-8')
  352. for line in data.splitlines(keepends=True):
  353. if line.startswith('$LOG '):
  354. _, level, msg = line.split(' ', 2)
  355. level = getattr(logging, level, logging.CRITICAL) # str -> int
  356. logging.log(level, msg.rstrip())
  357. else:
  358. sys.stderr.write("Remote: " + line)
  359. if w:
  360. while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
  361. if calls:
  362. if is_preloaded:
  363. if calls[0] in self.cache:
  364. waiting_for.append(fetch_from_cache(calls.pop(0)))
  365. else:
  366. args = calls.pop(0)
  367. if cmd == 'get' and args in self.cache:
  368. waiting_for.append(fetch_from_cache(args))
  369. else:
  370. self.msgid += 1
  371. waiting_for.append(self.msgid)
  372. self.to_send = msgpack.packb((1, self.msgid, cmd, args))
  373. if not self.to_send and self.preload_ids:
  374. args = (self.preload_ids.pop(0),)
  375. self.msgid += 1
  376. self.cache.setdefault(args, []).append(self.msgid)
  377. self.to_send = msgpack.packb((1, self.msgid, cmd, args))
  378. if self.to_send:
  379. try:
  380. self.to_send = self.to_send[os.write(self.stdin_fd, self.to_send):]
  381. except OSError as e:
  382. # io.write might raise EAGAIN even though select indicates
  383. # that the fd should be writable
  384. if e.errno != errno.EAGAIN:
  385. raise
  386. self.ignore_responses |= set(waiting_for)
  387. def check(self, repair=False, save_space=False):
  388. return self.call('check', repair, save_space)
  389. def commit(self, save_space=False):
  390. return self.call('commit', save_space)
  391. def rollback(self, *args):
  392. return self.call('rollback')
  393. def destroy(self):
  394. return self.call('destroy')
  395. def __len__(self):
  396. return self.call('__len__')
  397. def list(self, limit=None, marker=None):
  398. return self.call('list', limit, marker)
  399. def get(self, id_):
  400. for resp in self.get_many([id_]):
  401. return resp
  402. def get_many(self, ids, is_preloaded=False):
  403. for resp in self.call_many('get', [(id_,) for id_ in ids], is_preloaded=is_preloaded):
  404. yield resp
  405. def put(self, id_, data, wait=True):
  406. return self.call('put', id_, data, wait=wait)
  407. def delete(self, id_, wait=True):
  408. return self.call('delete', id_, wait=wait)
  409. def save_key(self, keydata):
  410. return self.call('save_key', keydata)
  411. def load_key(self):
  412. return self.call('load_key')
  413. def break_lock(self):
  414. return self.call('break_lock')
  415. def close(self):
  416. if self.p:
  417. self.p.stdin.close()
  418. self.p.stdout.close()
  419. self.p.wait()
  420. self.p = None
  421. def preload(self, ids):
  422. self.preload_ids += ids
  423. class RepositoryNoCache:
  424. """A not caching Repository wrapper, passes through to repository.
  425. Just to have same API (including the context manager) as RepositoryCache.
  426. """
  427. def __init__(self, repository):
  428. self.repository = repository
  429. def close(self):
  430. pass
  431. def __enter__(self):
  432. return self
  433. def __exit__(self, exc_type, exc_val, exc_tb):
  434. self.close()
  435. def get(self, key):
  436. return next(self.get_many([key]))
  437. def get_many(self, keys):
  438. for data in self.repository.get_many(keys):
  439. yield data
  440. class RepositoryCache(RepositoryNoCache):
  441. """A caching Repository wrapper
  442. Caches Repository GET operations using a local temporary Repository.
  443. """
  444. # maximum object size that will be cached, 64 kiB.
  445. THRESHOLD = 2**16
  446. def __init__(self, repository):
  447. super().__init__(repository)
  448. tmppath = tempfile.mkdtemp(prefix='borg-tmp')
  449. self.caching_repo = Repository(tmppath, create=True, exclusive=True)
  450. self.caching_repo.__enter__() # handled by context manager in base class
  451. def close(self):
  452. if self.caching_repo is not None:
  453. self.caching_repo.destroy()
  454. self.caching_repo = None
  455. def get_many(self, keys):
  456. unknown_keys = [key for key in keys if key not in self.caching_repo]
  457. repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
  458. for key in keys:
  459. try:
  460. yield self.caching_repo.get(key)
  461. except Repository.ObjectNotFound:
  462. for key_, data in repository_iterator:
  463. if key_ == key:
  464. if len(data) <= self.THRESHOLD:
  465. self.caching_repo.put(key, data)
  466. yield data
  467. break
  468. # Consume any pending requests
  469. for _ in repository_iterator:
  470. pass
  471. def cache_if_remote(repository):
  472. if isinstance(repository, RemoteRepository):
  473. return RepositoryCache(repository)
  474. else:
  475. return RepositoryNoCache(repository)