from datetime import datetime, timezone
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
from shutil import get_terminal_size
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .compress import COMPR_BUFFER
from .constants import *  # NOQA
from .helpers import Chunk, Error, uid2user, user2uid, gid2group, group2gid, \
    parse_timestamp, to_localtime, format_time, format_timedelta, safe_encode, safe_decode, \
    Manifest, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, bin_to_hex, \
    ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, \
    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume, \
    CompressionDecider1, CompressionDecider2, CompressionSpec, \
    IntegrityError
from .repository import Repository
if sys.platform == 'win32':
    from .platform import get_owner, set_owner
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
from .chunker import Chunker
from .hashindex import ChunkIndex, ChunkIndexEntry
from .cache import ChunkListEntry
import msgpack

has_lchmod = hasattr(os, 'lchmod')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class Statistics:

    def __init__(self):
        self.osize = self.csize = self.usize = self.nfiles = 0
        self.last_progress = 0  # timestamp when last progress was shown

    def update(self, size, csize, unique):
        self.osize += size
        self.csize += csize
        if unique:
            self.usize += csize

    summary = """\
                       Original size      Compressed size    Deduplicated size
{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"""

    def __str__(self):
        return self.summary.format(stats=self, label='This archive:')

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self)

    @property
    def osize_fmt(self):
        return format_file_size(self.osize)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize)

    @property
    def csize_fmt(self):
        return format_file_size(self.csize)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.time()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            columns, lines = get_terminal_size()
            if not final:
                msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
                path = remove_surrogates(item[b'path']) if item else ''
                space = columns - swidth(msg)
                if space < swidth('...') + swidth(path):
                    path = '%s...%s' % (path[:(space // 2) - swidth('...')], path[-space // 2:])
                msg += "{0:<{space}}".format(path, space=space)
            else:
                msg = ' ' * columns
            print(msg, file=stream or sys.stderr, end="\r", flush=True)
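
# Illustrative sketch (not part of the original module): how Statistics is typically fed
# and rendered. The numbers are made up; update() adds the original and compressed sizes
# of one stored chunk and counts its deduplicated size only when the chunk is new:
#
#     stats = Statistics()
#     stats.update(size=1024, csize=300, unique=True)    # new chunk: also counts towards usize
#     stats.update(size=1024, csize=300, unique=False)   # duplicate chunk: osize/csize only
#     print(stats)                                       # renders Statistics.summary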


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for _, data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, ITEM_TEXT_KEYS) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            for item in items:
                if b'chunks' in item:
                    item[b'chunks'] = [ChunkListEntry(*e) for e in item[b'chunks']]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c.id for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
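
# Illustrative sketch (not part of the original module): unpack_many() decrypts and
# msgpack-decodes the item metadata stream; with preload=True it also queues the items'
# data chunks at the repository so later fetch_many() calls avoid per-chunk round trips.
# repository, key and item_metadata_ids are assumed to come from the caller:
#
#     pipeline = DownloadPipeline(repository, key)
#     for item in pipeline.unpack_many(item_metadata_ids, preload=True):
#         for _, data in pipeline.fetch_many([c.id for c in item.get(b'chunks', [])],
#                                            is_preloaded=True):
#             ...  # consume the file data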


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1].data)

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
        return id_
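
# Illustrative sketch (not part of the original module): CacheChunkBuffer wires
# write_chunk() to Cache.add_chunk(), so flushing the buffer both stores the item
# metadata chunks and collects their ids for the archive's 'items' list. Note that
# flush() keeps the trailing partial chunk buffered unless called with flush=True.
# cache, key, stats and items_to_store are assumed to come from the caller:
#
#     buf = CacheChunkBuffer(cache, key, stats)
#     for item in items_to_store:
#         buf.add(item)              # packs the dict, flushes full chunks via write_chunk()
#     buf.flush(flush=True)          # force out the trailing partial chunk
#     item_chunk_ids = buf.chunks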


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, end=None, compression=None, compression_files=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if start is None:
            start = datetime.utcnow()
        self.chunker_params = chunker_params
        self.start = start
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                            compression_files or [])
            key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        _, data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, ARCHIVE_TEXT_KEYS)
        self.metadata[b'cmdline'] = [safe_decode(arg) for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - ITEM_KEYS
        assert not unknown_keys, ('unknown item metadata keys detected, please update constants.ITEM_KEYS: %s' %
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, Chunk(data), self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            _, data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, original_path=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param original_path: b'path' key as stored in archive
        """
        if dry_run or stdout:
            if b'chunks' in item:
                for _, data in self.pipeline.fetch_many([c.id for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        original_path = original_path or item[b'path']
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..') or (sys.platform == 'win32' and item[b'path'][1] == ':'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                if not hardlink_masters:
                    os.link(source, path)
                    return
                item[b'chunks'], link_target = hardlink_masters[item[b'source']]
                if link_target:
                    # Hard link was extracted previously, just link
                    os.link(link_target, path)
                    return
                # Extract chunks, since the item which had the chunks was not extracted
            with open(path, 'wb') as fd:
                ids = [c.id for c in item[b'chunks']]
                for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
                    if sparse and self.zeros.startswith(data):
                        # all-zero chunk: create a hole in a sparse file
                        fd.seek(len(data), 1)
                    else:
                        fd.write(data)
                pos = fd.tell()
                fd.truncate(pos)
                fd.flush()
                if sys.platform != 'win32':
                    self.restore_attrs(path, item, fd=fd.fileno())
                else:
                    # File needs to be closed or timestamps are rewritten at close
                    fd.close()
                    self.restore_attrs(path, item)
            if hardlink_masters:
                # Update master entry with extracted file path, so that following hardlinks don't extract twice.
                hardlink_masters[item.get(b'source') or original_path] = (None, path)
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            try:
                os.symlink(source, path)
            except UnicodeEncodeError:
                raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        if sys.platform != 'win32':
            try:
                if fd:
                    os.fchown(fd, uid, gid)
                else:
                    os.lchown(path, uid, gid)
            except OSError:
                pass
        else:
            try:
                set_owner(path, item[b'user'], safe_decode(item[b'uid']))
            except OSError:
                pass
        if sys.platform != 'win32':
            if fd:
                os.fchmod(fd, item[b'mode'])
            elif not symlink:
                os.chmod(path, item[b'mode'])
            elif has_lchmod:  # Not available on Linux
                os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if sys.platform == 'win32':
            os.utime(path, ns=(atime, mtime))
        elif fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        if b'bsdflags' in item:
            try:
                set_flags(path, item[b'bsdflags'], fd=fd)
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise

    def set_meta(self, key, value):
        metadata = StableDict(self._load_meta(self.id))
        metadata[key] = value
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, Chunk(data), self.stats)
        self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta(b'name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False):
        unpacker = msgpack.Unpacker(use_list=False)
        items_ids = self.metadata[b'items']
        pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
            _, data = self.key.decrypt(items_id, data)
            unpacker.feed(data)
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        if progress:
            pi.finish()
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {}
        if sys.platform == 'win32':
            owner = get_owner(path)
            item = {
                b'mode': st.st_mode,
                b'uid': owner[1], b'user': owner[0],
                b'gid': st.st_gid, b'group': gid2group(st.st_gid),
                b'atime': int_to_bigint(st.st_atime_ns),
                b'ctime': int_to_bigint(st.st_ctime_ns),
                b'mtime': int_to_bigint(st.st_mtime_ns),
            }
        else:
            item = {
                b'mode': st.st_mode,
                b'uid': st.st_uid, b'user': uid2user(st.st_uid),
                b'gid': st.st_gid, b'group': gid2group(st.st_gid),
                b'atime': int_to_bigint(st.st_atime_ns),
                b'ctime': int_to_bigint(st.st_ctime_ns),
                b'mtime': int_to_bigint(st.st_mtime_ns),
            }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        bsdflags = get_flags(path, st)
        if bsdflags:
            item[b'bsdflags'] = bsdflags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for data in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({
                    b'path': safe_path,
                    b'source': source,
                })
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {
            b'path': safe_path,
            b'hardlink_master': st.st_nlink > 1,  # item is a hard link and has the chunks
        }
        # Only chunkify the file if needed
        if chunks is None:
            compress = self.compression_decider1.decide(path)
            logger.debug('%s -> compression %s', path, compress['name'])
            fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for data in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(data),
                                                  Chunk(data, compress=compress),
                                                  self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            cache.memorize_file(path_hash, st, [c.id for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)
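
# Illustrative sketch (not part of the original module): the typical create/save round
# trip for Archive. repository, key, manifest, cache and paths_to_backup are assumed to
# be set up by the caller, e.g. by the command line front end:
#
#     archive = Archive(repository, key, manifest, 'myhost-2016-01-01', cache=cache,
#                       create=True, progress=True)
#     for path in paths_to_backup:
#         st = os.lstat(path)
#         if stat.S_ISREG(st.st_mode):
#             archive.process_file(path, st, cache)
#         elif stat.S_ISDIR(st.st_mode):
#             archive.process_dir(path, st)
#     archive.save(comment='nightly')    # flushes the items buffer, updates the manifest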


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in ITEM_KEYS]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
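
# Illustrative sketch (not part of the original module): how the checker below uses
# RobustUnpacker. After a gap in the item metadata stream, resync() switches the unpacker
# into scanning mode; it then byte-shifts through the buffered data until it finds
# something that both looks like a msgpack dict and passes the validator.
# first_chunk_data / next_chunk_data are assumed decrypted metadata chunks:
#
#     unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
#     unpacker.feed(first_chunk_data)
#     items = list(unpacker)
#     unpacker.resync()                  # a metadata chunk was missing in between
#     unpacker.feed(next_chunk_data)     # iteration resumes at the next plausible item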


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, verify_data=False,
              save_space=False):
        """Perform a set of checks on 'repository'

        :param repair: enable repair mode, write updated or corrected data into repository
        :param archive: only check this archive
        :param last: only check this number of recent archives
        :param prefix: only check archives with this prefix
        :param verify_data: integrity verification of data referenced by archives
        :param save_space: Repository.commit(save_space)
        """
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        if verify_data:
            self.verify_data()
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def verify_data(self):
        logger.info('Starting cryptographic data integrity verification...')
        pi = ProgressIndicatorPercent(total=len(self.chunks), msg="Verifying data %6.2f%%", step=0.01, same_line=True)
        count = errors = 0
        for chunk_id, (refcount, *_) in self.chunks.iteritems():
            pi.show()
            if not refcount:
                continue
            encrypted_data = self.repository.get(chunk_id)
            try:
                _, data = self.key.decrypt(chunk_id, encrypted_data)
            except IntegrityError as integrity_error:
                self.error_found = True
                errors += 1
                logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
            count += 1
        pi.finish()
        log = logger.error if errors else logger.info
        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.', count, errors)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        logger.info('Rebuilding missing manifest, this might take some time...')
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            _, data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk.data)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk.data), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(safe_decode(item[b'path']), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(Chunk(data))
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    _, data = self.key.decrypt(chunk_id, cdata)
                    unpacker.feed(data)
                    try:
                        for item in unpacker:
                            if isinstance(item, dict):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            if not archive_items:
                logger.error("Archive '%s' not found.", archive)
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                _, data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, ARCHIVE_TEXT_KEYS)
                archive[b'cmdline'] = [safe_decode(arg) for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(Chunk(data))
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)
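
# Illustrative sketch (not part of the original module): a minimal check run, roughly
# what the `check --verify-data` command does once repository and lock handling are in
# place; repository is assumed to be an open Repository:
#
#     checker = ArchiveChecker()
#     ok = checker.check(repository, repair=False, verify_data=True)
#     if not ok:
#         ...  # at least one consistency problem was logged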


class ArchiveRecreater:
    AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
    """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""

    class FakeTargetArchive:
        def __init__(self):
            self.stats = Statistics()

    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
                 chunker_params=None, compression=None, compression_files=None,
                 dry_run=False, stats=False, progress=False, file_status_printer=None):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache

        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_tag_files = keep_tag_files

        self.chunker_params = chunker_params or CHUNKER_PARAMS
        self.recompress = bool(compression)
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
        self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                        compression_files or [])
        key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))

        self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
        logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))

        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)

        self.interrupt = False
        self.errors = False

    def recreate(self, archive_name, comment=None):
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target, resume_from = self.create_target_or_resume(archive)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            logger.info("Skipping archive %s, nothing to do", archive_name)
            return True
        try:
            self.process_items(archive, target, resume_from)
        except self.Interrupted as e:
            return self.save(archive, target, completed=False, metadata=e.metadata)
        return self.save(archive, target, comment)

    def process_items(self, archive, target, resume_from=None):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    stat.S_ISREG(item[b'mode']) and
                    item.get(b'hardlink_master', True) and
                    b'source' not in item and
                    not matcher.match(item[b'path']))

        for item in archive.iter_items():
            if item_is_hardlink_master(item):
                # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
                hardlink_masters[item[b'path']] = (item.get(b'chunks'), None)
                continue
            if resume_from:
                # Fast forward to after the last processed file
                if item[b'path'] == resume_from:
                    logger.info('Fast-forwarded to %s', remove_surrogates(item[b'path']))
                    resume_from = None
                continue
            if not matcher.match(item[b'path']):
                self.print_file_status('x', item[b'path'])
                continue
            if target_is_subset and stat.S_ISREG(item[b'mode']) and item.get(b'source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, new_source = hardlink_masters[item[b'source']]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item[b'chunks'] = chunks
                    hardlink_masters[item[b'source']] = (None, item[b'path'])
                    del item[b'source']
                else:
                    # Master was already moved, only update this item's source
                    item[b'source'] = new_source
            if self.dry_run:
                self.print_file_status('-', item[b'path'])
            else:
                try:
                    self.process_item(archive, target, item)
                except self.Interrupted:
                    if self.progress:
                        target.stats.show_progress(final=True)
                    raise
        if self.progress:
            target.stats.show_progress(final=True)

    def process_item(self, archive, target, item):
        if b'chunks' in item:
            item[b'chunks'] = self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item)
        self.print_file_status(file_status(item[b'mode']), item[b'path'])
        if self.interrupt:
            raise self.Interrupted

    def process_chunks(self, archive, target, item):
        """Return new chunk ID list for 'item'."""
        # TODO: support --compression-from
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item[b'chunks']:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item[b'chunks']
        new_chunks = self.process_partial_chunks(target)
        chunk_iterator = self.create_chunk_iterator(archive, target, item)
        consume(chunk_iterator, len(new_chunks))
        for chunk in chunk_iterator:
            chunk_id = self.key.id_hash(chunk.data)
            if chunk_id in self.seen_chunks:
                new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
            else:
                # TODO: detect / skip / --always-recompress
                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
                new_chunks.append((chunk_id, size, csize))
                self.seen_chunks.add(chunk_id)
                if self.recompress:
                    # This tracks how many bytes are uncommitted but compactable, since we are recompressing
                    # existing chunks.
                    target.recreate_uncomitted_bytes += csize
                    if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
                        # Issue commits to limit additional space usage when recompressing chunks
                        target.recreate_uncomitted_bytes = 0
                        self.repository.commit()
            if self.progress:
                target.stats.show_progress(item=item, dt=0.2)
            if self.interrupt:
                raise self.Interrupted({
                    'recreate_partial_chunks': new_chunks,
                })
        return new_chunks

    def create_chunk_iterator(self, archive, target, item):
        """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item[b'chunks']])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)

            def _chunk_iterator():
                for data in target.chunker.chunkify(file):
                    yield Chunk(data)

            chunk_iterator = _chunk_iterator()
        return chunk_iterator

    def process_partial_chunks(self, target):
        """Return chunks from a previous run for archive 'target' (if any) or an empty list."""
        if not target.recreate_partial_chunks:
            return []
        # No incref needed: create_target_or_resume already did that before deleting the old target archive,
        # so just copy these over
        partial_chunks = target.recreate_partial_chunks
        target.recreate_partial_chunks = None
        for chunk_id, size, csize in partial_chunks:
            self.seen_chunks.add(chunk_id)
        logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
        return partial_chunks

    def save(self, archive, target, comment=None, completed=True, metadata=None):
        """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
        if self.dry_run:
            return completed
        if completed:
            timestamp = archive.ts.replace(tzinfo=None)
            if comment is None:
                comment = archive.metadata.get(b'comment', '')
            target.save(timestamp=timestamp, comment=comment, additional_metadata={
                'cmdline': archive.metadata[b'cmdline'],
                'recreate_cmdline': sys.argv,
            })
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
            if self.stats:
                target.end = datetime.utcnow()
                log_multi(DASHES,
                          str(target),
                          DASHES,
                          str(target.stats),
                          str(self.cache),
                          DASHES)
        else:
            additional_metadata = metadata or {}
            additional_metadata.update({
                'recreate_source_id': archive.id,
                'recreate_args': sys.argv[1:],
            })
            target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
            logger.info('Run the same command again to resume.')
        return completed

    def matcher_add_tagged_dirs(self, archive):
        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
        def exclude(dir, tag_item):
            if self.keep_tag_files:
                tag_files.append(PathPrefixPattern(tag_item[b'path']))
                tagged_dirs.append(FnmatchPattern(dir + '/'))
            else:
                tagged_dirs.append(PathPrefixPattern(dir))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []
        # build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
        cachedir_masters = {}

        for item in archive.iter_items(
                filter=lambda item: item[b'path'].endswith(CACHE_TAG_NAME) or matcher.match(item[b'path'])):
            if item[b'path'].endswith(CACHE_TAG_NAME):
                cachedir_masters[item[b'path']] = item
            if stat.S_ISREG(item[b'mode']):
                dir, tag_file = os.path.split(item[b'path'])
                if tag_file in self.exclude_if_present:
                    exclude(dir, item)
                if self.exclude_caches and tag_file == CACHE_TAG_NAME:
                    if b'chunks' in item:
                        file = open_item(archive, item)
                    else:
                        file = open_item(archive, cachedir_masters[item[b'source']])
                    if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
                        exclude(dir, item)
        matcher.add(tag_files, True)
        matcher.add(tagged_dirs, False)

    def create_target_or_resume(self, archive):
        """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
        if self.dry_run:
            return self.FakeTargetArchive(), None
        target_name = archive.name + '.recreate'
        resume = target_name in self.manifest.archives
        target, resume_from = None, None
        if resume:
            target, resume_from = self.try_resume(archive, target_name)
        if not target:
            target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
        return target, resume_from

    def try_resume(self, archive, target_name):
        """Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
        logger.info('Found %s, will resume interrupted operation', target_name)
        old_target = self.open_archive(target_name)
        resume_id = old_target.metadata[b'recreate_source_id']
        resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
        if resume_id != archive.id:
            logger.warning('Source archive changed, will discard %s and start over', target_name)
            logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))
            logger.warning('Current fingerprint: %s', archive.fpr)
            old_target.delete(Statistics(), progress=self.progress)
            return None, None  # can't resume
        if resume_args != sys.argv[1:]:
            logger.warning('Command line changed, this might lead to inconsistencies')
            logger.warning('Saved: %s', repr(resume_args))
            logger.warning('Current: %s', repr(sys.argv[1:]))
        target = self.create_target_archive(target_name + '.temp')
        logger.info('Replaying items from interrupted operation...')
        item = None
        for item in old_target.iter_items():
            if b'chunks' in item:
                for chunk in item[b'chunks']:
                    self.cache.chunk_incref(chunk.id, target.stats)
                target.stats.nfiles += 1
            target.add_item(item)
        if item:
            resume_from = item[b'path']
        else:
            resume_from = None
        if self.progress:
            old_target.stats.show_progress(final=True)
        target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
        for chunk_id, size, csize in target.recreate_partial_chunks:
            if not self.cache.seen_chunk(chunk_id):
                try:
                    # Repository has __contains__, RemoteRepository doesn't
                    self.repository.get(chunk_id)
                except Repository.ObjectNotFound:
                    # delete/prune/check between invocations: these chunks are gone.
                    target.recreate_partial_chunks = None
                    break
                # fast-lane insert into chunks cache
                self.cache.chunks[chunk_id] = (1, size, csize)
                target.stats.update(size, csize, True)
                continue
            # incref now, otherwise old_target.delete() might delete these chunks
            self.cache.chunk_incref(chunk_id, target.stats)
        old_target.delete(Statistics(), progress=self.progress)
        logger.info('Done replaying items')
        return target, resume_from

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=0, compression=self.compression)
        target.recreate_partial_chunks = None
        target.recreate_uncomitted_bytes = 0
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
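
# Illustrative sketch (not part of the original module): driving ArchiveRecreater to
# re-chunk and re-compress one archive. matcher and the other objects are assumed to come
# from the caller; recreate() returns False when interrupted, and re-running the same
# command resumes from the temporary '<name>.recreate' archive:
#
#     recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
#                                  chunker_params=CHUNKER_PARAMS,
#                                  compression=CompressionSpec('lz4'), progress=True)
#     completed = recreater.recreate('myhost-2016-01-01')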