123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import argparse
- from datetime import datetime
- import grp
- import logging
- import pwd
- import re
- def format_time(t):
- """Format datetime suitable for fixed length list output
- """
- if (datetime.now() - t).days < 365:
- return t.strftime('%b %d %H:%M')
- else:
- return t.strftime('%b %d %Y')
- def format_file_mode(mod):
- """Format file mode bits for list output
- """
- def x(v):
- return ''.join(v & m and s or '-'
- for m, s in ((4, 'r'), (2, 'w'), (1, 'x')))
- return '%s%s%s' % (x(mod / 64), x(mod / 8), x(mod))
- def format_file_size(v):
- """Format file size into a human friendly format
- """
- if v > 1024 * 1024 * 1024:
- return '%.2f GB' % (v / 1024. / 1024. / 1024.)
- elif v > 1024 * 1024:
- return '%.2f MB' % (v / 1024. / 1024.)
- elif v > 1024:
- return '%.2f kB' % (v / 1024.)
- else:
- return str(v)
- class IntegrityError(Exception):
- """
- """
- def memoize(function):
- cache = {}
- def decorated_function(*args):
- try:
- return cache[args]
- except KeyError:
- val = function(*args)
- cache[args] = val
- return val
- return decorated_function
- @memoize
- def uid2user(uid):
- try:
- return pwd.getpwuid(uid).pw_name
- except KeyError:
- return None
- @memoize
- def user2uid(user):
- try:
- return pwd.getpwnam(user).pw_uid
- except KeyError:
- return None
- @memoize
- def gid2group(gid):
- try:
- return grp.getgrgid(gid).gr_name
- except KeyError:
- return None
- @memoize
- def group2gid(group):
- try:
- return grp.getgrnam(group).gr_gid
- except KeyError:
- return None
- class LevelFilter(logging.Filter):
- """Filter that counts record levels
- """
- def __init__(self, *args, **kwargs):
- logging.Filter.__init__(self, *args, **kwargs)
- self.count = {}
- def filter(self, record):
- self.count.setdefault(record.levelname, 0)
- self.count[record.levelname] += 1
- return record
- class Location(object):
- loc_re = re.compile(r'^((?:(?P<user>[^@]+)@)?(?P<host>[^:]+):)?'
- r'(?P<path>[^:]*)(?:::(?P<archive>[^:]+))?$')
- def __init__(self, text):
- loc = self.loc_re.match(text)
- loc = loc and loc.groupdict()
- if not loc:
- raise ValueError
- self.user = loc['user']
- self.host = loc['host']
- self.path = loc['path']
- if not self.host and not self.path:
- raise ValueError
- self.archive = loc['archive']
- def __str__(self):
- text = ''
- if self.user:
- text += '%s@' % self.user
- if self.host:
- text += '%s::' % self.host
- if self.path:
- text += self.path
- if self.archive:
- text += ':%s' % self.archive
- return text
- def __repr__(self):
- return "Location('%s')" % self
- def location_validator(archive=None):
- def validator(text):
- try:
- loc = Location(text)
- except ValueError:
- raise argparse.ArgumentTypeError('Invalid location format: "%s"' % text)
- if archive is True and not loc.archive:
- raise argparse.ArgumentTypeError('"%s": No archive specified' % text)
- elif archive is False and loc.archive:
- raise argparse.ArgumentTypeError('"%s" No archive can be specified' % text)
- return loc
- return validator
|