archive.py

from datetime import datetime, timezone
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
import socket
import stat
import sys
import time
from io import BytesIO

from . import xattr
from .compress import COMPR_BUFFER
from .constants import *  # NOQA
from .helpers import Chunk, Error, uid2user, user2uid, gid2group, group2gid, \
    parse_timestamp, to_localtime, format_time, format_timedelta, safe_encode, safe_decode, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, bin_to_hex, \
    ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, \
    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume, \
    CompressionDecider1, CompressionDecider2, CompressionSpec
from .repository import Repository
from .platform import acl_get, acl_set
if sys.platform == 'win32':
    from .platform import get_owner, set_owner
from .chunker import Chunker
from .hashindex import ChunkIndex, ChunkIndexEntry
from .cache import ChunkListEntry
import msgpack

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
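
# flags_noatime adds O_NOATIME where the platform provides it, so reading files for backup does not update
# their atime; the kernel only honors O_NOATIME for the file's owner or root, hence the fallback in
# Archive._open_rb() below.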


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for _, data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, ITEM_TEXT_KEYS) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            for item in items:
                if b'chunks' in item:
                    item[b'chunks'] = [ChunkListEntry(*e) for e in item[b'chunks']]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c.id for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1].data)

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, end=None, compression=None, compression_files=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if start is None:
            start = datetime.utcnow()
        self.chunker_params = chunker_params
        self.start = start
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                             compression_files or [])
            key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])
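            # self.zeros is a reference block of zero bytes, sized so that any all-zero chunk is a prefix of it;
            # extract_item() compares fetched chunks against it to punch holes when writing sparse files.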

    def _load_meta(self, id):
        _, data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, ARCHIVE_TEXT_KEYS)
        self.metadata[b'cmdline'] = [safe_decode(arg) for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - ITEM_KEYS
        assert not unknown_keys, ('unknown item metadata keys detected, please update constants.ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, Chunk(data), self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            _, data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, original_path=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param original_path: b'path' key as stored in archive
        """
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c.id for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        original_path = original_path or item[b'path']
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..') or (sys.platform == 'win32' and item[b'path'][1] == ':'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                if not hardlink_masters:
                    os.link(source, path)
                    return
                item[b'chunks'], link_target = hardlink_masters[item[b'source']]
                if link_target:
                    # Hard link was extracted previously, just link
                    os.link(link_target, path)
                    return
                # Extract chunks, since the item which had the chunks was not extracted
            with open(path, 'wb') as fd:
                ids = [c.id for c in item[b'chunks']]
                for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
                    if sparse and self.zeros.startswith(data):
                        # all-zero chunk: create a hole in a sparse file
                        fd.seek(len(data), 1)
                    else:
                        fd.write(data)
                pos = fd.tell()
                fd.truncate(pos)
                fd.flush()
                if sys.platform != 'win32':
                    self.restore_attrs(path, item, fd=fd.fileno())
                else:
                    # File needs to be closed or timestamps are rewritten at close
                    fd.close()
                    self.restore_attrs(path, item)
            if hardlink_masters:
                # Update master entry with extracted file path, so that following hardlinks don't extract twice.
                hardlink_masters[item.get(b'source') or original_path] = (None, path)
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            try:
                os.symlink(source, path)
            except UnicodeEncodeError:
                raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        if sys.platform != 'win32':
            try:
                if fd:
                    os.fchown(fd, uid, gid)
                else:
                    os.lchown(path, uid, gid)
            except OSError:
                pass
        else:
            try:
                set_owner(path, item[b'user'], safe_decode(item[b'uid']))
            except OSError:
                pass
        if sys.platform != 'win32':
            if fd:
                os.fchmod(fd, item[b'mode'])
            elif not symlink:
                os.chmod(path, item[b'mode'])
            elif has_lchmod:  # Not available on Linux
                os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if sys.platform == 'win32':
            os.utime(path, ns=(atime, mtime))
        elif fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise

    def set_meta(self, key, value):
        metadata = StableDict(self._load_meta(self.id))
        metadata[key] = value
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, Chunk(data), self.stats)
        self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta(b'name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False):
        unpacker = msgpack.Unpacker(use_list=False)
        items_ids = self.metadata[b'items']
        pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
            _, data = self.key.decrypt(items_id, data)
            unpacker.feed(data)
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        if progress:
            pi.finish()
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {}
        if sys.platform == 'win32':
            owner = get_owner(path)
            item = {
                b'mode': st.st_mode,
                b'uid': owner[1], b'user': owner[0],
                b'gid': st.st_gid, b'group': gid2group(st.st_gid),
                b'atime': int_to_bigint(st.st_atime_ns),
                b'ctime': int_to_bigint(st.st_ctime_ns),
                b'mtime': int_to_bigint(st.st_mtime_ns),
            }
        else:
            item = {
                b'mode': st.st_mode,
                b'uid': st.st_uid, b'user': uid2user(st.st_uid),
                b'gid': st.st_gid, b'group': gid2group(st.st_gid),
                b'atime': int_to_bigint(st.st_atime_ns),
                b'ctime': int_to_bigint(st.st_ctime_ns),
                b'mtime': int_to_bigint(st.st_mtime_ns),
            }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for data in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({
                    b'path': safe_path,
                    b'source': source,
                })
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {
            b'path': safe_path,
            b'hardlink_master': st.st_nlink > 1,  # item is a hard link and has the chunks
        }
        # Only chunkify the file if needed
        if chunks is None:
            compress = self.compression_decider1.decide(path)
            logger.debug('%s -> compression %s', path, compress['name'])
            fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for data in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(data),
                                                  Chunk(data, compress=compress),
                                                  self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            cache.memorize_file(path_hash, st, [c.id for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in ITEM_KEYS]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
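                # (msgpack encodes a small dict as a fixmap byte 0x80-0x8f followed by a fixstr key byte 0xa0-0xbf)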
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        logger.info('Rebuilding missing manifest, this might take some time...')
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            _, data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
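            # (i.e. the first byte must be a msgpack fixmap header, the second the start of a fixstr key)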
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]
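
        # ids marked below are old archive/item metadata chunks that are unreferenced after the rebuild;
        # orphan_chunks_check() does not report them as orphans, though a repair run still deletes any
        # that remain unused.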
        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk.data)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk.data), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(safe_decode(item[b'path']), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(Chunk(data))
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0
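
            # groupby() below uses missing_chunk_detector's return value to split the item chunk id stream
            # into alternating runs: even states are runs of chunks that are present, odd states are runs of
            # missing chunks.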
            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    _, data = self.key.decrypt(chunk_id, cdata)
                    unpacker.feed(data)
                    try:
                        for item in unpacker:
                            if isinstance(item, dict):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                _, data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, ARCHIVE_TEXT_KEYS)
                archive[b'cmdline'] = [safe_decode(arg) for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(Chunk(data))
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)


class ArchiveRecreater:
    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""

    class FakeTargetArchive:
        def __init__(self):
            self.stats = Statistics()

    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
                 chunker_params=None, compression=None, compression_files=None,
                 dry_run=False, stats=False, progress=False, file_status_printer=None):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache

        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_tag_files = keep_tag_files

        self.chunker_params = chunker_params or CHUNKER_PARAMS
        self.recompress = bool(compression)
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
        self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                         compression_files or [])
        key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))

        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))

        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)

        self.interrupt = False
        self.errors = False

    def recreate(self, archive_name, comment=None):
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target, resume_from = self.create_target_or_resume(archive)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            logger.info("Skipping archive %s, nothing to do", archive_name)
            return True
        try:
            self.process_items(archive, target, resume_from)
        except self.Interrupted as e:
            return self.save(archive, target, completed=False, metadata=e.metadata)
        return self.save(archive, target, comment)

    def process_items(self, archive, target, resume_from=None):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    stat.S_ISREG(item[b'mode']) and
                    item.get(b'hardlink_master', True) and
                    b'source' not in item and
                    not matcher.match(item[b'path']))

        for item in archive.iter_items():
            if item_is_hardlink_master(item):
                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
                hardlink_masters[item[b'path']] = (item.get(b'chunks'), None)
                continue
            if resume_from:
                # Fast forward to after the last processed file
                if item[b'path'] == resume_from:
                    logger.info('Fast-forwarded to %s', remove_surrogates(item[b'path']))
                    resume_from = None
                continue
            if not matcher.match(item[b'path']):
                self.print_file_status('x', item[b'path'])
                continue
            if target_is_subset and stat.S_ISREG(item[b'mode']) and item.get(b'source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, new_source = hardlink_masters[item[b'source']]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item[b'chunks'] = chunks
                    hardlink_masters[item[b'source']] = (None, item[b'path'])
                    del item[b'source']
                else:
                    # Master was already moved, only update this item's source
                    item[b'source'] = new_source
            if self.dry_run:
                self.print_file_status('-', item[b'path'])
            else:
                try:
                    self.process_item(archive, target, item)
                except self.Interrupted:
                    if self.progress:
                        target.stats.show_progress(final=True)
                    raise
        if self.progress:
            target.stats.show_progress(final=True)

    def process_item(self, archive, target, item):
        if b'chunks' in item:
            item[b'chunks'] = self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item)
        self.print_file_status(file_status(item[b'mode']), item[b'path'])
        if self.interrupt:
            raise self.Interrupted

    def process_chunks(self, archive, target, item):
        """Return new chunk ID list for 'item'."""
        # TODO: support --compression-from
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item[b'chunks']:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item[b'chunks']
        new_chunks = self.process_partial_chunks(target)
        chunk_iterator = self.create_chunk_iterator(archive, target, item)
        consume(chunk_iterator, len(new_chunks))
        for chunk in chunk_iterator:
            chunk_id = self.key.id_hash(chunk.data)
            if chunk_id in self.seen_chunks:
                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
            else:
                # TODO: detect / skip / --always-recompress
                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
                new_chunks.append((chunk_id, size, csize))
                self.seen_chunks.add(chunk_id)
                if self.recompress:
                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
                    # existing chunks.
                    target.recreate_uncomitted_bytes += csize
                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
                        # Issue commits to limit additional space usage when recompressing chunks
                        target.recreate_uncomitted_bytes = 0
                        self.repository.commit()
            if self.progress:
                target.stats.show_progress(item=item, dt=0.2)
            if self.interrupt:
                raise self.Interrupted({
                    'recreate_partial_chunks': new_chunks,
                })
        return new_chunks

    def create_chunk_iterator(self, archive, target, item):
        """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item[b'chunks']])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)

            def _chunk_iterator():
                for data in target.chunker.chunkify(file):
                    yield Chunk(data)

            chunk_iterator = _chunk_iterator()
        return chunk_iterator

    def process_partial_chunks(self, target):
        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
        if not target.recreate_partial_chunks:
            return []
        # No incref, create_target_or_resume already did that before deleting the old target archive
        # So just copy these over
        partial_chunks = target.recreate_partial_chunks
        target.recreate_partial_chunks = None
        for chunk_id, size, csize in partial_chunks:
            self.seen_chunks.add(chunk_id)
        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
        return partial_chunks

    def save(self, archive, target, comment=None, completed=True, metadata=None):
        """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
        if self.dry_run:
            return completed
        if completed:
            timestamp = archive.ts.replace(tzinfo=None)
            if comment is None:
                comment = archive.metadata.get(b'comment', '')
            target.save(timestamp=timestamp, comment=comment, additional_metadata={
                'cmdline': archive.metadata[b'cmdline'],
                'recreate_cmdline': sys.argv,
            })
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
            if self.stats:
                target.end = datetime.utcnow()
                log_multi(DASHES,
                          str(target),
                          DASHES,
                          str(target.stats),
                          str(self.cache),
                          DASHES)
        else:
            additional_metadata = metadata or {}
            additional_metadata.update({
                'recreate_source_id': archive.id,
                'recreate_args': sys.argv[1:],
            })
            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
            logger.info('Run the same command again to resume.')
        return completed

    def matcher_add_tagged_dirs(self, archive):
        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
        def exclude(dir, tag_item):
            if self.keep_tag_files:
                tag_files.append(PathPrefixPattern(tag_item[b'path']))
                tagged_dirs.append(FnmatchPattern(dir + '/'))
            else:
                tagged_dirs.append(PathPrefixPattern(dir))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []
        # build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
        cachedir_masters = {}

        for item in archive.iter_items(
                filter=lambda item: item[b'path'].endswith(CACHE_TAG_NAME) or matcher.match(item[b'path'])):
            if item[b'path'].endswith(CACHE_TAG_NAME):
                cachedir_masters[item[b'path']] = item
            if stat.S_ISREG(item[b'mode']):
                dir, tag_file = os.path.split(item[b'path'])
                if tag_file in self.exclude_if_present:
                    exclude(dir, item)
                if self.exclude_caches and tag_file == CACHE_TAG_NAME:
                    if b'chunks' in item:
                        file = open_item(archive, item)
                    else:
                        file = open_item(archive, cachedir_masters[item[b'source']])
                    if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
                        exclude(dir, item)
        matcher.add(tag_files, True)
        matcher.add(tagged_dirs, False)

    def create_target_or_resume(self, archive):
        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
        if self.dry_run:
            return self.FakeTargetArchive(), None
        target_name = archive.name + '.recreate'
        resume = target_name in self.manifest.archives
        target, resume_from = None, None
        if resume:
            target, resume_from = self.try_resume(archive, target_name)
        if not target:
            target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
        return target, resume_from

    def try_resume(self, archive, target_name):
        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
        logger.info('Found %s, will resume interrupted operation', target_name)
        old_target = self.open_archive(target_name)
        resume_id = old_target.metadata[b'recreate_source_id']
        resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
        if resume_id != archive.id:
            logger.warning('Source archive changed, will discard %s and start over', target_name)
            logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))
            logger.warning('Current fingerprint: %s', archive.fpr)
            old_target.delete(Statistics(), progress=self.progress)
            return None, None  # can't resume
        if resume_args != sys.argv[1:]:
            logger.warning('Command line changed, this might lead to inconsistencies')
            logger.warning('Saved: %s', repr(resume_args))
            logger.warning('Current: %s', repr(sys.argv[1:]))
        target = self.create_target_archive(target_name + '.temp')
        logger.info('Replaying items from interrupted operation...')
        item = None
        for item in old_target.iter_items():
            if b'chunks' in item:
                for chunk in item[b'chunks']:
                    self.cache.chunk_incref(chunk.id, target.stats)
                target.stats.nfiles += 1
            target.add_item(item)
        if item:
            resume_from = item[b'path']
        else:
            resume_from = None
        if self.progress:
            old_target.stats.show_progress(final=True)
        target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
        for chunk_id, size, csize in target.recreate_partial_chunks:
            if not self.cache.seen_chunk(chunk_id):
                try:
                    # Repository has __contains__, RemoteRepository doesn't
                    self.repository.get(chunk_id)
                except Repository.ObjectNotFound:
                    # delete/prune/check between invocations: these chunks are gone.
                    target.recreate_partial_chunks = None
                    break
                # fast-lane insert into chunks cache
                self.cache.chunks[chunk_id] = (1, size, csize)
                target.stats.update(size, csize, True)
                continue
            # incref now, otherwise old_target.delete() might delete these chunks
            self.cache.chunk_incref(chunk_id, target.stats)
        old_target.delete(Statistics(), progress=self.progress)
        logger.info('Done replaying items')
        return target, resume_from

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=0, compression=self.compression)
        target.recreate_partial_chunks = None
        target.recreate_uncomitted_bytes = 0
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)