- import errno
- import json
- import os
- import socket
- import stat
- import sys
- import time
- from contextlib import contextmanager
- from datetime import datetime, timezone, timedelta
- from functools import partial
- from getpass import getuser
- from io import BytesIO
- from itertools import groupby
- from shutil import get_terminal_size
- import msgpack
- from .logger import create_logger
- logger = create_logger()
- from . import xattr
- from .cache import ChunkListEntry
- from .chunker import Chunker
- from .compress import Compressor
- from .constants import * # NOQA
- from .hashindex import ChunkIndex, ChunkIndexEntry
- from .helpers import Manifest
- from .helpers import Chunk, ChunkIteratorFileWrapper, open_item
- from .helpers import Error, IntegrityError
- from .helpers import uid2user, user2uid, gid2group, group2gid
- from .helpers import parse_timestamp, to_localtime
- from .helpers import format_time, format_timedelta, format_file_size, file_status, FileSize
- from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
- from .helpers import StableDict
- from .helpers import bin_to_hex
- from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
- from .helpers import PathPrefixPattern, FnmatchPattern
- from .helpers import CompressionDecider1, CompressionDecider2, CompressionSpec
- from .item import Item, ArchiveItem
- from .key import key_factory
- from .platform import acl_get, acl_set, set_flags, get_flags, swidth
- from .remote import cache_if_remote
- from .repository import Repository, LIST_SCAN_LIMIT
- has_lchmod = hasattr(os, 'lchmod')
- flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
- flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
- class Statistics:
- def __init__(self, output_json=False):
- self.output_json = output_json
- self.osize = self.csize = self.usize = self.nfiles = 0
- self.last_progress = 0 # timestamp when last progress was shown
- def update(self, size, csize, unique):
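- # osize/csize track the total original/compressed size; usize grows only for unique (first-seen) chunks, i.e. the deduplicated size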
- self.osize += size
- self.csize += csize
- if unique:
- self.usize += csize
- summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"
- def __str__(self):
- return self.summary.format(stats=self, label='This archive:')
- def __repr__(self):
- return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
- cls=type(self).__name__, hash=id(self), self=self)
- def as_dict(self):
- return {
- 'original_size': FileSize(self.osize),
- 'compressed_size': FileSize(self.csize),
- 'deduplicated_size': FileSize(self.usize),
- 'nfiles': self.nfiles,
- }
- @property
- def osize_fmt(self):
- return format_file_size(self.osize)
- @property
- def usize_fmt(self):
- return format_file_size(self.usize)
- @property
- def csize_fmt(self):
- return format_file_size(self.csize)
- def show_progress(self, item=None, final=False, stream=None, dt=None):
- now = time.monotonic()
- if dt is None or now - self.last_progress > dt:
- self.last_progress = now
- if self.output_json:
- data = self.as_dict()
- data.update({
- 'type': 'archive_progress',
- 'path': remove_surrogates(item.path if item else ''),
- })
- msg = json.dumps(data)
- end = '\n'
- else:
- columns, lines = get_terminal_size()
- if not final:
- msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
- path = remove_surrogates(item.path) if item else ''
- space = columns - swidth(msg)
- if space < 12:
- msg = ''
- space = columns - swidth(msg)
- if space >= 8:
- msg += ellipsis_truncate(path, space)
- else:
- msg = ' ' * columns
- end = '\r'
- print(msg, end=end, file=stream or sys.stderr, flush=True)
- def is_special(mode):
- # file types that get special treatment in --read-special mode
- return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)
- class BackupOSError(Exception):
- """
- Wrapper for OSError raised while accessing backup files.
- Borg does different kinds of IO, and IO failures have different consequences.
- This wrapper represents failures of input file or extraction IO.
- These are non-critical and are only reported (exit code = 1, warning).
- Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
- """
- def __init__(self, op, os_error):
- self.op = op
- self.os_error = os_error
- self.errno = os_error.errno
- self.strerror = os_error.strerror
- self.filename = os_error.filename
- def __str__(self):
- if self.op:
- return '%s: %s' % (self.op, self.os_error)
- else:
- return str(self.os_error)
- class BackupIO:
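- # reusable context manager that converts OSError raised inside its block into BackupOSError,
- # tagged with the current operation name, e.g. ``with backup_io('open'): ...`` (see usages below)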
- op = ''
- def __call__(self, op=''):
- self.op = op
- return self
- def __enter__(self):
- pass
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type and issubclass(exc_type, OSError):
- raise BackupOSError(self.op, exc_val) from exc_val
- backup_io = BackupIO()
- def backup_io_iter(iterator):
- backup_io.op = 'read'
- while True:
- try:
- with backup_io:
- item = next(iterator)
- except StopIteration:
- return
- yield item
- class DownloadPipeline:
- def __init__(self, repository, key):
- self.repository = repository
- self.key = key
- def unpack_many(self, ids, filter=None, preload=False):
- """
- Return iterator of items.
- *ids* is a chunk ID list of an item stream. *filter* is a callable
- to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.
- Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
- otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
- """
- unpacker = msgpack.Unpacker(use_list=False)
- for _, data in self.fetch_many(ids):
- unpacker.feed(data)
- items = [Item(internal_dict=item) for item in unpacker]
- for item in items:
- if 'chunks' in item:
- item.chunks = [ChunkListEntry(*e) for e in item.chunks]
- if filter:
- items = [item for item in items if filter(item)]
- if preload:
- for item in items:
- if 'chunks' in item:
- self.repository.preload([c.id for c in item.chunks])
- for item in items:
- yield item
- def fetch_many(self, ids, is_preloaded=False):
- for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
- yield self.key.decrypt(id_, data)
- class ChunkBuffer:
- BUFFER_SIZE = 8 * 1024 * 1024
- def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
- self.buffer = BytesIO()
- self.packer = msgpack.Packer(unicode_errors='surrogateescape')
- self.chunks = []
- self.key = key
- self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
- def add(self, item):
- self.buffer.write(self.packer.pack(item.as_dict()))
- if self.is_full():
- self.flush()
- def write_chunk(self, chunk):
- raise NotImplementedError
- def flush(self, flush=False):
- if self.buffer.tell() == 0:
- return
- self.buffer.seek(0)
- chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
- self.buffer.seek(0)
- self.buffer.truncate(0)
- # Leave the last partial chunk in the buffer unless flush is True
- end = None if flush or len(chunks) == 1 else -1
- for chunk in chunks[:end]:
- self.chunks.append(self.write_chunk(chunk))
- if end == -1:
- self.buffer.write(chunks[-1].data)
- def is_full(self):
- return self.buffer.tell() > self.BUFFER_SIZE
- class CacheChunkBuffer(ChunkBuffer):
- def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
- super().__init__(key, chunker_params)
- self.cache = cache
- self.stats = stats
- def write_chunk(self, chunk):
- id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
- return id_
- class Archive:
- class DoesNotExist(Error):
- """Archive {} does not exist"""
- class AlreadyExists(Error):
- """Archive {} already exists"""
- class IncompatibleFilesystemEncodingError(Error):
- """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""
- def __init__(self, repository, key, manifest, name, cache=None, create=False,
- checkpoint_interval=300, numeric_owner=False, noatime=False, noctime=False, progress=False,
- chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None, compression=None, compression_files=None,
- consider_part_files=False, log_json=False):
- self.cwd = os.getcwd()
- self.key = key
- self.repository = repository
- self.cache = cache
- self.manifest = manifest
- self.hard_links = {}
- self.stats = Statistics(output_json=log_json)
- self.show_progress = progress
- self.name = name
- self.checkpoint_interval = checkpoint_interval
- self.numeric_owner = numeric_owner
- self.noatime = noatime
- self.noctime = noctime
- assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
- if start is None:
- start = datetime.utcnow()
- start_monotonic = time.monotonic()
- self.chunker_params = chunker_params
- self.start = start
- self.start_monotonic = start_monotonic
- if end is None:
- end = datetime.utcnow()
- self.end = end
- self.consider_part_files = consider_part_files
- self.pipeline = DownloadPipeline(self.repository, self.key)
- self.create = create
- if self.create:
- self.file_compression_logger = create_logger('borg.debug.file-compression')
- self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
- self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
- self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
- compression_files or [])
- key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
- if name in manifest.archives:
- raise self.AlreadyExists(name)
- self.last_checkpoint = time.monotonic()
- i = 0
- while True:
- self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
- if self.checkpoint_name not in manifest.archives:
- break
- i += 1
- else:
- info = self.manifest.archives.get(name)
- if info is None:
- raise self.DoesNotExist(name)
- self.load(info.id)
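- # a zero buffer as large as the maximum chunk size, used to detect all-zero chunks for sparse extraction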
- self.zeros = b'\0' * (1 << chunker_params[1])
- def _load_meta(self, id):
- _, data = self.key.decrypt(id, self.repository.get(id))
- metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
- if metadata.version != 1:
- raise Exception('Unknown archive metadata version')
- return metadata
- def load(self, id):
- self.id = id
- self.metadata = self._load_meta(self.id)
- self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
- self.name = self.metadata.name
- @property
- def ts(self):
- """Timestamp of archive creation (start) in UTC"""
- ts = self.metadata.time
- return parse_timestamp(ts)
- @property
- def ts_end(self):
- """Timestamp of archive creation (end) in UTC"""
- # fall back to time if there is no time_end present in metadata
- ts = self.metadata.get('time_end') or self.metadata.time
- return parse_timestamp(ts)
- @property
- def fpr(self):
- return bin_to_hex(self.id)
- @property
- def duration(self):
- return format_timedelta(self.end - self.start)
- @property
- def duration_from_meta(self):
- return format_timedelta(self.ts_end - self.ts)
- def info(self):
- if self.create:
- stats = self.stats
- start = self.start.replace(tzinfo=timezone.utc)
- end = self.end.replace(tzinfo=timezone.utc)
- else:
- stats = self.calc_stats(self.cache)
- start = self.ts
- end = self.ts_end
- info = {
- 'name': self.name,
- 'id': self.fpr,
- 'start': format_time(to_localtime(start)),
- 'end': format_time(to_localtime(end)),
- 'duration': (end - start).total_seconds(),
- 'stats': stats.as_dict(),
- 'limits': {
- 'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
- },
- }
- if self.create:
- info['command_line'] = sys.argv
- else:
- info.update({
- 'command_line': self.metadata.cmdline,
- 'hostname': self.metadata.hostname,
- 'username': self.metadata.username,
- 'comment': self.metadata.get('comment', ''),
- })
- return info
- def __str__(self):
- return '''\
- Archive name: {0.name}
- Archive fingerprint: {0.fpr}
- Time (start): {start}
- Time (end): {end}
- Duration: {0.duration}
- Number of files: {0.stats.nfiles}
- Utilization of max. archive size: {csize_max:.0%}
- '''.format(
- self,
- start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
- end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))),
- csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE)
- def __repr__(self):
- return 'Archive(%r)' % self.name
- def item_filter(self, item, filter=None):
- if not self.consider_part_files and 'part' in item:
- # this is a part(ial) file, we usually don't want to consider it.
- return False
- return filter(item) if filter else True
- def iter_items(self, filter=None, preload=False):
- for item in self.pipeline.unpack_many(self.metadata.items, preload=preload,
- filter=lambda item: self.item_filter(item, filter)):
- yield item
- def add_item(self, item, show_progress=True):
- if show_progress and self.show_progress:
- self.stats.show_progress(item=item, dt=0.2)
- self.items_buffer.add(item)
- def write_checkpoint(self):
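- # write a .checkpoint archive (this commits), then remove it from the in-memory manifest again and
- # decref its metadata chunk; the removal only becomes effective with the next commit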
- self.save(self.checkpoint_name)
- del self.manifest.archives[self.checkpoint_name]
- self.cache.chunk_decref(self.id, self.stats)
- def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
- name = name or self.name
- if name in self.manifest.archives:
- raise self.AlreadyExists(name)
- self.items_buffer.flush(flush=True)
- duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
- if timestamp is None:
- self.end = datetime.utcnow()
- self.start = self.end - duration
- start = self.start
- end = self.end
- else:
- self.end = timestamp
- self.start = timestamp - duration
- end = timestamp
- start = self.start
- metadata = {
- 'version': 1,
- 'name': name,
- 'comment': comment or '',
- 'items': self.items_buffer.chunks,
- 'cmdline': sys.argv,
- 'hostname': socket.gethostname(),
- 'username': getuser(),
- 'time': start.isoformat(),
- 'time_end': end.isoformat(),
- 'chunker_params': self.chunker_params,
- }
- metadata.update(additional_metadata or {})
- metadata = ArchiveItem(metadata)
- data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
- self.id = self.key.id_hash(data)
- self.cache.add_chunk(self.id, Chunk(data), self.stats)
- self.manifest.archives[name] = (self.id, metadata.time)
- self.manifest.write()
- self.repository.commit()
- self.cache.commit()
- def calc_stats(self, cache):
- def add(id):
- count, size, csize = cache.chunks[id]
- stats.update(size, csize, count == 1)
- cache.chunks[id] = count - 1, size, csize
- def add_file_chunks(chunks):
- for id, _, _ in chunks:
- add(id)
- # This function is a bit evil since it abuses the cache to calculate
- # the stats. The cache transaction must be rolled back afterwards
- unpacker = msgpack.Unpacker(use_list=False)
- cache.begin_txn()
- stats = Statistics()
- add(self.id)
- for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
- add(id)
- _, data = self.key.decrypt(id, chunk)
- unpacker.feed(data)
- for item in unpacker:
- chunks = item.get(b'chunks')
- if chunks is not None:
- stats.nfiles += 1
- add_file_chunks(chunks)
- cache.rollback()
- return stats
- def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
- hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
- """
- Extract archive item.
- :param item: the item to extract
- :param restore_attrs: restore file attributes
- :param dry_run: do not write any data
- :param stdout: write extracted data to stdout
- :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
- :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
- :param stripped_components: stripped leading path components to correct hard link extraction
- :param original_path: 'path' key as stored in archive
- :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
- """
- hardlink_masters = hardlink_masters or {}
- has_damaged_chunks = 'chunks_healthy' in item
- if dry_run or stdout:
- if 'chunks' in item:
- item_chunks_size = 0
- for _, data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
- if pi:
- pi.show(increase=len(data), info=[remove_surrogates(item.path)])
- if stdout:
- sys.stdout.buffer.write(data)
- item_chunks_size += len(data)
- if stdout:
- sys.stdout.buffer.flush()
- if 'size' in item:
- item_size = item.size
- if item_size != item_chunks_size:
- logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
- item.path, item_size, item_chunks_size))
- if has_damaged_chunks:
- logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
- remove_surrogates(item.path))
- return
- original_path = original_path or item.path
- dest = self.cwd
- if item.path.startswith(('/', '..')):
- raise Exception('Path should be relative and local')
- path = os.path.join(dest, item.path)
- # Attempt to remove existing files; ignore errors
- try:
- st = os.lstat(path)
- if stat.S_ISDIR(st.st_mode):
- os.rmdir(path)
- else:
- os.unlink(path)
- except UnicodeEncodeError:
- raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
- except OSError:
- pass
- mode = item.mode
- if stat.S_ISREG(mode):
- with backup_io('makedirs'):
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- # Hard link?
- if 'source' in item:
- source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
- with backup_io('link'):
- if os.path.exists(path):
- os.unlink(path)
- if item.source not in hardlink_masters:
- os.link(source, path)
- return
- item.chunks, link_target = hardlink_masters[item.source]
- if link_target:
- # Hard link was extracted previously, just link
- with backup_io:
- os.link(link_target, path)
- return
- # Extract chunks, since the item which had the chunks was not extracted
- with backup_io('open'):
- fd = open(path, 'wb')
- with fd:
- ids = [c.id for c in item.chunks]
- for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
- if pi:
- pi.show(increase=len(data), info=[remove_surrogates(item.path)])
- with backup_io('write'):
- if sparse and self.zeros.startswith(data):
- # all-zero chunk: create a hole in a sparse file
- fd.seek(len(data), 1)
- else:
- fd.write(data)
- with backup_io('truncate'):
- pos = item_chunks_size = fd.tell()
- fd.truncate(pos)
- fd.flush()
- self.restore_attrs(path, item, fd=fd.fileno())
- if 'size' in item:
- item_size = item.size
- if item_size != item_chunks_size:
- logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
- item.path, item_size, item_chunks_size))
- if has_damaged_chunks:
- logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
- remove_surrogates(item.path))
- if hardlink_masters:
- # Update master entry with extracted file path, so that following hardlinks don't extract twice.
- hardlink_masters[item.get('source') or original_path] = (None, path)
- return
- with backup_io:
- # No repository access beyond this point.
- if stat.S_ISDIR(mode):
- if not os.path.exists(path):
- os.makedirs(path)
- if restore_attrs:
- self.restore_attrs(path, item)
- elif stat.S_ISLNK(mode):
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- source = item.source
- if os.path.exists(path):
- os.unlink(path)
- try:
- os.symlink(source, path)
- except UnicodeEncodeError:
- raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
- self.restore_attrs(path, item, symlink=True)
- elif stat.S_ISFIFO(mode):
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- os.mkfifo(path)
- self.restore_attrs(path, item)
- elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
- os.mknod(path, item.mode, item.rdev)
- self.restore_attrs(path, item)
- else:
- raise Exception('Unknown archive item type %r' % item.mode)
- def restore_attrs(self, path, item, symlink=False, fd=None):
- """
- Restore filesystem attributes on *path* (*fd*) from *item*.
- Does not access the repository.
- """
- backup_io.op = 'attrs'
- uid = gid = None
- if not self.numeric_owner:
- uid = user2uid(item.user)
- gid = group2gid(item.group)
- uid = item.uid if uid is None else uid
- gid = item.gid if gid is None else gid
- # This code is a bit of a mess due to OS-specific differences
- try:
- if fd:
- os.fchown(fd, uid, gid)
- else:
- os.lchown(path, uid, gid)
- except OSError:
- pass
- if fd:
- os.fchmod(fd, item.mode)
- elif not symlink:
- os.chmod(path, item.mode)
- elif has_lchmod: # Not available on Linux
- os.lchmod(path, item.mode)
- mtime = item.mtime
- if 'atime' in item:
- atime = item.atime
- else:
- # old archives only had mtime in item metadata
- atime = mtime
- try:
- if fd:
- os.utime(fd, None, ns=(atime, mtime))
- else:
- os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
- except OSError:
- # some systems don't support calling utime on a symlink
- pass
- acl_set(path, item, self.numeric_owner)
- if 'bsdflags' in item:
- try:
- set_flags(path, item.bsdflags, fd=fd)
- except OSError:
- pass
- # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
- # the Linux capabilities in the "security.capability" attribute.
- xattrs = item.get('xattrs', {})
- for k, v in xattrs.items():
- try:
- xattr.setxattr(fd or path, k, v, follow_symlinks=False)
- except OSError as e:
- if e.errno not in (errno.ENOTSUP, errno.EACCES):
- # only raise if the errno is not on our ignore list:
- # ENOTSUP == xattrs not supported here
- # EACCES == permission denied to set this specific xattr
- # (this may happen related to security.* keys)
- raise
- def set_meta(self, key, value):
- metadata = self._load_meta(self.id)
- setattr(metadata, key, value)
- data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
- new_id = self.key.id_hash(data)
- self.cache.add_chunk(new_id, Chunk(data), self.stats)
- self.manifest.archives[self.name] = (new_id, metadata.time)
- self.cache.chunk_decref(self.id, self.stats)
- self.id = new_id
- def rename(self, name):
- if name in self.manifest.archives:
- raise self.AlreadyExists(name)
- oldname = self.name
- self.name = name
- self.set_meta('name', name)
- del self.manifest.archives[oldname]
- def delete(self, stats, progress=False, forced=False):
- class ChunksIndexError(Error):
- """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""
- def chunk_decref(id, stats):
- nonlocal error
- try:
- self.cache.chunk_decref(id, stats)
- except KeyError:
- cid = bin_to_hex(id)
- raise ChunksIndexError(cid)
- except Repository.ObjectNotFound as e:
- # object not in repo - strange, but we wanted to delete it anyway.
- if forced == 0:
- raise
- error = True
- error = False
- try:
- unpacker = msgpack.Unpacker(use_list=False)
- items_ids = self.metadata.items
- pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete')
- for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
- if progress:
- pi.show(i)
- _, data = self.key.decrypt(items_id, data)
- unpacker.feed(data)
- chunk_decref(items_id, stats)
- try:
- for item in unpacker:
- item = Item(internal_dict=item)
- if 'chunks' in item:
- for chunk_id, size, csize in item.chunks:
- chunk_decref(chunk_id, stats)
- except (TypeError, ValueError):
- # if items metadata spans multiple chunks and one chunk got dropped somehow,
- # it could be that unpacker yields bad types
- if forced == 0:
- raise
- error = True
- if progress:
- pi.finish()
- except (msgpack.UnpackException, Repository.ObjectNotFound):
- # items metadata corrupted
- if forced == 0:
- raise
- error = True
- # in forced delete mode, we try hard to delete at least the manifest entry,
- # if possible also the archive superblock, even if processing the items raises
- # some harmless exception.
- chunk_decref(self.id, stats)
- del self.manifest.archives[self.name]
- if error:
- logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
- logger.warning('borg check --repair is required to free all space.')
- def stat_simple_attrs(self, st):
- attrs = dict(
- mode=st.st_mode,
- uid=st.st_uid,
- gid=st.st_gid,
- mtime=st.st_mtime_ns,
- )
- # borg can work with archives that only have mtime (older attic archives do not have
- # atime/ctime). it can be useful to omit atime/ctime, if they change without the
- # file content changing - e.g. to get better metadata deduplication.
- if not self.noatime:
- attrs['atime'] = st.st_atime_ns
- if not self.noctime:
- attrs['ctime'] = st.st_ctime_ns
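- # with numeric_owner, store only the numeric uid/gid, not the symbolic user/group names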
- if self.numeric_owner:
- attrs['user'] = attrs['group'] = None
- else:
- attrs['user'] = uid2user(st.st_uid)
- attrs['group'] = gid2group(st.st_gid)
- return attrs
- def stat_ext_attrs(self, st, path):
- attrs = {}
- with backup_io('extended stat'):
- xattrs = xattr.get_all(path, follow_symlinks=False)
- bsdflags = get_flags(path, st)
- acl_get(path, attrs, st, self.numeric_owner)
- if xattrs:
- attrs['xattrs'] = StableDict(xattrs)
- if bsdflags:
- attrs['bsdflags'] = bsdflags
- return attrs
- def stat_attrs(self, st, path):
- attrs = self.stat_simple_attrs(st)
- attrs.update(self.stat_ext_attrs(st, path))
- return attrs
- def process_dir(self, path, st):
- item = Item(path=make_path_safe(path))
- item.update(self.stat_attrs(st, path))
- self.add_item(item)
- return 'd' # directory
- def process_fifo(self, path, st):
- item = Item(path=make_path_safe(path))
- item.update(self.stat_attrs(st, path))
- self.add_item(item)
- return 'f' # fifo
- def process_dev(self, path, st):
- item = Item(path=make_path_safe(path), rdev=st.st_rdev)
- item.update(self.stat_attrs(st, path))
- self.add_item(item)
- if stat.S_ISCHR(st.st_mode):
- return 'c' # char device
- elif stat.S_ISBLK(st.st_mode):
- return 'b' # block device
- def process_symlink(self, path, st):
- with backup_io('readlink'):
- source = os.readlink(path)
- item = Item(path=make_path_safe(path), source=source)
- item.update(self.stat_attrs(st, path))
- self.add_item(item)
- return 's' # symlink
- def write_part_file(self, item, from_chunk, number):
- item = Item(internal_dict=item.as_dict())
- length = len(item.chunks)
- # the item should only have the *additional* chunks we processed after the last partial item:
- item.chunks = item.chunks[from_chunk:]
- item.get_size(memorize=True)
- item.path += '.borg_part_%d' % number
- item.part = number
- number += 1
- self.add_item(item, show_progress=False)
- self.write_checkpoint()
- return length, number
- def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
- if not chunk_processor:
- def chunk_processor(data):
- return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)
- item.chunks = []
- from_chunk = 0
- part_number = 1
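- # note the for/else below: the else branch only runs once the chunk iterator is exhausted (end of input)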
- for data in chunk_iter:
- item.chunks.append(chunk_processor(data))
- if self.show_progress:
- self.stats.show_progress(item=item, dt=0.2)
- if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
- from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
- self.last_checkpoint = time.monotonic()
- else:
- if part_number > 1:
- if item.chunks[from_chunk:]:
- # if we already have created a part item inside this file, we want to put the final
- # chunks (if any) into a part item also (so all parts can be concatenated to get
- # the complete file):
- from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
- self.last_checkpoint = time.monotonic()
- # if we created part files, we have referenced all chunks from the part files,
- # but we also will reference the same chunks also from the final, complete file:
- for chunk in item.chunks:
- cache.chunk_incref(chunk.id, stats)
- def process_stdin(self, path, cache):
- uid, gid = 0, 0
- t = int(time.time()) * 1000000000
- item = Item(
- path=path,
- mode=0o100660, # regular file, ug=rw
- uid=uid, user=uid2user(uid),
- gid=gid, group=gid2group(gid),
- mtime=t, atime=t, ctime=t,
- )
- fd = sys.stdin.buffer # binary
- self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
- item.get_size(memorize=True)
- self.stats.nfiles += 1
- self.add_item(item)
- return 'i' # stdin
- def process_file(self, path, st, cache, ignore_inode=False):
- status = None
- safe_path = make_path_safe(path)
- # Is it a hard link?
- if st.st_nlink > 1:
- source = self.hard_links.get((st.st_ino, st.st_dev))
- if source is not None:
- item = Item(path=safe_path, source=source)
- item.update(self.stat_attrs(st, path))
- self.add_item(item)
- status = 'h' # regular file, hardlink (to already seen inodes)
- return status
- is_special_file = is_special(st.st_mode)
- if not is_special_file:
- path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
- ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
- else:
- # in --read-special mode, we may be called for special files.
- # there should be no information in the cache about special files processed in
- # read-special mode, but we'd better play it safe, as this was wrong in the past:
- path_hash = ids = None
- first_run = not cache.files and cache.do_files
- if first_run:
- logger.debug('Processing files ...')
- chunks = None
- if ids is not None:
- # Make sure all ids are available
- for id_ in ids:
- if not cache.seen_chunk(id_):
- break
- else:
- chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
- status = 'U' # regular file, unchanged
- else:
- status = 'A' # regular file, added
- item = Item(
- path=safe_path,
- hardlink_master=st.st_nlink > 1, # item is a hard link and has the chunks
- )
- item.update(self.stat_simple_attrs(st))
- # Only chunkify the file if needed
- if chunks is not None:
- item.chunks = chunks
- else:
- compress = self.compression_decider1.decide(path)
- self.file_compression_logger.debug('%s -> compression %s', path, compress['name'])
- with backup_io('open'):
- fh = Archive._open_rb(path)
- with os.fdopen(fh, 'rb') as fd:
- self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)), compress=compress)
- if not is_special_file:
- # we must not memorize special files, because the contents of e.g. a
- # block or char device will change without its mtime/size/inode changing.
- cache.memorize_file(path_hash, st, [c.id for c in item.chunks])
- status = status or 'M' # regular file, modified (if not 'A' already)
- item.update(self.stat_attrs(st, path))
- item.get_size(memorize=True)
- if is_special_file:
- # we processed a special file like a regular file. reflect that in mode,
- # so it can be extracted / accessed in FUSE mount like a regular file:
- item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
- self.stats.nfiles += 1
- self.add_item(item)
- if st.st_nlink > 1 and source is None:
- # Add the hard link reference *after* the file has been added to the archive.
- self.hard_links[st.st_ino, st.st_dev] = safe_path
- return status
- @staticmethod
- def list_archives(repository, key, manifest, cache=None):
- # expensive! see also Manifest.list_archive_infos.
- for name in manifest.archives:
- yield Archive(repository, key, manifest, name, cache=cache)
- @staticmethod
- def _open_rb(path):
- try:
- # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
- return os.open(path, flags_noatime)
- except PermissionError:
- if flags_noatime == flags_normal:
- # we do not have O_NOATIME, no need to try again:
- raise
- # Was this EPERM due to the O_NOATIME flag? Try again without it:
- return os.open(path, flags_normal)
- def valid_msgpacked_dict(d, keys_serialized):
- """check if the data <d> looks like a msgpacked dict"""
- d_len = len(d)
- if d_len == 0:
- return False
- if d[0] & 0xf0 == 0x80: # object is a fixmap (up to 15 elements)
- offs = 1
- elif d[0] == 0xde: # object is a map16 (up to 2^16-1 elements)
- offs = 3
- else:
- # object is not a map (dict)
- # note: we must not have dicts with > 2^16-1 elements
- return False
- if d_len <= offs:
- return False
- # is the first dict key a bytestring?
- if d[offs] & 0xe0 == 0xa0: # key is a small bytestring (up to 31 chars)
- pass
- elif d[offs] in (0xd9, 0xda, 0xdb): # key is a str8, str16 or str32
- pass
- else:
- # key is not a bytestring
- return False
- # is the bytestring any of the expected key names?
- key_serialized = d[offs:]
- return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
- class RobustUnpacker:
- """A restartable/robust version of the streaming msgpack unpacker
- """
- class UnpackerCrashed(Exception):
- """raise if unpacker crashed"""
- def __init__(self, validator, item_keys):
- super().__init__()
- self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
- self.validator = validator
- self._buffered_data = []
- self._resync = False
- self._unpacker = msgpack.Unpacker(object_hook=StableDict)
- def resync(self):
- self._buffered_data = []
- self._resync = True
- def feed(self, data):
- if self._resync:
- self._buffered_data.append(data)
- else:
- self._unpacker.feed(data)
- def __iter__(self):
- return self
- def __next__(self):
- def unpack_next():
- try:
- return next(self._unpacker)
- except (TypeError, ValueError) as err:
- # transform exceptions that might be raised when feeding
- # msgpack with invalid data to a more specific exception
- raise self.UnpackerCrashed(str(err))
- if self._resync:
- data = b''.join(self._buffered_data)
- while self._resync:
- if not data:
- raise StopIteration
- # Abort early if the data does not look like a serialized item dict
- if not valid_msgpacked_dict(data, self.item_keys):
- data = data[1:]
- continue
- self._unpacker = msgpack.Unpacker(object_hook=StableDict)
- self._unpacker.feed(data)
- try:
- item = unpack_next()
- except (self.UnpackerCrashed, StopIteration):
- # as long as we are resyncing, we also ignore StopIteration
- pass
- else:
- if self.validator(item):
- self._resync = False
- return item
- data = data[1:]
- else:
- return unpack_next()
- class ArchiveChecker:
- def __init__(self):
- self.error_found = False
- self.possibly_superseded = set()
- def check(self, repository, repair=False, archive=None, first=0, last=0, sort_by='', prefix='',
- verify_data=False, save_space=False):
- """Perform a set of checks on 'repository'
- :param repair: enable repair mode, write updated or corrected data into repository
- :param archive: only check this archive
- :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
- :param prefix: only check archives with this prefix
- :param verify_data: integrity verification of data referenced by archives
- :param save_space: Repository.commit(save_space)
- """
- logger.info('Starting archive consistency check...')
- self.check_all = archive is None and not any((first, last, prefix))
- self.repair = repair
- self.repository = repository
- self.init_chunks()
- if not self.chunks:
- logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
- return False
- self.key = self.identify_key(repository)
- if verify_data:
- self.verify_data()
- if Manifest.MANIFEST_ID not in self.chunks:
- logger.error("Repository manifest not found!")
- self.error_found = True
- self.manifest = self.rebuild_manifest()
- else:
- try:
- self.manifest, _ = Manifest.load(repository, key=self.key)
- except IntegrityError as exc:
- logger.error('Repository manifest is corrupted: %s', exc)
- self.error_found = True
- del self.chunks[Manifest.MANIFEST_ID]
- self.manifest = self.rebuild_manifest()
- self.rebuild_refcounts(archive=archive, first=first, last=last, sort_by=sort_by, prefix=prefix)
- self.orphan_chunks_check()
- self.finish(save_space=save_space)
- if self.error_found:
- logger.error('Archive consistency check complete, problems found.')
- else:
- logger.info('Archive consistency check complete, no problems found.')
- return self.repair or not self.error_found
- def init_chunks(self):
- """Fetch a list of all object keys from repository
- """
- # Explicitly set the initial hash table capacity to avoid performance issues
- # due to hash table "resonance".
- # Since reconstruction of archive items can add some new chunks, add 10 % headroom
- capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
- self.chunks = ChunkIndex(capacity)
- marker = None
- while True:
- result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
- if not result:
- break
- marker = result[-1]
- init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
- for id_ in result:
- self.chunks[id_] = init_entry
- def identify_key(self, repository):
- try:
- some_chunkid, _ = next(self.chunks.iteritems())
- except StopIteration:
- # repo is completely empty, no chunks
- return None
- cdata = repository.get(some_chunkid)
- return key_factory(repository, cdata)
- def verify_data(self):
- logger.info('Starting cryptographic data integrity verification...')
- chunks_count_index = len(self.chunks)
- chunks_count_segments = 0
- errors = 0
- defect_chunks = []
- pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01,
- msgid='check.verify_data')
- marker = None
- while True:
- chunk_ids = self.repository.scan(limit=100, marker=marker)
- if not chunk_ids:
- break
- chunks_count_segments += len(chunk_ids)
- marker = chunk_ids[-1]
- chunk_data_iter = self.repository.get_many(chunk_ids)
- chunk_ids_revd = list(reversed(chunk_ids))
- while chunk_ids_revd:
- pi.show()
- chunk_id = chunk_ids_revd.pop(-1) # better efficiency
- try:
- encrypted_data = next(chunk_data_iter)
- except (Repository.ObjectNotFound, IntegrityError) as err:
- self.error_found = True
- errors += 1
- logger.error('chunk %s: %s', bin_to_hex(chunk_id), err)
- if isinstance(err, IntegrityError):
- defect_chunks.append(chunk_id)
- # as the exception killed our generator, make a new one for remaining chunks:
- if chunk_ids_revd:
- chunk_ids = list(reversed(chunk_ids_revd))
- chunk_data_iter = self.repository.get_many(chunk_ids)
- else:
- try:
- _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
- _, data = self.key.decrypt(_chunk_id, encrypted_data)
- except IntegrityError as integrity_error:
- self.error_found = True
- errors += 1
- logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
- defect_chunks.append(chunk_id)
- pi.finish()
- if chunks_count_index != chunks_count_segments:
- logger.error('Repo/Chunks index object count vs. segment files object count mismatch.')
- logger.error('Repo/Chunks index: %d objects != segment files: %d objects',
- chunks_count_index, chunks_count_segments)
- if defect_chunks:
- if self.repair:
- # if we kill the defect chunk here, subsequent actions within this "borg check"
- # run will find missing chunks and replace them with all-zero replacement
- # chunks and flag the files as "repaired".
- # if another backup is done later and the missing chunks get backed up again,
- # a "borg check" afterwards can heal all files where this chunk was missing.
- logger.warning('Found defect chunks. They will be deleted now, so affected files can '
- 'get repaired now and maybe healed later.')
- for defect_chunk in defect_chunks:
- # remote repo (ssh): retry might help for strange network / NIC / RAM errors
- # as the chunk will be retransmitted from remote server.
- # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through,
- # a defect chunk is likely not in the fs cache any more and really gets re-read
- # from the underlying media.
- encrypted_data = self.repository.get(defect_chunk)
- try:
- _chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk
- self.key.decrypt(_chunk_id, encrypted_data)
- except IntegrityError:
- # failed twice -> get rid of this chunk
- del self.chunks[defect_chunk]
- self.repository.delete(defect_chunk)
- logger.debug('chunk %s deleted.', bin_to_hex(defect_chunk))
- else:
- logger.warning('chunk %s not deleted, did not consistently fail.', bin_to_hex(defect_chunk))
- else:
- logger.warning('Found defect chunks. With --repair, they would get deleted, so affected '
- 'files could get repaired then and maybe healed later.')
- for defect_chunk in defect_chunks:
- logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk))
- log = logger.error if errors else logger.info
- log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.',
- chunks_count_segments, errors)
- def rebuild_manifest(self):
- """Rebuild the manifest object if it is missing
- Iterates through all objects in the repository looking for archive metadata blocks.
- """
- required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)
- def valid_archive(obj):
- if not isinstance(obj, dict):
- return False
- keys = set(obj)
- return required_archive_keys.issubset(keys)
- logger.info('Rebuilding missing manifest, this might take some time...')
- # as we have lost the manifest, we do not know any more what valid item keys we had.
- # collecting any key we encounter in a damaged repo seems unwise, thus we just use
- # the hardcoded list from the source code. therefore, it is not recommended to rebuild a
- # lost manifest on an older borg version than the most recent one that was ever used
- # within this repository (assuming that newer borg versions support more item keys).
- manifest = Manifest(self.key, self.repository)
- archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS]
- for chunk_id, _ in self.chunks.iteritems():
- cdata = self.repository.get(chunk_id)
- try:
- _, data = self.key.decrypt(chunk_id, cdata)
- except IntegrityError as exc:
- logger.error('Skipping corrupted chunk: %s', exc)
- self.error_found = True
- continue
- if not valid_msgpacked_dict(data, archive_keys_serialized):
- continue
- if b'cmdline' not in data or b'\xa7version\x01' not in data:
- continue
- try:
- archive = msgpack.unpackb(data)
- # Ignore exceptions that might be raised when feeding
- # msgpack with invalid data
- except (TypeError, ValueError, StopIteration):
- continue
- if valid_archive(archive):
- archive = ArchiveItem(internal_dict=archive)
- name = archive.name
- logger.info('Found archive %s', name)
- if name in manifest.archives:
- i = 1
- while True:
- new_name = '%s.%d' % (name, i)
- if new_name not in manifest.archives:
- break
- i += 1
- logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
- name = new_name
- manifest.archives[name] = (chunk_id, archive.time)
- logger.info('Manifest rebuild complete.')
- return manifest
- def rebuild_refcounts(self, archive=None, first=0, last=0, sort_by='', prefix=''):
- """Rebuild object reference counts by walking the metadata
- Missing and/or incorrect data is repaired when detected
- """
- # Exclude the manifest from chunks
- del self.chunks[Manifest.MANIFEST_ID]
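- # chunks expected to become unreferenced because this check run supersedes them (old archive and
- # item metadata) are collected in self.possibly_superseded, so they are not reported as orphans later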
- def mark_as_possibly_superseded(id_):
- if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
- self.possibly_superseded.add(id_)
- def add_callback(chunk):
- id_ = self.key.id_hash(chunk.data)
- cdata = self.key.encrypt(chunk)
- add_reference(id_, len(chunk.data), len(cdata), cdata)
- return id_
- def add_reference(id_, size, csize, cdata=None):
- try:
- self.chunks.incref(id_)
- except KeyError:
- assert cdata is not None
- self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
- if self.repair:
- self.repository.put(id_, cdata)
- def verify_file_chunks(item):
- """Verifies that all file chunks are present.
- Missing file chunks will be replaced with new chunks of the same length containing all zeros.
- If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
- """
- def replacement_chunk(size):
- data = bytes(size)
- chunk_id = self.key.id_hash(data)
- cdata = self.key.encrypt(Chunk(data))
- csize = len(cdata)
- return chunk_id, size, csize, cdata
- offset = 0
- chunk_list = []
- chunks_replaced = False
- has_chunks_healthy = 'chunks_healthy' in item
- chunks_current = item.chunks
- chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
- assert len(chunks_current) == len(chunks_healthy)
- for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
- chunk_id, size, csize = chunk_healthy
- if chunk_id not in self.chunks:
- # a chunk of the healthy list is missing
- if chunk_current == chunk_healthy:
- logger.error('{}: New missing file chunk detected (Byte {}-{}). '
- 'Replacing with all-zero chunk.'.format(item.path, offset, offset + size))
- self.error_found = chunks_replaced = True
- chunk_id, size, csize, cdata = replacement_chunk(size)
- add_reference(chunk_id, size, csize, cdata)
- else:
- logger.info('{}: Previously missing file chunk is still missing (Byte {}-{}). It has an '
- 'all-zero replacement chunk already.'.format(item.path, offset, offset + size))
- chunk_id, size, csize = chunk_current
- if chunk_id in self.chunks:
- add_reference(chunk_id, size, csize)
- else:
- logger.warning('{}: Missing all-zero replacement chunk detected (Byte {}-{}). '
- 'Generating new replacement chunk.'.format(item.path, offset, offset + size))
- self.error_found = chunks_replaced = True
- chunk_id, size, csize, cdata = replacement_chunk(size)
- add_reference(chunk_id, size, csize, cdata)
- else:
- if chunk_current == chunk_healthy:
- # normal case, all fine.
- add_reference(chunk_id, size, csize)
- else:
- logger.info('{}: Healed previously missing file chunk! '
- '(Byte {}-{}).'.format(item.path, offset, offset + size))
- add_reference(chunk_id, size, csize)
- mark_as_possibly_superseded(chunk_current[0]) # maybe orphaned the all-zero replacement chunk
- chunk_list.append([chunk_id, size, csize]) # list-typed element as chunks_healthy is list-of-lists
- offset += size
- if chunks_replaced and not has_chunks_healthy:
- # if this is the first repair, remember the correct chunk IDs, so we can maybe heal the file later
- item.chunks_healthy = item.chunks
- if has_chunks_healthy and chunk_list == chunks_healthy:
- logger.info('{}: Completely healed previously damaged file!'.format(item.path))
- del item.chunks_healthy
- item.chunks = chunk_list
- if 'size' in item:
- item_size = item.size
- item_chunks_size = item.get_size(compressed=False, from_chunks=True)
- if item_size != item_chunks_size:
- # just warn, but keep the inconsistency, so that borg extract can warn about it.
- logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
- item.path, item_size, item_chunks_size))
- def robust_iterator(archive):
- """Iterates through all archive items
- Missing item chunks will be skipped and the msgpack stream will be restarted
- """
- item_keys = frozenset(key.encode() for key in self.manifest.item_keys)
- required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS)
- unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and 'path' in item,
- self.manifest.item_keys)
- _state = 0
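- # _state stays even for runs of item metadata chunk ids present in self.chunks and becomes odd
- # for runs of missing ids; groupby() below uses it to split the id list into alternating runs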
- def missing_chunk_detector(chunk_id):
- nonlocal _state
- if _state % 2 != int(chunk_id not in self.chunks):
- _state += 1
- return _state
- def report(msg, chunk_id, chunk_no):
- cid = bin_to_hex(chunk_id)
- msg += ' [chunk: %06d_%s]' % (chunk_no, cid) # see "debug dump-archive-items"
- self.error_found = True
- logger.error(msg)
- def list_keys_safe(keys):
- return ', '.join((k.decode() if isinstance(k, bytes) else str(k) for k in keys))
- def valid_item(obj):
- if not isinstance(obj, StableDict):
- return False, 'not a dictionary'
- # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
- # We ignore it here, should it exist. See test_attic013_acl_bug for details.
- obj.pop(b'acl', None)
- keys = set(obj)
- if not required_item_keys.issubset(keys):
- return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
- if not keys.issubset(item_keys):
- return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
- return True, ''
- i = 0
- for state, items in groupby(archive.items, missing_chunk_detector):
- items = list(items)
- if state % 2:
- for chunk_id in items:
- report('item metadata chunk missing', chunk_id, i)
- i += 1
- continue
- if state > 0:
- unpacker.resync()
- for chunk_id, cdata in zip(items, repository.get_many(items)):
- _, data = self.key.decrypt(chunk_id, cdata)
- unpacker.feed(data)
- try:
- for item in unpacker:
- valid, reason = valid_item(item)
- if valid:
- yield Item(internal_dict=item)
- else:
- report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
- except RobustUnpacker.UnpackerCrashed as err:
- report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
- unpacker.resync()
- except Exception:
- report('Exception while unpacking item metadata', chunk_id, i)
- raise
- i += 1
- if archive is None:
- sort_by = sort_by.split(',')
- if any((first, last, prefix)):
- archive_infos = self.manifest.archives.list(sort_by=sort_by, prefix=prefix, first=first, last=last)
- if prefix and not archive_infos:
- logger.warning('--prefix %s does not match any archives', prefix)
- if first and len(archive_infos) < first:
- logger.warning('--first %d archives: only found %d archives', first, len(archive_infos))
- if last and len(archive_infos) < last:
- logger.warning('--last %d archives: only found %d archives', last, len(archive_infos))
- else:
- archive_infos = self.manifest.archives.list(sort_by=sort_by)
- else:
- # we only want one specific archive
- try:
- archive_infos = [self.manifest.archives[archive]]
- except KeyError:
- logger.error("Archive '%s' not found.", archive)
- self.error_found = True
- return
- num_archives = len(archive_infos)
- with cache_if_remote(self.repository) as repository:
- for i, info in enumerate(archive_infos):
- logger.info('Analyzing archive {} ({}/{})'.format(info.name, i + 1, num_archives))
- archive_id = info.id
- if archive_id not in self.chunks:
- logger.error('Archive metadata block is missing!')
- self.error_found = True
- del self.manifest.archives[info.name]
- continue
- mark_as_possibly_superseded(archive_id)
- cdata = self.repository.get(archive_id)
- _, data = self.key.decrypt(archive_id, cdata)
- archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
- if archive.version != 1:
- raise Exception('Unknown archive metadata version')
- archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
- items_buffer = ChunkBuffer(self.key)
- items_buffer.write_chunk = add_callback
- for item in robust_iterator(archive):
- if 'chunks' in item:
- verify_file_chunks(item)
- items_buffer.add(item)
- items_buffer.flush(flush=True)
                for previous_item_id in archive.items:
                    mark_as_possibly_superseded(previous_item_id)
                archive.items = items_buffer.chunks
                data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(Chunk(data))
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                self.manifest.archives[info.name] = (new_archive_id, info.ts)

    def orphan_chunks_check(self):
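        """Check for chunks that no archive references.

        Only meaningful when all archives were checked; otherwise an
        unreferenced chunk might simply belong to an unchecked archive.
        """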
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
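        """Write back the (possibly modified) manifest and commit, in repair mode only."""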
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)


class ArchiveRecreater:
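    """Recreate archives, filtering items and optionally rechunking or recompressing their data.

    A minimal usage sketch (the argument values here are illustrative only):

        recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
                                     chunker_params=CHUNKER_PARAMS, dry_run=True)
        recreater.recreate('some-archive')
    """
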
    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False,
                 chunker_params=None, compression=None, compression_files=None, always_recompress=False,
                 dry_run=False, stats=False, progress=False, file_status_printer=None,
                 checkpoint_interval=1800):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache
        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_exclude_tags = keep_exclude_tags
        self.rechunkify = chunker_params is not None
        if self.rechunkify:
            logger.debug('Rechunking archives to %s', chunker_params)
        self.chunker_params = chunker_params or CHUNKER_PARAMS
        self.recompress = bool(compression)
        self.always_recompress = always_recompress
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
        self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                        compression_files or [])
        key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)
        self.checkpoint_interval = None if dry_run else checkpoint_interval

    def recreate(self, archive_name, comment=None, target_name=None):
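        """Recreate archive_name, writing the result to target_name (or back to the original name)."""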
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target = self.create_target(archive, target_name)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            logger.info("Skipping archive %s, nothing to do", archive_name)
            return
        self.process_items(archive, target)
        replace_original = target_name is None
        self.save(archive, target, comment, replace_original=replace_original)

    def process_items(self, archive, target):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    stat.S_ISREG(item.mode) and
                    item.get('hardlink_master', True) and
                    'source' not in item and
                    not matcher.match(item.path))
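
        # When recreating only a subset, a hard link whose master is excluded
        # would be left without file content. Remember the masters' chunk
        # lists so that the first included "slave" can take over as master.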
        for item in archive.iter_items():
            if item_is_hardlink_master(item):
                hardlink_masters[item.path] = (item.get('chunks'), None)
                continue
            if not matcher.match(item.path):
                self.print_file_status('x', item.path)
                continue
            if target_is_subset and stat.S_ISREG(item.mode) and item.get('source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, new_source = hardlink_masters[item.source]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item.chunks = chunks
                    hardlink_masters[item.source] = (None, item.path)
                    del item.source
                else:
                    # Master was already moved, only update this item's source
                    item.source = new_source
            if self.dry_run:
                self.print_file_status('-', item.path)
            else:
                self.process_item(archive, target, item)
        if self.progress:
            target.stats.show_progress(final=True)

    def process_item(self, archive, target, item):
        if 'chunks' in item:
            self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item)
        self.print_file_status(file_status(item.mode), item.path)

    def process_chunks(self, archive, target, item):
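        """Re-reference, or recompress/rechunk, the content chunks of a file item."""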
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item.chunks:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item.chunks
        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
        compress = self.compression_decider1.decide(item.path)
        chunk_processor = partial(self.chunk_processor, target, compress)
        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)

    def chunk_processor(self, target, compress, data):
        chunk_id = self.key.id_hash(data)
        if chunk_id in self.seen_chunks:
            return self.cache.chunk_incref(chunk_id, target.stats)
        chunk = Chunk(data, compress=compress)
        compression_spec, chunk = self.key.compression_decider2.decide(chunk)
        overwrite = self.recompress
        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
            # Check if this chunk is already compressed the way we want it
            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
            if Compressor.detect(old_chunk.data).name == compression_spec['name']:
                # Stored chunk has the same compression we wanted
                overwrite = False
        chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
        self.seen_chunks.add(chunk_entry.id)
        return chunk_entry

    def iter_chunks(self, archive, target, chunks):
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)
            yield from target.chunker.chunkify(file)
        else:
            for chunk in chunk_iterator:
                yield chunk.data

    def save(self, archive, target, comment=None, replace_original=True):
        if self.dry_run:
            return
        timestamp = archive.ts.replace(tzinfo=None)
        if comment is None:
            comment = archive.metadata.get('comment', '')
        target.save(timestamp=timestamp, comment=comment, additional_metadata={
            'cmdline': archive.metadata.cmdline,
            'recreate_cmdline': sys.argv,
        })
        if replace_original:
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
        if self.stats:
            target.end = datetime.utcnow()
            log_multi(DASHES,
                      str(target),
                      DASHES,
                      str(target.stats),
                      str(self.cache),
                      DASHES)

    def matcher_add_tagged_dirs(self, archive):
        """Add excludes, derived from exclude_caches and exclude_if_present, to the matcher."""
        def exclude(dir, tag_item):
            if self.keep_exclude_tags:
                tag_files.append(PathPrefixPattern(tag_item.path))
                tagged_dirs.append(FnmatchPattern(dir + '/'))
            else:
                tagged_dirs.append(PathPrefixPattern(dir))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []
        # build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
        cachedir_masters = {}

        for item in archive.iter_items(
                filter=lambda item: item.path.endswith(CACHE_TAG_NAME) or matcher.match(item.path)):
            if item.path.endswith(CACHE_TAG_NAME):
                cachedir_masters[item.path] = item
            dir, tag_file = os.path.split(item.path)
            if tag_file in self.exclude_if_present:
                exclude(dir, item)
            if stat.S_ISREG(item.mode):
                if self.exclude_caches and tag_file == CACHE_TAG_NAME:
                    if 'chunks' in item:
                        file = open_item(archive, item)
                    else:
                        file = open_item(archive, cachedir_masters[item.source])
                    if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
                        exclude(dir, item)
        matcher.add(tag_files, True)
        matcher.add(tagged_dirs, False)

    def create_target(self, archive, target_name=None):
        """Create target archive."""
        target_name = target_name or archive.name + '.recreate'
        target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        source_chunker_params = tuple(archive.metadata.get('chunker_params', []))
        target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
        if target.recreate_rechunkify:
            logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params)
        return target

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=self.checkpoint_interval, compression=self.compression)
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)