vendorize lazy colored logger from kython

This commit is contained in:
Dima Gerasimov 2020-01-11 16:55:03 +00:00
parent 604cd40699
commit a1f65754f9
3 changed files with 94 additions and 29 deletions

View file

@ -79,20 +79,7 @@ def listify(fn=None, wrapper=list):
# return listify(fn=fn, wrapper=md) # return listify(fn=fn, wrapper=md)
# TODO try importing logzero defensively? from .kython.klogging import setup_logger, LazyLogger
def setup_logger(logger, level=None, format=None, datefmt=None):
import logging
old_root = logging.root
try:
logging.root = logger
logging.basicConfig(
level=level or logging.DEBUG,
format=format or '%(name)s %(asctime)s %(levelname)-8s %(filename)s:%(lineno)-4d %(message)s',
datefmt=datefmt or '%Y-%m-%d %H:%M:%S',
)
finally:
logging.root = old_root
PathIsh = Union[Path, str] PathIsh = Union[Path, str]

68
my/kython/klogging.py Normal file
View file

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import logging
from typing import Union, Optional
Level = int
LevelIsh = Optional[Union[Level, str]]
def mklevel(level: LevelIsh) -> Level:
if level is None:
return logging.NOTSET
if isinstance(level, int):
return level
return getattr(logging, level.upper())
_FMT = '{start}[%(levelname)-7s %(asctime)s %(name)s %(filename)s:%(lineno)d]{end} %(message)s'
_FMT_COLOR = _FMT.format(start='%(color)s', end='%(end_color)s')
_FMT_NOCOLOR = _FMT.format(start='', end='')
def setup_logger(logger: logging.Logger, level: Level) -> None:
try:
import logzero # type: ignore
except ModuleNotFoundError:
import warnings
warnings.warn("You might want to install 'logzero' for nice colored logs!")
logger.setLevel(level)
h = logging.StreamHandler()
h.setLevel(level)
h.setFormatter(logging.Formatter(fmt=_FMT_NOCOLOR))
logger.addHandler(h)
else:
formatter = logzero.LogFormatter(
fmt=_FMT_COLOR,
datefmt=None, # pass None to prevent logzero from messing with date format
)
logzero.setup_logger(logger.name, level=level, formatter=formatter)
class LazyLogger(logging.Logger):
# TODO perhaps should use __new__?
def __new__(cls, name, level: LevelIsh = 'DEBUG'):
logger = logging.getLogger(name)
lvl = mklevel(level)
# this is called prior to all _log calls so makes sense to do it here?
def isEnabledFor_lazyinit(*args, logger=logger, orig=logger.isEnabledFor, **kwargs):
att = 'lazylogger_init_done'
if not getattr(logger, att, False): # init once, if necessary
setup_logger(logger, level=lvl)
setattr(logger, att, True)
return orig(*args, **kwargs)
logger.isEnabledFor = isEnabledFor_lazyinit # type: ignore[assignment]
return logger
def test():
ll = LazyLogger('test')
ll.debug('THIS IS DEBUG')
ll.warning('THIS IS WARNING')
ll.exception(RuntimeError("oops"))
if __name__ == '__main__':
test()

View file

@ -17,7 +17,10 @@ import zipfile
import pytz import pytz
from .common import PathIsh from .common import PathIsh, get_files, LazyLogger
logger = LazyLogger('my.twitter')
_export_path: Optional[Path] = None _export_path: Optional[Path] = None
@ -33,12 +36,7 @@ def _get_export() -> Path:
# fallback to mycfg # fallback to mycfg
from mycfg import paths from mycfg import paths
export_path = paths.twitter.export_path export_path = paths.twitter.export_path
p = Path(export_path) return max(get_files(export_path, '*.zip'))
if p.is_dir():
return max(p.glob('*.zip'))
else:
return p
Tid = str Tid = str
@ -75,23 +73,35 @@ class Tweet:
def __repr__(self) -> str: def __repr__(self) -> str:
return repr(self.tw) return repr(self.tw)
# TODO a bit messy... perhaps we do need DAL for twitter exports
def _from_json_export() -> Iterator[Tweet]: class ZipExport:
epath = _get_export() def __init__(self) -> None:
ddd = zipfile.ZipFile(epath).read('tweet.js').decode('utf8') pass
start = ddd.index('[')
ddd = ddd[start:] def raw(self): # TODO Json in common?
for j in json.loads(ddd): epath = _get_export()
yield Tweet(j) logger.info('processing: %s', epath)
ddd = zipfile.ZipFile(epath).read('tweet.js').decode('utf8')
start = ddd.index('[')
ddd = ddd[start:]
for j in json.loads(ddd):
yield j
def tweets(self) -> Iterator[Tweet]:
for r in self.raw():
yield Tweet(r)
def tweets_all() -> List[Tweet]: def tweets_all() -> List[Tweet]:
return list(sorted(_from_json_export(), key=lambda t: t.dt)) return list(sorted(ZipExport().tweets(), key=lambda t: t.dt))
def predicate(p) -> List[Tweet]: def predicate(p) -> List[Tweet]:
return [t for t in tweets_all() if p(t)] return [t for t in tweets_all() if p(t)]
def predicate_date(p) -> List[Tweet]: def predicate_date(p) -> List[Tweet]:
return predicate(lambda t: p(t.dt.date())) return predicate(lambda t: p(t.dt.date()))