remote.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. import errno
  2. import fcntl
  3. import logging
  4. import os
  5. import select
  6. import shlex
  7. from subprocess import Popen, PIPE
  8. import sys
  9. import tempfile
  10. from . import __version__
  11. from .helpers import Error, IntegrityError, sysinfo
  12. from .helpers import replace_placeholders
  13. from .repository import Repository
  14. import msgpack
# RPC protocol version: returned by RepositoryServer.negotiate() and compared
# against the server's answer in RemoteRepository.__init__().
RPC_PROTOCOL_VERSION = 2
# Maximum number of bytes requested per os.read() on the RPC data pipes (10 MiB).
BUFSIZE = 10 * 1024 * 1024
# Upper bound on the number of requests a call_many() invocation waits for
# before it stops putting further requests on the wire.
MAX_INFLIGHT = 100
# Raised by RemoteRepository.call_many() when a read from the server's stdout
# or stderr pipe returns no data.
# NOTE(review): the docstring presumably doubles as the message text of the
# Error base class - confirm in helpers.Error before rewording it.
class ConnectionClosed(Error):
    """Connection closed by remote host"""
# Raised by RemoteRepository.__init__() in place of ConnectionClosed when the
# initial 'negotiate' call fails; the single argument is a hint for the user.
# NOTE(review): the "{}" placeholder is presumably filled from the exception
# arguments by the Error base class - confirm in helpers.Error.
class ConnectionClosedWithHint(ConnectionClosed):
    """Connection closed by remote host. {}"""
# Raised by RepositoryServer.open() when the requested path is not below any
# of the configured restrict_to_paths; re-raised on the client side by
# RemoteRepository.call_many().
class PathNotAllowed(Error):
    """Repository path not allowed"""
# Raised by RepositoryServer.serve() when a client requests a method name that
# is not listed in RepositoryServer.rpc_methods.
# NOTE(review): the "{}" placeholder is presumably filled from the exception
# arguments by the Error base class - confirm in helpers.Error.
class InvalidRPCMethod(Error):
    """RPC method {} is not valid"""
# Raised by RepositoryServer.serve() (with the borg version as argument) when
# an unpacked message is not a 4-tuple.
class UnexpectedRPCDataFormatFromClient(Error):
    """Borg {}: Got unexpected RPC data format from client."""
# Raised by RemoteRepository.call_many() when an unpacked response is not a
# 4-tuple.
class UnexpectedRPCDataFormatFromServer(Error):
    """Got unexpected RPC data format from server."""
  30. class RepositoryServer: # pragma: no cover
  31. rpc_methods = (
  32. '__len__',
  33. 'check',
  34. 'commit',
  35. 'delete',
  36. 'destroy',
  37. 'get',
  38. 'list',
  39. 'negotiate',
  40. 'open',
  41. 'put',
  42. 'rollback',
  43. 'save_key',
  44. 'load_key',
  45. 'break_lock',
  46. )
  47. def __init__(self, restrict_to_paths, append_only):
  48. self.repository = None
  49. self.restrict_to_paths = restrict_to_paths
  50. self.append_only = append_only
  51. def serve(self):
  52. stdin_fd = sys.stdin.fileno()
  53. stdout_fd = sys.stdout.fileno()
  54. stderr_fd = sys.stdout.fileno()
  55. # Make stdin non-blocking
  56. fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
  57. fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
  58. # Make stdout blocking
  59. fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL)
  60. fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  61. # Make stderr blocking
  62. fl = fcntl.fcntl(stderr_fd, fcntl.F_GETFL)
  63. fcntl.fcntl(stderr_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  64. unpacker = msgpack.Unpacker(use_list=False)
  65. while True:
  66. r, w, es = select.select([stdin_fd], [], [], 10)
  67. if r:
  68. data = os.read(stdin_fd, BUFSIZE)
  69. if not data:
  70. if self.repository is not None:
  71. self.repository.close()
  72. else:
  73. os.write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n"
  74. .format(__version__).encode())
  75. return
  76. unpacker.feed(data)
  77. for unpacked in unpacker:
  78. if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
  79. if self.repository is not None:
  80. self.repository.close()
  81. raise UnexpectedRPCDataFormatFromClient(__version__)
  82. type, msgid, method, args = unpacked
  83. method = method.decode('ascii')
  84. try:
  85. if method not in self.rpc_methods:
  86. raise InvalidRPCMethod(method)
  87. try:
  88. f = getattr(self, method)
  89. except AttributeError:
  90. f = getattr(self.repository, method)
  91. res = f(*args)
  92. except BaseException as e:
  93. # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
  94. # and will be handled just like locally raised exceptions. Suppress the remote traceback
  95. # for these, except ErrorWithTraceback, which should always display a traceback.
  96. if not isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
  97. logging.exception('Borg %s: exception in RPC call:', __version__)
  98. logging.error(sysinfo())
  99. exc = "Remote Exception (see remote log for the traceback)"
  100. os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
  101. else:
  102. os.write(stdout_fd, msgpack.packb((1, msgid, None, res)))
  103. if es:
  104. self.repository.close()
  105. return
  106. def negotiate(self, versions):
  107. return RPC_PROTOCOL_VERSION
  108. def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False):
  109. path = os.fsdecode(path)
  110. if path.startswith('/~'): # /~/x = path x relative to home dir, /~username/x = relative to "user" home dir
  111. path = path[1:]
  112. elif path.startswith('/./'): # /./x = path x relative to cwd
  113. path = path[3:]
  114. path = os.path.realpath(os.path.expanduser(path))
  115. if self.restrict_to_paths:
  116. # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
  117. # for the prefix check, it is important that the compared pathes both have trailing slashes,
  118. # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
  119. path_with_sep = os.path.join(path, '') # make sure there is a trailing slash (os.sep)
  120. for restrict_to_path in self.restrict_to_paths:
  121. restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), '') # trailing slash
  122. if path_with_sep.startswith(restrict_to_path_with_sep):
  123. break
  124. else:
  125. raise PathNotAllowed(path)
  126. self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
  127. append_only=self.append_only or append_only,
  128. exclusive=exclusive)
  129. self.repository.__enter__() # clean exit handled by serve() method
  130. return self.repository.id
  131. class RemoteRepository:
  132. extra_test_args = []
  133. class RPCError(Exception):
  134. def __init__(self, name, remote_type):
  135. self.name = name
  136. self.remote_type = remote_type
  137. class NoAppendOnlyOnServer(Error):
  138. """Server does not support --append-only."""
  139. def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None):
  140. self.location = self._location = location
  141. self.preload_ids = []
  142. self.msgid = 0
  143. self.to_send = b''
  144. self.cache = {}
  145. self.ignore_responses = set()
  146. self.responses = {}
  147. self.unpacker = msgpack.Unpacker(use_list=False)
  148. self.p = None
  149. testing = location.host == '__testsuite__'
  150. borg_cmd = self.borg_cmd(args, testing)
  151. env = dict(os.environ)
  152. if not testing:
  153. borg_cmd = self.ssh_cmd(location) + borg_cmd
  154. # pyinstaller binary adds LD_LIBRARY_PATH=/tmp/_ME... but we do not want
  155. # that the system's ssh binary picks up (non-matching) libraries from there
  156. env.pop('LD_LIBRARY_PATH', None)
  157. env.pop('BORG_PASSPHRASE', None) # security: do not give secrets to subprocess
  158. env['BORG_VERSION'] = __version__
  159. self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
  160. self.stdin_fd = self.p.stdin.fileno()
  161. self.stdout_fd = self.p.stdout.fileno()
  162. self.stderr_fd = self.p.stderr.fileno()
  163. fcntl.fcntl(self.stdin_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdin_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  164. fcntl.fcntl(self.stdout_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdout_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  165. fcntl.fcntl(self.stderr_fd, fcntl.F_SETFL, fcntl.fcntl(self.stderr_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  166. self.r_fds = [self.stdout_fd, self.stderr_fd]
  167. self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
  168. try:
  169. try:
  170. version = self.call('negotiate', RPC_PROTOCOL_VERSION)
  171. except ConnectionClosed:
  172. raise ConnectionClosedWithHint('Is borg working on the server?') from None
  173. if version != RPC_PROTOCOL_VERSION:
  174. raise Exception('Server insisted on using unsupported protocol version %d' % version)
  175. try:
  176. self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only)
  177. except self.RPCError as err:
  178. if err.remote_type != 'TypeError':
  179. raise
  180. msg = """\
  181. Please note:
  182. If you see a TypeError complaining about the number of positional arguments
  183. given to open(), you can ignore it if it comes from a borg version < 1.0.7.
  184. This TypeError is a cosmetic side effect of the compatibility code borg
  185. clients >= 1.0.7 have to support older borg servers.
  186. This problem will go away as soon as the server has been upgraded to 1.0.7+.
  187. """
  188. # emit this msg in the same way as the "Remote: ..." lines that show the remote TypeError
  189. sys.stderr.write(msg)
  190. if append_only:
  191. raise self.NoAppendOnlyOnServer()
  192. self.id = self.call('open', self.location.path, create, lock_wait, lock)
  193. except Exception:
  194. self.close()
  195. raise
  196. def __del__(self):
  197. if len(self.responses):
  198. logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),))
  199. if self.p:
  200. self.close()
  201. assert False, "cleanup happened in Repository.__del__"
  202. def __repr__(self):
  203. return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path())
  204. def __enter__(self):
  205. return self
  206. def __exit__(self, exc_type, exc_val, exc_tb):
  207. try:
  208. if exc_type is not None:
  209. self.rollback()
  210. finally:
  211. # in any case, we want to cleanly close the repo, even if the
  212. # rollback can not succeed (e.g. because the connection was
  213. # already closed) and raised another exception:
  214. self.close()
  215. def borg_cmd(self, args, testing):
  216. """return a borg serve command line"""
  217. # give some args/options to "borg serve" process as they were given to us
  218. opts = []
  219. if args is not None:
  220. opts.append('--umask=%03o' % args.umask)
  221. root_logger = logging.getLogger()
  222. if root_logger.isEnabledFor(logging.DEBUG):
  223. opts.append('--debug')
  224. elif root_logger.isEnabledFor(logging.INFO):
  225. opts.append('--info')
  226. elif root_logger.isEnabledFor(logging.WARNING):
  227. pass # warning is default
  228. elif root_logger.isEnabledFor(logging.ERROR):
  229. opts.append('--error')
  230. elif root_logger.isEnabledFor(logging.CRITICAL):
  231. opts.append('--critical')
  232. else:
  233. raise ValueError('log level missing, fix this code')
  234. if testing:
  235. return [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
  236. else: # pragma: no cover
  237. remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
  238. remote_path = replace_placeholders(remote_path)
  239. return [remote_path, 'serve'] + opts
  240. def ssh_cmd(self, location):
  241. """return a ssh command line that can be prefixed to a borg command line"""
  242. args = shlex.split(os.environ.get('BORG_RSH', 'ssh'))
  243. if location.port:
  244. args += ['-p', str(location.port)]
  245. if location.user:
  246. args.append('%s@%s' % (location.user, location.host))
  247. else:
  248. args.append('%s' % location.host)
  249. return args
  250. def call(self, cmd, *args, **kw):
  251. for resp in self.call_many(cmd, [args], **kw):
  252. return resp
  253. def call_many(self, cmd, calls, wait=True, is_preloaded=False):
  254. if not calls:
  255. return
  256. def fetch_from_cache(args):
  257. msgid = self.cache[args].pop(0)
  258. if not self.cache[args]:
  259. del self.cache[args]
  260. return msgid
  261. def handle_error(error, res):
  262. if error == b'DoesNotExist':
  263. raise Repository.DoesNotExist(self.location.orig)
  264. elif error == b'AlreadyExists':
  265. raise Repository.AlreadyExists(self.location.orig)
  266. elif error == b'CheckNeeded':
  267. raise Repository.CheckNeeded(self.location.orig)
  268. elif error == b'IntegrityError':
  269. raise IntegrityError(res)
  270. elif error == b'PathNotAllowed':
  271. raise PathNotAllowed(*res)
  272. elif error == b'ObjectNotFound':
  273. raise Repository.ObjectNotFound(res[0], self.location.orig)
  274. elif error == b'InvalidRPCMethod':
  275. raise InvalidRPCMethod(*res)
  276. else:
  277. raise self.RPCError(res.decode('utf-8'), error.decode('utf-8'))
  278. calls = list(calls)
  279. waiting_for = []
  280. while wait or calls:
  281. while waiting_for:
  282. try:
  283. error, res = self.responses.pop(waiting_for[0])
  284. waiting_for.pop(0)
  285. if error:
  286. handle_error(error, res)
  287. else:
  288. yield res
  289. if not waiting_for and not calls:
  290. return
  291. except KeyError:
  292. break
  293. if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
  294. w_fds = [self.stdin_fd]
  295. else:
  296. w_fds = []
  297. r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
  298. if x:
  299. raise Exception('FD exception occurred')
  300. for fd in r:
  301. if fd is self.stdout_fd:
  302. data = os.read(fd, BUFSIZE)
  303. if not data:
  304. raise ConnectionClosed()
  305. self.unpacker.feed(data)
  306. for unpacked in self.unpacker:
  307. if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
  308. raise UnexpectedRPCDataFormatFromServer()
  309. type, msgid, error, res = unpacked
  310. if msgid in self.ignore_responses:
  311. self.ignore_responses.remove(msgid)
  312. if error:
  313. handle_error(error, res)
  314. else:
  315. self.responses[msgid] = error, res
  316. elif fd is self.stderr_fd:
  317. data = os.read(fd, 32768)
  318. if not data:
  319. raise ConnectionClosed()
  320. data = data.decode('utf-8')
  321. for line in data.splitlines(keepends=True):
  322. if line.startswith('$LOG '):
  323. _, level, msg = line.split(' ', 2)
  324. level = getattr(logging, level, logging.CRITICAL) # str -> int
  325. logging.log(level, msg.rstrip())
  326. else:
  327. sys.stderr.write("Remote: " + line)
  328. if w:
  329. while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
  330. if calls:
  331. if is_preloaded:
  332. if calls[0] in self.cache:
  333. waiting_for.append(fetch_from_cache(calls.pop(0)))
  334. else:
  335. args = calls.pop(0)
  336. if cmd == 'get' and args in self.cache:
  337. waiting_for.append(fetch_from_cache(args))
  338. else:
  339. self.msgid += 1
  340. waiting_for.append(self.msgid)
  341. self.to_send = msgpack.packb((1, self.msgid, cmd, args))
  342. if not self.to_send and self.preload_ids:
  343. args = (self.preload_ids.pop(0),)
  344. self.msgid += 1
  345. self.cache.setdefault(args, []).append(self.msgid)
  346. self.to_send = msgpack.packb((1, self.msgid, cmd, args))
  347. if self.to_send:
  348. try:
  349. self.to_send = self.to_send[os.write(self.stdin_fd, self.to_send):]
  350. except OSError as e:
  351. # io.write might raise EAGAIN even though select indicates
  352. # that the fd should be writable
  353. if e.errno != errno.EAGAIN:
  354. raise
  355. self.ignore_responses |= set(waiting_for)
  356. def check(self, repair=False, save_space=False):
  357. return self.call('check', repair, save_space)
  358. def commit(self, save_space=False):
  359. return self.call('commit', save_space)
  360. def rollback(self, *args):
  361. return self.call('rollback')
  362. def destroy(self):
  363. return self.call('destroy')
  364. def __len__(self):
  365. return self.call('__len__')
  366. def list(self, limit=None, marker=None):
  367. return self.call('list', limit, marker)
  368. def get(self, id_):
  369. for resp in self.get_many([id_]):
  370. return resp
  371. def get_many(self, ids, is_preloaded=False):
  372. for resp in self.call_many('get', [(id_,) for id_ in ids], is_preloaded=is_preloaded):
  373. yield resp
  374. def put(self, id_, data, wait=True):
  375. return self.call('put', id_, data, wait=wait)
  376. def delete(self, id_, wait=True):
  377. return self.call('delete', id_, wait=wait)
  378. def save_key(self, keydata):
  379. return self.call('save_key', keydata)
  380. def load_key(self):
  381. return self.call('load_key')
  382. def break_lock(self):
  383. return self.call('break_lock')
  384. def close(self):
  385. if self.p:
  386. self.p.stdin.close()
  387. self.p.stdout.close()
  388. self.p.wait()
  389. self.p = None
  390. def preload(self, ids):
  391. self.preload_ids += ids
  392. class RepositoryNoCache:
  393. """A not caching Repository wrapper, passes through to repository.
  394. Just to have same API (including the context manager) as RepositoryCache.
  395. """
  396. def __init__(self, repository):
  397. self.repository = repository
  398. def close(self):
  399. pass
  400. def __enter__(self):
  401. return self
  402. def __exit__(self, exc_type, exc_val, exc_tb):
  403. self.close()
  404. def get(self, key):
  405. return next(self.get_many([key]))
  406. def get_many(self, keys):
  407. for data in self.repository.get_many(keys):
  408. yield data
  409. class RepositoryCache(RepositoryNoCache):
  410. """A caching Repository wrapper
  411. Caches Repository GET operations using a local temporary Repository.
  412. """
  413. # maximum object size that will be cached, 64 kiB.
  414. THRESHOLD = 2**16
  415. def __init__(self, repository):
  416. super().__init__(repository)
  417. tmppath = tempfile.mkdtemp(prefix='borg-tmp')
  418. self.caching_repo = Repository(tmppath, create=True, exclusive=True)
  419. self.caching_repo.__enter__() # handled by context manager in base class
  420. def close(self):
  421. if self.caching_repo is not None:
  422. self.caching_repo.destroy()
  423. self.caching_repo = None
  424. def get_many(self, keys):
  425. unknown_keys = [key for key in keys if key not in self.caching_repo]
  426. repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
  427. for key in keys:
  428. try:
  429. yield self.caching_repo.get(key)
  430. except Repository.ObjectNotFound:
  431. for key_, data in repository_iterator:
  432. if key_ == key:
  433. if len(data) <= self.THRESHOLD:
  434. self.caching_repo.put(key, data)
  435. yield data
  436. break
  437. # Consume any pending requests
  438. for _ in repository_iterator:
  439. pass
  440. def cache_if_remote(repository):
  441. if isinstance(repository, RemoteRepository):
  442. return RepositoryCache(repository)
  443. else:
  444. return RepositoryNoCache(repository)