from binascii import hexlify
from datetime import datetime, timezone
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .compress import Compressor, COMPR_BUFFER
from .constants import *  # NOQA
from .helpers import Chunk, Error, uid2user, user2uid, gid2group, group2gid, \
    parse_timestamp, to_localtime, format_time, format_timedelta, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, \
    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume
from .repository import Repository
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex, ChunkIndexEntry
from .cache import ChunkListEntry
import msgpack

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for _, data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, ITEM_TEXT_KEYS) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            for item in items:
                if b'chunks' in item:
                    item[b'chunks'] = [ChunkListEntry(*e) for e in item[b'chunks']]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c.id for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
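
# Usage sketch (added comment; illustrative, not part of the upstream code).
# `repository`, `key` and `item_ids` are assumed to exist, e.g. as set up by
# Archive below. unpack_many() yields decoded item dicts; with preload=True the
# file chunks are queued so the fetch_many() calls are served from the
# repository's preload pipeline:
#
#   pipeline = DownloadPipeline(repository, key)
#   for item in pipeline.unpack_many(item_ids, preload=True):
#       if b'chunks' in item:
#           for _, data in pipeline.fetch_many([c.id for c in item[b'chunks']], is_preloaded=True):
#               ...  # consume decrypted chunk data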


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            # write back the payload of the retained partial chunk (chunks are Chunk namedtuples here)
            self.buffer.write(chunks[-1].data)

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
        return id_
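
# Note (added comment): ChunkBuffer collects msgpack-serialized items and cuts the
# stream into content-defined chunks; CacheChunkBuffer additionally stores each cut
# chunk via the cache and keeps only the returned chunk id. Rough sketch, assuming
# `cache`, `key` and `stats` objects like the ones Archive uses:
#
#   buffer = CacheChunkBuffer(cache, key, stats)
#   buffer.add({b'path': b'some/file', b'mode': 0o100644})  # buffered; flushed once BUFFER_SIZE is exceeded
#   buffer.flush(flush=True)                                # force out the trailing partial chunk
#   item_metadata_chunk_ids = buffer.chunks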


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if start is None:
            start = datetime.utcnow()
        self.chunker_params = chunker_params
        self.start = start
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        _, data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, ARCHIVE_TEXT_KEYS)
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return hexlify(self.id).decode('ascii')

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - ITEM_KEYS
        assert not unknown_keys, ('unknown item metadata keys detected, please update constants.ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, Chunk(data), self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            _, data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, original_path=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param original_path: b'path' key as stored in archive
        """
        if dry_run or stdout:
            if b'chunks' in item:
                # fetch_many() yields (meta, data) pairs, same as in the regular extraction path below
                for _, data in self.pipeline.fetch_many([c.id for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        original_path = original_path or item[b'path']
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                if not hardlink_masters:
                    os.link(source, path)
                    return
                item[b'chunks'], link_target = hardlink_masters[item[b'source']]
                if link_target:
                    # Hard link was extracted previously, just link
                    os.link(link_target, path)
                    return
                # Extract chunks, since the item which had the chunks was not extracted
            with open(path, 'wb') as fd:
                ids = [c.id for c in item[b'chunks']]
                for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
                    if sparse and self.zeros.startswith(data):
                        # all-zero chunk: create a hole in a sparse file
                        fd.seek(len(data), 1)
                    else:
                        fd.write(data)
                pos = fd.tell()
                fd.truncate(pos)
                fd.flush()
                self.restore_attrs(path, item, fd=fd.fileno())
            if hardlink_masters:
                # Update master entry with extracted file path, so that following hardlinks don't extract twice.
                hardlink_masters[item.get(b'source') or original_path] = (None, path)
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            try:
                os.symlink(source, path)
            except UnicodeEncodeError:
                raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES, ):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    #           (this may happen related to security.* keys)
                    raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def set_meta(self, key, value):
        metadata = StableDict(self._load_meta(self.id))
        metadata[key] = value
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, Chunk(data), self.stats)
        self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta(b'name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False):
        unpacker = msgpack.Unpacker(use_list=False)
        items_ids = self.metadata[b'items']
        pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
            _, data = self.key.decrypt(items_id, data)
            unpacker.feed(data)
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        if progress:
            pi.finish()
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'atime': int_to_bigint(st.st_atime_ns),
            b'ctime': int_to_bigint(st.st_ctime_ns),
            b'mtime': int_to_bigint(st.st_mtime_ns),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item
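
    # Example (added comment): for a regular file, stat_attrs() produces roughly the
    # following item dict (values are illustrative only):
    #
    #   {b'mode': 0o100644,
    #    b'uid': 1000, b'user': 'alice',
    #    b'gid': 1000, b'group': 'alice',
    #    b'atime': ..., b'ctime': ..., b'mtime': ...,  # bigint-encoded nanosecond timestamps
    #    b'xattrs': StableDict({...}),                 # only if any xattrs are present
    #    b'bsdflags': ...}                             # only on platforms with lchflags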

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for data in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({
                    b'path': safe_path,
                    b'source': source,
                })
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {
            b'path': safe_path,
            b'hardlink_master': st.st_nlink > 1,  # item is a hard link and has the chunks
        }
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for data in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            cache.memorize_file(path_hash, st, [c.id for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)
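
# Create/extract sketch (added comment; illustrative, not part of the upstream code).
# `repository`, `key`, `manifest` and `cache` are assumed to be prepared by the
# command layer, as borg normally does before touching this module:
#
#   archive = Archive(repository, key, manifest, 'docs-2016-01', cache=cache, create=True)
#   st = os.lstat('docs/readme.txt')
#   archive.process_file('docs/readme.txt', st, cache)
#   archive.save()
#
#   archive = Archive(repository, key, manifest, 'docs-2016-01', cache=cache)
#   for item in archive.iter_items(preload=True):
#       archive.extract_item(item)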


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in ITEM_KEYS]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
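
# Sketch (added comment): how the checker below drives RobustUnpacker. After a gap
# in the item metadata stream, resync() switches into scanning mode; the unpacker
# then discards bytes until something that validates as an item dict appears and
# resumes normal streaming from there. `chunk_data` and `later_chunk_data` are
# assumed to be decrypted item metadata chunks:
#
#   unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
#   unpacker.feed(chunk_data)
#   items = list(unpacker)
#   unpacker.resync()              # e.g. after a missing metadata chunk
#   unpacker.feed(later_chunk_data)
#   items += list(unpacker)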


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        logger.info('Rebuilding missing manifest, this might take some time...')
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            _, data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk.data)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk.data), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(
                        item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(Chunk(data))
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = hexlify(chunk_id).decode('ascii')
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    _, data = self.key.decrypt(chunk_id, cdata)
                    unpacker.feed(data)
                    try:
                        for item in unpacker:
                            if isinstance(item, dict):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                _, data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, ARCHIVE_TEXT_KEYS)
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(Chunk(data))
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)
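
# Sketch (added comment; illustrative, not part of the upstream code): a full
# check/repair pass. `repository` is assumed to be an open, exclusively locked
# Repository or RemoteRepository:
#
#   ok = ArchiveChecker().check(repository, repair=False)    # report problems only
#   if not ok:
#       ArchiveChecker().check(repository, repair=True)       # rebuild manifest/refcounts and commit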


class ArchiveRecreater:
    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""

    class FakeTargetArchive:
        def __init__(self):
            self.stats = Statistics()

    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
                 chunker_params=None, compression=None,
                 dry_run=False, stats=False, progress=False, file_status_printer=None):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache
        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_tag_files = keep_tag_files
        self.chunker_params = chunker_params or CHUNKER_PARAMS
        self.compression = compression or dict(name='none')
        self.seen_chunks = set()
        self.recompress = bool(compression)
        compr_args = dict(buffer=COMPR_BUFFER)
        compr_args.update(self.compression)
        key.compressor = Compressor(**compr_args)
        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)
        self.interrupt = False
        self.errors = False

    def recreate(self, archive_name, comment=None):
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target, resume_from = self.create_target_or_resume(archive)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            logger.info("Skipping archive %s, nothing to do", archive_name)
            return True
        try:
            self.process_items(archive, target, resume_from)
        except self.Interrupted as e:
            return self.save(archive, target, completed=False, metadata=e.metadata)
        return self.save(archive, target, comment)

    def process_items(self, archive, target, resume_from=None):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    stat.S_ISREG(item[b'mode']) and
                    item.get(b'hardlink_master', True) and
                    b'source' not in item and
                    not matcher.match(item[b'path']))

        for item in archive.iter_items():
            if item_is_hardlink_master(item):
                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
                hardlink_masters[item[b'path']] = (item.get(b'chunks'), None)
                continue
            if resume_from:
                # Fast forward to after the last processed file
                if item[b'path'] == resume_from:
                    logger.info('Fast-forwarded to %s', remove_surrogates(item[b'path']))
                    resume_from = None
                continue
            if not matcher.match(item[b'path']):
                self.print_file_status('x', item[b'path'])
                continue
            if target_is_subset and stat.S_ISREG(item[b'mode']) and item.get(b'source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, new_source = hardlink_masters[item[b'source']]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item[b'chunks'] = chunks
                    hardlink_masters[item[b'source']] = (None, item[b'path'])
                    del item[b'source']
                else:
                    # Master was already moved, only update this item's source
                    item[b'source'] = new_source
            if self.dry_run:
                self.print_file_status('-', item[b'path'])
            else:
                try:
                    self.process_item(archive, target, item)
                except self.Interrupted:
                    if self.progress:
                        target.stats.show_progress(final=True)
                    raise
        if self.progress:
            target.stats.show_progress(final=True)

    def process_item(self, archive, target, item):
        if b'chunks' in item:
            item[b'chunks'] = self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item)
        self.print_file_status(file_status(item[b'mode']), item[b'path'])
        if self.interrupt:
            raise self.Interrupted

    def process_chunks(self, archive, target, item):
        """Return new chunk ID list for 'item'."""
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item[b'chunks']:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item[b'chunks']
        new_chunks = self.process_partial_chunks(target)
        chunk_iterator = self.create_chunk_iterator(archive, target, item)
        consume(chunk_iterator, len(new_chunks))
        for chunk in chunk_iterator:
            chunk_id = self.key.id_hash(chunk.data)
            if chunk_id in self.seen_chunks:
                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
            else:
                # TODO: detect / skip / --always-recompress
                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
                new_chunks.append((chunk_id, size, csize))
                self.seen_chunks.add(chunk_id)
                if self.recompress:
                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
                    # existing chunks.
                    target.recreate_uncomitted_bytes += csize
                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
                        # Issue commits to limit additional space usage when recompressing chunks
                        target.recreate_uncomitted_bytes = 0
                        self.repository.commit()
            if self.progress:
                target.stats.show_progress(item=item, dt=0.2)
            if self.interrupt:
                raise self.Interrupted({
                    'recreate_partial_chunks': new_chunks,
                })
        return new_chunks

    def create_chunk_iterator(self, archive, target, item):
        """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item[b'chunks']])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)

            def _chunk_iterator():
                for data in target.chunker.chunkify(file):
                    yield Chunk(data)

            chunk_iterator = _chunk_iterator()
        return chunk_iterator

    def process_partial_chunks(self, target):
        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
        if not target.recreate_partial_chunks:
            return []
        # No incref, create_target_or_resume already did that before deleting the old target archive,
        # so just copy these over
        partial_chunks = target.recreate_partial_chunks
        target.recreate_partial_chunks = None
        for chunk_id, size, csize in partial_chunks:
            self.seen_chunks.add(chunk_id)
        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
        return partial_chunks

    def save(self, archive, target, comment=None, completed=True, metadata=None):
        """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
        if self.dry_run:
            return completed
        if completed:
            timestamp = archive.ts.replace(tzinfo=None)
            if comment is None:
                comment = archive.metadata.get(b'comment', '')
            target.save(timestamp=timestamp, comment=comment, additional_metadata={
                'cmdline': archive.metadata[b'cmdline'],
                'recreate_cmdline': sys.argv,
            })
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
            if self.stats:
                target.end = datetime.utcnow()
                log_multi(DASHES,
                          str(target),
                          DASHES,
                          str(target.stats),
                          str(self.cache),
                          DASHES)
        else:
            additional_metadata = metadata or {}
            additional_metadata.update({
                'recreate_source_id': archive.id,
                'recreate_args': sys.argv[1:],
            })
            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
            logger.info('Run the same command again to resume.')
        return completed

    def matcher_add_tagged_dirs(self, archive):
        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
        def exclude(dir, tag_item):
            if self.keep_tag_files:
                tag_files.append(PathPrefixPattern(tag_item[b'path']))
                tagged_dirs.append(FnmatchPattern(dir + '/'))
            else:
                tagged_dirs.append(PathPrefixPattern(dir))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []
        # build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
        cachedir_masters = {}

        for item in archive.iter_items(
                filter=lambda item: item[b'path'].endswith(CACHE_TAG_NAME) or matcher.match(item[b'path'])):
            if item[b'path'].endswith(CACHE_TAG_NAME):
                cachedir_masters[item[b'path']] = item
            if stat.S_ISREG(item[b'mode']):
                dir, tag_file = os.path.split(item[b'path'])
                if tag_file in self.exclude_if_present:
                    exclude(dir, item)
                if self.exclude_caches and tag_file == CACHE_TAG_NAME:
                    if b'chunks' in item:
                        file = open_item(archive, item)
                    else:
                        file = open_item(archive, cachedir_masters[item[b'source']])
                    if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
                        exclude(dir, item)
        matcher.add(tag_files, True)
        matcher.add(tagged_dirs, False)

    def create_target_or_resume(self, archive):
        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
        if self.dry_run:
            return self.FakeTargetArchive(), None
        target_name = archive.name + '.recreate'
        resume = target_name in self.manifest.archives
        target, resume_from = None, None
        if resume:
            target, resume_from = self.try_resume(archive, target_name)
        if not target:
            target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
        return target, resume_from

    def try_resume(self, archive, target_name):
        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
        logger.info('Found %s, will resume interrupted operation', target_name)
        old_target = self.open_archive(target_name)
        resume_id = old_target.metadata[b'recreate_source_id']
        resume_args = [arg.decode('utf-8', 'surrogateescape') for arg in old_target.metadata[b'recreate_args']]
        if resume_id != archive.id:
            logger.warning('Source archive changed, will discard %s and start over', target_name)
            logger.warning('Saved fingerprint: %s', hexlify(resume_id).decode('ascii'))
            logger.warning('Current fingerprint: %s', archive.fpr)
            old_target.delete(Statistics(), progress=self.progress)
            return None, None  # can't resume
        if resume_args != sys.argv[1:]:
            logger.warning('Command line changed, this might lead to inconsistencies')
            logger.warning('Saved: %s', repr(resume_args))
            logger.warning('Current: %s', repr(sys.argv[1:]))
        target = self.create_target_archive(target_name + '.temp')
        logger.info('Replaying items from interrupted operation...')
        item = None
        for item in old_target.iter_items():
            if b'chunks' in item:
                for chunk in item[b'chunks']:
                    self.cache.chunk_incref(chunk.id, target.stats)
                target.stats.nfiles += 1
            target.add_item(item)
        if item:
            resume_from = item[b'path']
        else:
            resume_from = None
        if self.progress:
            old_target.stats.show_progress(final=True)
        target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
        for chunk_id, size, csize in target.recreate_partial_chunks:
            if not self.cache.seen_chunk(chunk_id):
                try:
                    # Repository has __contains__, RemoteRepository doesn't
                    self.repository.get(chunk_id)
                except Repository.ObjectNotFound:
                    # delete/prune/check between invocations: these chunks are gone.
                    target.recreate_partial_chunks = None
                    break
                # fast-lane insert into chunks cache
                self.cache.chunks[chunk_id] = (1, size, csize)
                target.stats.update(size, csize, True)
                continue
            # incref now, otherwise old_target.delete() might delete these chunks
            self.cache.chunk_incref(chunk_id, target.stats)
        old_target.delete(Statistics(), progress=self.progress)
        logger.info('Done replaying items')
        return target, resume_from

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=0)
        target.recreate_partial_chunks = None
        target.recreate_uncomitted_bytes = 0
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
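
# Sketch (added comment; illustrative, not part of the upstream code): recreating an
# archive with new chunker params. `repository`, `manifest`, `key`, `cache` and
# `matcher` are assumed to be prepared by the command layer:
#
#   recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
#                                chunker_params=CHUNKER_PARAMS, progress=True)
#   recreater.recreate('docs-2016-01', comment='rechunked')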