remote.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. import errno
  2. import fcntl
  3. import logging
  4. import os
  5. import select
  6. import shlex
  7. import sys
  8. import tempfile
  9. import textwrap
  10. import time
  11. from subprocess import Popen, PIPE
  12. from . import __version__
  13. from .helpers import Error, IntegrityError, sysinfo
  14. from .helpers import replace_placeholders
  15. from .helpers import bin_to_hex
  16. from .repository import Repository
  17. from .logger import create_logger
  18. import msgpack
# Module-level logger obtained from the project's logger helper.
logger = create_logger(__name__)

# Protocol version exchanged by the 'negotiate' RPC: RepositoryServer.negotiate() returns it
# and RemoteRepository refuses to continue when the returned value differs from its own.
RPC_PROTOCOL_VERSION = 2

# Maximum number of bytes requested per os.read() on the RPC data pipes (10 MiB).
BUFSIZE = 10 * 1024 * 1024

# Upper bound on how many requests RemoteRepository.call_many() keeps outstanding
# (queued but not yet answered) before it stops queueing more.
MAX_INFLIGHT = 100
  23. def os_write(fd, data):
  24. """os.write wrapper so we do not lose data for partial writes."""
  25. # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
  26. # and also due to its different blocking pipe behaviour compared to Linux/*BSD.
  27. # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
  28. # signal, in which case serve() would terminate.
  29. amount = remaining = len(data)
  30. while remaining:
  31. count = os.write(fd, data)
  32. remaining -= count
  33. if not remaining:
  34. break
  35. data = data[count:]
  36. time.sleep(count * 1e-09)
  37. return amount
# Raised by RemoteRepository.call_many() when a read from the server's stdout or stderr
# pipe returns no data.
# NOTE(review): the docstring presumably doubles as the user-facing message template of
# helpers.Error -- confirm before rewording it.
class ConnectionClosed(Error):
    """Connection closed by remote host"""
# ConnectionClosed variant carrying a hint; RemoteRepository.__init__ raises it with
# 'Is borg working on the server?' when the initial 'negotiate' call loses the connection.
# NOTE(review): the '{}' in the docstring is presumably filled with the hint by
# helpers.Error -- confirm before rewording it.
class ConnectionClosedWithHint(ConnectionClosed):
    """Connection closed by remote host. {}"""
# Raised by RepositoryServer.open() when the resolved repository path is not in or below
# any of the configured restrict_to_paths; re-raised client-side by call_many().
# NOTE(review): the docstring presumably doubles as the user-facing message template of
# helpers.Error -- confirm before rewording it.
class PathNotAllowed(Error):
    """Repository path not allowed"""
# Raised by RepositoryServer.serve() when the requested method name is not listed in
# RepositoryServer.rpc_methods; re-raised client-side by call_many().
# NOTE(review): the '{}' in the docstring is presumably filled with the method name by
# helpers.Error -- confirm before rewording it.
class InvalidRPCMethod(Error):
    """RPC method {} is not valid"""
# Raised by RepositoryServer.serve() when an unpacked message is not a 4-tuple; it is
# constructed with the borg version string.
# NOTE(review): the '{}' in the docstring is presumably filled with that version by
# helpers.Error -- confirm before rewording it.
class UnexpectedRPCDataFormatFromClient(Error):
    """Borg {}: Got unexpected RPC data format from client."""
  48. class UnexpectedRPCDataFormatFromServer(Error):
  49. """Got unexpected RPC data format from server:\n{}"""
  50. def __init__(self, data):
  51. try:
  52. data = data.decode()[:128]
  53. except UnicodeDecodeError:
  54. data = data[:128]
  55. data = ['%02X' % byte for byte in data]
  56. data = textwrap.fill(' '.join(data), 16 * 3)
  57. super().__init__(data)
  58. class RepositoryServer: # pragma: no cover
  59. rpc_methods = (
  60. '__len__',
  61. 'check',
  62. 'commit',
  63. 'delete',
  64. 'destroy',
  65. 'get',
  66. 'list',
  67. 'negotiate',
  68. 'open',
  69. 'put',
  70. 'rollback',
  71. 'save_key',
  72. 'load_key',
  73. 'break_lock',
  74. )
  75. def __init__(self, restrict_to_paths, append_only):
  76. self.repository = None
  77. self.restrict_to_paths = restrict_to_paths
  78. self.append_only = append_only
  79. def serve(self):
  80. stdin_fd = sys.stdin.fileno()
  81. stdout_fd = sys.stdout.fileno()
  82. stderr_fd = sys.stdout.fileno()
  83. # Make stdin non-blocking
  84. fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
  85. fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
  86. # Make stdout blocking
  87. fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL)
  88. fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  89. # Make stderr blocking
  90. fl = fcntl.fcntl(stderr_fd, fcntl.F_GETFL)
  91. fcntl.fcntl(stderr_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
  92. unpacker = msgpack.Unpacker(use_list=False)
  93. while True:
  94. r, w, es = select.select([stdin_fd], [], [], 10)
  95. if r:
  96. data = os.read(stdin_fd, BUFSIZE)
  97. if not data:
  98. if self.repository is not None:
  99. self.repository.close()
  100. else:
  101. os_write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n"
  102. .format(__version__).encode())
  103. return
  104. unpacker.feed(data)
  105. for unpacked in unpacker:
  106. if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
  107. if self.repository is not None:
  108. self.repository.close()
  109. raise UnexpectedRPCDataFormatFromClient(__version__)
  110. type, msgid, method, args = unpacked
  111. method = method.decode('ascii')
  112. try:
  113. if method not in self.rpc_methods:
  114. raise InvalidRPCMethod(method)
  115. try:
  116. f = getattr(self, method)
  117. except AttributeError:
  118. f = getattr(self.repository, method)
  119. res = f(*args)
  120. except BaseException as e:
  121. # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
  122. # and will be handled just like locally raised exceptions. Suppress the remote traceback
  123. # for these, except ErrorWithTraceback, which should always display a traceback.
  124. if not isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
  125. logging.exception('Borg %s: exception in RPC call:', __version__)
  126. logging.error(sysinfo())
  127. exc = "Remote Exception (see remote log for the traceback)"
  128. os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
  129. else:
  130. os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
  131. if es:
  132. self.repository.close()
  133. return
  134. def negotiate(self, versions):
  135. return RPC_PROTOCOL_VERSION
  136. def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False):
  137. path = os.fsdecode(path)
  138. if path.startswith('/~'): # /~/x = path x relative to home dir, /~username/x = relative to "user" home dir
  139. path = path[1:]
  140. elif path.startswith('/./'): # /./x = path x relative to cwd
  141. path = path[3:]
  142. path = os.path.realpath(os.path.expanduser(path))
  143. if self.restrict_to_paths:
  144. # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
  145. # for the prefix check, it is important that the compared pathes both have trailing slashes,
  146. # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
  147. path_with_sep = os.path.join(path, '') # make sure there is a trailing slash (os.sep)
  148. for restrict_to_path in self.restrict_to_paths:
  149. restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), '') # trailing slash
  150. if path_with_sep.startswith(restrict_to_path_with_sep):
  151. break
  152. else:
  153. raise PathNotAllowed(path)
  154. self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
  155. append_only=self.append_only or append_only,
  156. exclusive=exclusive)
  157. self.repository.__enter__() # clean exit handled by serve() method
  158. return self.repository.id
  159. class RemoteRepository:
  160. extra_test_args = []
  161. class RPCError(Exception):
  162. def __init__(self, name, remote_type):
  163. self.name = name
  164. self.remote_type = remote_type
  165. class NoAppendOnlyOnServer(Error):
  166. """Server does not support --append-only."""
  167. def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None):
  168. self.location = self._location = location
  169. self.preload_ids = []
  170. self.msgid = 0
  171. self.to_send = b''
  172. self.cache = {}
  173. self.ignore_responses = set()
  174. self.responses = {}
  175. self.unpacker = msgpack.Unpacker(use_list=False)
  176. self.p = None
  177. testing = location.host == '__testsuite__'
  178. borg_cmd = self.borg_cmd(args, testing)
  179. env = dict(os.environ)
  180. if not testing:
  181. borg_cmd = self.ssh_cmd(location) + borg_cmd
  182. # pyinstaller binary adds LD_LIBRARY_PATH=/tmp/_ME... but we do not want
  183. # that the system's ssh binary picks up (non-matching) libraries from there
  184. env.pop('LD_LIBRARY_PATH', None)
  185. env.pop('BORG_PASSPHRASE', None) # security: do not give secrets to subprocess
  186. env['BORG_VERSION'] = __version__
  187. logger.debug('SSH command line: %s', borg_cmd)
  188. self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
  189. self.stdin_fd = self.p.stdin.fileno()
  190. self.stdout_fd = self.p.stdout.fileno()
  191. self.stderr_fd = self.p.stderr.fileno()
  192. fcntl.fcntl(self.stdin_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdin_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  193. fcntl.fcntl(self.stdout_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdout_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  194. fcntl.fcntl(self.stderr_fd, fcntl.F_SETFL, fcntl.fcntl(self.stderr_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  195. self.r_fds = [self.stdout_fd, self.stderr_fd]
  196. self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
  197. try:
  198. try:
  199. version = self.call('negotiate', RPC_PROTOCOL_VERSION)
  200. except ConnectionClosed:
  201. raise ConnectionClosedWithHint('Is borg working on the server?') from None
  202. if version != RPC_PROTOCOL_VERSION:
  203. raise Exception('Server insisted on using unsupported protocol version %d' % version)
  204. try:
  205. self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only)
  206. except self.RPCError as err:
  207. if err.remote_type != 'TypeError':
  208. raise
  209. msg = """\
  210. Please note:
  211. If you see a TypeError complaining about the number of positional arguments
  212. given to open(), you can ignore it if it comes from a borg version < 1.0.7.
  213. This TypeError is a cosmetic side effect of the compatibility code borg
  214. clients >= 1.0.7 have to support older borg servers.
  215. This problem will go away as soon as the server has been upgraded to 1.0.7+.
  216. """
  217. # emit this msg in the same way as the "Remote: ..." lines that show the remote TypeError
  218. sys.stderr.write(msg)
  219. if append_only:
  220. raise self.NoAppendOnlyOnServer()
  221. self.id = self.call('open', self.location.path, create, lock_wait, lock)
  222. except Exception:
  223. self.close()
  224. raise
  225. def __del__(self):
  226. if len(self.responses):
  227. logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),))
  228. if self.p:
  229. self.close()
  230. assert False, "cleanup happened in Repository.__del__"
  231. def __repr__(self):
  232. return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path())
  233. def __enter__(self):
  234. return self
  235. def __exit__(self, exc_type, exc_val, exc_tb):
  236. try:
  237. if exc_type is not None:
  238. self.rollback()
  239. finally:
  240. # in any case, we want to cleanly close the repo, even if the
  241. # rollback can not succeed (e.g. because the connection was
  242. # already closed) and raised another exception:
  243. self.close()
  244. def borg_cmd(self, args, testing):
  245. """return a borg serve command line"""
  246. # give some args/options to "borg serve" process as they were given to us
  247. opts = []
  248. if args is not None:
  249. opts.append('--umask=%03o' % args.umask)
  250. root_logger = logging.getLogger()
  251. if root_logger.isEnabledFor(logging.DEBUG):
  252. opts.append('--debug')
  253. elif root_logger.isEnabledFor(logging.INFO):
  254. opts.append('--info')
  255. elif root_logger.isEnabledFor(logging.WARNING):
  256. pass # warning is default
  257. elif root_logger.isEnabledFor(logging.ERROR):
  258. opts.append('--error')
  259. elif root_logger.isEnabledFor(logging.CRITICAL):
  260. opts.append('--critical')
  261. else:
  262. raise ValueError('log level missing, fix this code')
  263. if testing:
  264. return [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
  265. else: # pragma: no cover
  266. remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
  267. remote_path = replace_placeholders(remote_path)
  268. return [remote_path, 'serve'] + opts
  269. def ssh_cmd(self, location):
  270. """return a ssh command line that can be prefixed to a borg command line"""
  271. args = shlex.split(os.environ.get('BORG_RSH', 'ssh'))
  272. if location.port:
  273. args += ['-p', str(location.port)]
  274. if location.user:
  275. args.append('%s@%s' % (location.user, location.host))
  276. else:
  277. args.append('%s' % location.host)
  278. return args
  279. def call(self, cmd, *args, **kw):
  280. for resp in self.call_many(cmd, [args], **kw):
  281. return resp
  282. def call_many(self, cmd, calls, wait=True, is_preloaded=False):
  283. if not calls:
  284. return
  285. def fetch_from_cache(args):
  286. msgid = self.cache[args].pop(0)
  287. if not self.cache[args]:
  288. del self.cache[args]
  289. return msgid
  290. def handle_error(error, res):
  291. if error == b'DoesNotExist':
  292. raise Repository.DoesNotExist(self.location.orig)
  293. elif error == b'AlreadyExists':
  294. raise Repository.AlreadyExists(self.location.orig)
  295. elif error == b'CheckNeeded':
  296. raise Repository.CheckNeeded(self.location.orig)
  297. elif error == b'IntegrityError':
  298. raise IntegrityError(res)
  299. elif error == b'PathNotAllowed':
  300. raise PathNotAllowed(*res)
  301. elif error == b'ObjectNotFound':
  302. raise Repository.ObjectNotFound(res[0], self.location.orig)
  303. elif error == b'InvalidRPCMethod':
  304. raise InvalidRPCMethod(*res)
  305. else:
  306. raise self.RPCError(res.decode('utf-8'), error.decode('utf-8'))
  307. calls = list(calls)
  308. waiting_for = []
  309. while wait or calls:
  310. while waiting_for:
  311. try:
  312. error, res = self.responses.pop(waiting_for[0])
  313. waiting_for.pop(0)
  314. if error:
  315. handle_error(error, res)
  316. else:
  317. yield res
  318. if not waiting_for and not calls:
  319. return
  320. except KeyError:
  321. break
  322. if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
  323. w_fds = [self.stdin_fd]
  324. else:
  325. w_fds = []
  326. r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
  327. if x:
  328. raise Exception('FD exception occurred')
  329. for fd in r:
  330. if fd is self.stdout_fd:
  331. data = os.read(fd, BUFSIZE)
  332. if not data:
  333. raise ConnectionClosed()
  334. self.unpacker.feed(data)
  335. for unpacked in self.unpacker:
  336. if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
  337. raise UnexpectedRPCDataFormatFromServer(data)
  338. type, msgid, error, res = unpacked
  339. if msgid in self.ignore_responses:
  340. self.ignore_responses.remove(msgid)
  341. if error:
  342. handle_error(error, res)
  343. else:
  344. self.responses[msgid] = error, res
  345. elif fd is self.stderr_fd:
  346. data = os.read(fd, 32768)
  347. if not data:
  348. raise ConnectionClosed()
  349. data = data.decode('utf-8')
  350. for line in data.splitlines(keepends=True):
  351. if line.startswith('$LOG '):
  352. _, level, msg = line.split(' ', 2)
  353. level = getattr(logging, level, logging.CRITICAL) # str -> int
  354. logging.log(level, msg.rstrip())
  355. else:
  356. sys.stderr.write("Remote: " + line)
  357. if w:
  358. while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
  359. if calls:
  360. if is_preloaded:
  361. if calls[0] in self.cache:
  362. waiting_for.append(fetch_from_cache(calls.pop(0)))
  363. else:
  364. args = calls.pop(0)
  365. if cmd == 'get' and args in self.cache:
  366. waiting_for.append(fetch_from_cache(args))
  367. else:
  368. self.msgid += 1
  369. waiting_for.append(self.msgid)
  370. self.to_send = msgpack.packb((1, self.msgid, cmd, args))
  371. if not self.to_send and self.preload_ids:
  372. args = (self.preload_ids.pop(0),)
  373. self.msgid += 1
  374. self.cache.setdefault(args, []).append(self.msgid)
  375. self.to_send = msgpack.packb((1, self.msgid, cmd, args))
  376. if self.to_send:
  377. try:
  378. self.to_send = self.to_send[os.write(self.stdin_fd, self.to_send):]
  379. except OSError as e:
  380. # io.write might raise EAGAIN even though select indicates
  381. # that the fd should be writable
  382. if e.errno != errno.EAGAIN:
  383. raise
  384. self.ignore_responses |= set(waiting_for)
  385. def check(self, repair=False, save_space=False):
  386. return self.call('check', repair, save_space)
  387. def commit(self, save_space=False):
  388. return self.call('commit', save_space)
  389. def rollback(self, *args):
  390. return self.call('rollback')
  391. def destroy(self):
  392. return self.call('destroy')
  393. def __len__(self):
  394. return self.call('__len__')
  395. def list(self, limit=None, marker=None):
  396. return self.call('list', limit, marker)
  397. def get(self, id_):
  398. for resp in self.get_many([id_]):
  399. return resp
  400. def get_many(self, ids, is_preloaded=False):
  401. for resp in self.call_many('get', [(id_,) for id_ in ids], is_preloaded=is_preloaded):
  402. yield resp
  403. def put(self, id_, data, wait=True):
  404. return self.call('put', id_, data, wait=wait)
  405. def delete(self, id_, wait=True):
  406. return self.call('delete', id_, wait=wait)
  407. def save_key(self, keydata):
  408. return self.call('save_key', keydata)
  409. def load_key(self):
  410. return self.call('load_key')
  411. def break_lock(self):
  412. return self.call('break_lock')
  413. def close(self):
  414. if self.p:
  415. self.p.stdin.close()
  416. self.p.stdout.close()
  417. self.p.wait()
  418. self.p = None
  419. def preload(self, ids):
  420. self.preload_ids += ids
  421. class RepositoryNoCache:
  422. """A not caching Repository wrapper, passes through to repository.
  423. Just to have same API (including the context manager) as RepositoryCache.
  424. """
  425. def __init__(self, repository):
  426. self.repository = repository
  427. def close(self):
  428. pass
  429. def __enter__(self):
  430. return self
  431. def __exit__(self, exc_type, exc_val, exc_tb):
  432. self.close()
  433. def get(self, key):
  434. return next(self.get_many([key]))
  435. def get_many(self, keys):
  436. for data in self.repository.get_many(keys):
  437. yield data
  438. class RepositoryCache(RepositoryNoCache):
  439. """A caching Repository wrapper
  440. Caches Repository GET operations using a local temporary Repository.
  441. """
  442. # maximum object size that will be cached, 64 kiB.
  443. THRESHOLD = 2**16
  444. def __init__(self, repository):
  445. super().__init__(repository)
  446. tmppath = tempfile.mkdtemp(prefix='borg-tmp')
  447. self.caching_repo = Repository(tmppath, create=True, exclusive=True)
  448. self.caching_repo.__enter__() # handled by context manager in base class
  449. def close(self):
  450. if self.caching_repo is not None:
  451. self.caching_repo.destroy()
  452. self.caching_repo = None
  453. def get_many(self, keys):
  454. unknown_keys = [key for key in keys if key not in self.caching_repo]
  455. repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
  456. for key in keys:
  457. try:
  458. yield self.caching_repo.get(key)
  459. except Repository.ObjectNotFound:
  460. for key_, data in repository_iterator:
  461. if key_ == key:
  462. if len(data) <= self.THRESHOLD:
  463. self.caching_repo.put(key, data)
  464. yield data
  465. break
  466. # Consume any pending requests
  467. for _ in repository_iterator:
  468. pass
  469. def cache_if_remote(repository):
  470. if isinstance(repository, RemoteRepository):
  471. return RepositoryCache(repository)
  472. else:
  473. return RepositoryNoCache(repository)