diff --git a/my/common.py b/my/common.py index 861e48b..5379833 100644 --- a/my/common.py +++ b/my/common.py @@ -79,20 +79,7 @@ def listify(fn=None, wrapper=list): # return listify(fn=fn, wrapper=md) -# TODO try importing logzero defensively? -def setup_logger(logger, level=None, format=None, datefmt=None): - import logging - old_root = logging.root - try: - logging.root = logger - logging.basicConfig( - level=level or logging.DEBUG, - format=format or '%(name)s %(asctime)s %(levelname)-8s %(filename)s:%(lineno)-4d %(message)s', - datefmt=datefmt or '%Y-%m-%d %H:%M:%S', - ) - finally: - logging.root = old_root - +from .kython.klogging import setup_logger, LazyLogger PathIsh = Union[Path, str] diff --git a/my/kython/klogging.py b/my/kython/klogging.py new file mode 100644 index 0000000..32e02cf --- /dev/null +++ b/my/kython/klogging.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import logging +from typing import Union, Optional + +Level = int +LevelIsh = Optional[Union[Level, str]] + + +def mklevel(level: LevelIsh) -> Level: + if level is None: + return logging.NOTSET + if isinstance(level, int): + return level + return getattr(logging, level.upper()) + + +_FMT = '{start}[%(levelname)-7s %(asctime)s %(name)s %(filename)s:%(lineno)d]{end} %(message)s' +_FMT_COLOR = _FMT.format(start='%(color)s', end='%(end_color)s') +_FMT_NOCOLOR = _FMT.format(start='', end='') + + +def setup_logger(logger: logging.Logger, level: Level) -> None: + try: + import logzero # type: ignore + except ModuleNotFoundError: + import warnings + warnings.warn("You might want to install 'logzero' for nice colored logs!") + logger.setLevel(level) + h = logging.StreamHandler() + h.setLevel(level) + h.setFormatter(logging.Formatter(fmt=_FMT_NOCOLOR)) + logger.addHandler(h) + else: + formatter = logzero.LogFormatter( + fmt=_FMT_COLOR, + datefmt=None, # pass None to prevent logzero from messing with date format + ) + logzero.setup_logger(logger.name, level=level, formatter=formatter) + + +class LazyLogger(logging.Logger): + # TODO perhaps should use __new__? + + def __new__(cls, name, level: LevelIsh = 'DEBUG'): + logger = logging.getLogger(name) + lvl = mklevel(level) + + # this is called prior to all _log calls so makes sense to do it here? + def isEnabledFor_lazyinit(*args, logger=logger, orig=logger.isEnabledFor, **kwargs): + att = 'lazylogger_init_done' + if not getattr(logger, att, False): # init once, if necessary + setup_logger(logger, level=lvl) + setattr(logger, att, True) + return orig(*args, **kwargs) + + logger.isEnabledFor = isEnabledFor_lazyinit # type: ignore[assignment] + return logger + + +def test(): + ll = LazyLogger('test') + ll.debug('THIS IS DEBUG') + ll.warning('THIS IS WARNING') + ll.exception(RuntimeError("oops")) + + +if __name__ == '__main__': + test() diff --git a/my/twitter.py b/my/twitter.py index 613fafb..55230f9 100755 --- a/my/twitter.py +++ b/my/twitter.py @@ -17,7 +17,10 @@ import zipfile import pytz -from .common import PathIsh +from .common import PathIsh, get_files, LazyLogger + + +logger = LazyLogger('my.twitter') _export_path: Optional[Path] = None @@ -33,12 +36,7 @@ def _get_export() -> Path: # fallback to mycfg from mycfg import paths export_path = paths.twitter.export_path - p = Path(export_path) - if p.is_dir(): - return max(p.glob('*.zip')) - else: - return p - + return max(get_files(export_path, '*.zip')) Tid = str @@ -75,23 +73,35 @@ class Tweet: def __repr__(self) -> str: return repr(self.tw) +# TODO a bit messy... perhaps we do need DAL for twitter exports -def _from_json_export() -> Iterator[Tweet]: - epath = _get_export() - ddd = zipfile.ZipFile(epath).read('tweet.js').decode('utf8') - start = ddd.index('[') - ddd = ddd[start:] - for j in json.loads(ddd): - yield Tweet(j) +class ZipExport: + def __init__(self) -> None: + pass + + def raw(self): # TODO Json in common? + epath = _get_export() + logger.info('processing: %s', epath) + ddd = zipfile.ZipFile(epath).read('tweet.js').decode('utf8') + start = ddd.index('[') + ddd = ddd[start:] + for j in json.loads(ddd): + yield j + + + def tweets(self) -> Iterator[Tweet]: + for r in self.raw(): + yield Tweet(r) def tweets_all() -> List[Tweet]: - return list(sorted(_from_json_export(), key=lambda t: t.dt)) + return list(sorted(ZipExport().tweets(), key=lambda t: t.dt)) def predicate(p) -> List[Tweet]: return [t for t in tweets_all() if p(t)] + def predicate_date(p) -> List[Tweet]: return predicate(lambda t: p(t.dt.date()))