core: improve mypy coverage

This commit is contained in:
Dima Gerasimov 2021-02-17 23:10:37 +00:00 committed by karlicoss
parent 56d5587c20
commit 4ad4f34cda
6 changed files with 60 additions and 43 deletions

View file

@ -1,9 +1,10 @@
import functools
import importlib
import os
from pathlib import Path
from subprocess import check_call, run, PIPE, CompletedProcess
import sys
from subprocess import check_call, run, PIPE
from typing import Optional, Sequence, Iterable, List
import importlib
import traceback
from . import LazyLogger
@ -11,13 +12,12 @@ from . import LazyLogger
log = LazyLogger('HPI cli')
import functools
@functools.lru_cache()
def mypy_cmd() -> Optional[Sequence[str]]:
try:
# preferably, use mypy from current python env
import mypy
return ['python3', '-m', 'mypy']
return [sys.executable, '-m', 'mypy']
except ImportError:
pass
# ok, not ideal but try from PATH
@ -28,7 +28,8 @@ def mypy_cmd() -> Optional[Sequence[str]]:
return None
def run_mypy(pkg):
from types import ModuleType
def run_mypy(pkg: ModuleType) -> Optional[CompletedProcess]:
from .preinit import get_mycfg_dir
mycfg_dir = get_mycfg_dir()
# todo ugh. not sure how to extract it from pkg?
@ -57,7 +58,7 @@ def run_mypy(pkg):
return mres
def eprint(x: str):
def eprint(x: str) -> None:
print(x, file=sys.stderr)
def indent(x: str) -> str:
@ -66,16 +67,16 @@ def indent(x: str) -> str:
OK = ''
OFF = '🔲'
def info(x: str):
def info(x: str) -> None:
eprint(OK + ' ' + x)
def error(x: str):
def error(x: str) -> None:
eprint('' + x)
def warning(x: str):
def warning(x: str) -> None:
eprint('' + x) # todo yellow?
def tb(e):
def tb(e: Exception) -> None:
tb = ''.join(traceback.format_exception(Exception, e, e.__traceback__))
sys.stderr.write(indent(tb))
@ -94,7 +95,9 @@ class color:
RESET = '\033[0m'
def config_create(args) -> None:
from argparse import Namespace
def config_create(args: Namespace) -> None:
from .preinit import get_mycfg_dir
mycfg_dir = get_mycfg_dir()
@ -138,18 +141,18 @@ class example:
sys.exit(1)
def config_check_cli(args) -> None:
def config_check_cli(args: Namespace) -> None:
ok = config_ok(args)
sys.exit(0 if ok else False)
# TODO return the config as a result?
def config_ok(args) -> bool:
def config_ok(args: Namespace) -> bool:
errors: List[Exception] = []
import my
try:
paths = my.__path__._path # type: ignore[attr-defined]
paths: Sequence[str] = my.__path__._path # type: ignore[attr-defined]
except Exception as e:
errors.append(e)
error('failed to determine module import path')
@ -211,8 +214,8 @@ See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-module
return True
def _modules(all=False):
from .util import modules
from .util import HPIModule, modules
def _modules(*, all: bool=False) -> Iterable[HPIModule]:
skipped = []
for m in modules():
if not all and m.skip_reason is not None:
@ -223,7 +226,7 @@ def _modules(all=False):
warning(f'Skipped {len(skipped)} modules: {skipped}. Pass --all if you want to see them.')
def modules_check(args) -> None:
def modules_check(args: Namespace) -> None:
verbose: bool = args.verbose
quick: bool = args.quick
module: Optional[str] = args.module
@ -279,11 +282,12 @@ def modules_check(args) -> None:
info(f' - stats: {res}')
def list_modules(args) -> None:
def list_modules(args: Namespace) -> None:
# todo add a --sort argument?
tabulate_warnings()
for mr in _modules(all=args.all):
all: bool = args.all
for mr in _modules(all=all):
m = mr.name
sr = mr.skip_reason
if sr is None:
@ -302,7 +306,7 @@ def tabulate_warnings() -> None:
'''
import warnings
orig = warnings.formatwarning
def override(*args, **kwargs):
def override(*args, **kwargs) -> str:
res = orig(*args, **kwargs)
return ''.join(' ' + x for x in res.splitlines(keepends=True))
warnings.formatwarning = override
@ -310,14 +314,14 @@ def tabulate_warnings() -> None:
# todo check that it finds private modules too?
def doctor(args) -> None:
def doctor(args: Namespace) -> None:
ok = config_ok(args)
# TODO propagate ok status up?
modules_check(args)
def parser():
from argparse import ArgumentParser
from argparse import ArgumentParser
def parser() -> ArgumentParser:
p = ArgumentParser('Human Programming Interface', epilog='''
Tool for HPI.
@ -347,7 +351,7 @@ Work in progress, will be used for config management, troubleshooting & introspe
return p
def main():
def main() -> None:
p = parser()
args = p.parse_args()

View file

@ -377,6 +377,7 @@ QUICK_STATS = False
C = TypeVar('C')
Stats = Dict[str, Any]
StatsFun = Callable[[], Stats]
# todo not sure about return type...
def stat(func: Union[Callable[[], Iterable[C]], Iterable[C]]) -> Stats:
if callable(func):

View file

@ -72,9 +72,10 @@ config = make_config(Config)
### tests start
from typing import Iterator, Any
from contextlib import contextmanager as ctx
@ctx
def _reset_config():
def _reset_config() -> Iterator[Config]:
# todo maybe have this decorator for the whole of my.config?
from .cfg import override_config
with override_config(config) as cc:

View file

@ -5,23 +5,26 @@ TODO name 'klogging' to avoid possible conflict with default 'logging' module
TODO shit. too late already? maybe use fallback & deprecate
'''
def test() -> None:
from typing import Callable
import logging
import sys
M = lambda s: print(s, file=sys.stderr)
M: Callable[[str], None] = lambda s: print(s, file=sys.stderr)
M(" Logging module's deafults are not great...'")
l = logging.getLogger('test_logger')
# todo why is mypy unhappy about these???
l.error("For example, this should be logged as error. But it's not even formatted properly, doesn't have logger name or level")
M(" The reason is that you need to remember to call basicConfig() first")
logging.basicConfig()
l.error("OK, this is better. But the default format kinda sucks, I prefer having timestamps and the file/line number")
M("")
M(" With LazyLogger you get a reasonable logging format, colours and other neat things")
ll = LazyLogger('test') # No need for basicConfig!
ll = LazyLogger('test') # No need for basicConfig!
ll.info("default level is INFO")
ll.debug(".. so this shouldn't be displayed")
ll.warning("warnings are easy to spot!")
@ -37,6 +40,7 @@ LevelIsh = Optional[Union[Level, str]]
def mklevel(level: LevelIsh) -> Level:
# todo put in some global file, like envvars.py
glevel = os.environ.get('HPI_LOGS', None)
if glevel is not None:
level = glevel
@ -56,16 +60,17 @@ DATEFMT = '%Y-%m-%d %H:%M:%S'
def setup_logger(logger: logging.Logger, level: LevelIsh) -> None:
lvl = mklevel(level)
try:
import logzero # type: ignore[import]
import logzero # type: ignore[import]
except ModuleNotFoundError:
import warnings
warnings.warn("You might want to install 'logzero' for nice colored logs!")
logger.setLevel(lvl)
h = logging.StreamHandler()
h.setLevel(lvl)
h.setFormatter(logging.Formatter(fmt=FORMAT_NOCOLOR, datefmt=DATEFMT))
logger.addHandler(h)
logger.propagate = False # ugh. otherwise it duplicates log messages? not sure about it..
logger.propagate = False # ugh. otherwise it duplicates log messages? not sure about it..
else:
formatter = logzero.LogFormatter(
fmt=FORMAT_COLOR,
@ -75,18 +80,18 @@ def setup_logger(logger: logging.Logger, level: LevelIsh) -> None:
class LazyLogger(logging.Logger):
def __new__(cls, name, level: LevelIsh = 'INFO'):
def __new__(cls, name: str, level: LevelIsh = 'INFO') -> 'LazyLogger':
logger = logging.getLogger(name)
# this is called prior to all _log calls so makes sense to do it here?
def isEnabledFor_lazyinit(*args, logger=logger, orig=logger.isEnabledFor, **kwargs):
att = 'lazylogger_init_done'
if not getattr(logger, att, False): # init once, if necessary
if not getattr(logger, att, False): # init once, if necessary
setup_logger(logger, level=level)
setattr(logger, att, True)
return orig(*args, **kwargs)
logger.isEnabledFor = isEnabledFor_lazyinit # type: ignore[assignment]
return logger
return logger # type: ignore[return-value]
if __name__ == '__main__':

View file

@ -31,7 +31,8 @@ def ignored(m: str) -> bool:
return re.match(f'^my.({exs})$', m) is not None
def get_stats(module: str):
from .common import StatsFun
def get_stats(module: str) -> Optional[StatsFun]:
# todo detect via ast?
try:
mod = import_module(module)
@ -63,19 +64,21 @@ def is_not_hpi_module(module: str) -> Optional[str]:
return "has no 'stats()' function"
return None
from types import ModuleType
# todo reuse in readme/blog post
# borrowed from https://github.com/sanitizers/octomachinery/blob/24288774d6dcf977c5033ae11311dbff89394c89/tests/circular_imports_test.py#L22-L55
def _iter_all_importables(pkg) -> Iterable[HPIModule]:
def _iter_all_importables(pkg: ModuleType) -> Iterable[HPIModule]:
# todo crap. why does it include some stuff three times??
yield from chain.from_iterable(
_discover_path_importables(Path(p), pkg.__name__)
# todo might need to handle __path__ for individual modules too?
# not sure why __path__ was duplicated, but it did happen..
for p in set(pkg.__path__)
for p in set(pkg.__path__) # type: ignore[attr-defined]
)
def _discover_path_importables(pkg_pth, pkg_name) -> Iterable[HPIModule]:
def _discover_path_importables(pkg_pth: Path, pkg_name: str) -> Iterable[HPIModule]:
from .core_config import config
"""Yield all importables under a given path and package."""
@ -109,7 +112,7 @@ def _discover_path_importables(pkg_pth, pkg_name) -> Iterable[HPIModule]:
# TODO when do we need to recurse?
def _walk_packages(path=None, prefix='', onerror=None) -> Iterable[HPIModule]:
def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterable[HPIModule]:
'''
Modified version of https://github.com/python/cpython/blob/d50a0700265536a20bcce3fb108c954746d97625/Lib/pkgutil.py#L53,
to alvoid importing modules that are skipped
@ -123,6 +126,9 @@ def _walk_packages(path=None, prefix='', onerror=None) -> Iterable[HPIModule]:
for info in pkgutil.iter_modules(path, prefix):
mname = info.name
if mname is None:
# why would it be? anyway makes mypy happier
continue
if ignored(mname):
# not sure if need to yield?
@ -189,7 +195,7 @@ def modules_via_ast() -> Iterable[HPIModule]:
a = ast.parse(f.read_text())
NM = '__NOT_HPI_MODULE__'
is_not_module = any(
getattr(node, 'name', None) == NM # direct definition
getattr(node, 'name', None) == NM # direct definition
or
any(getattr(n, 'name', None) == NM for n in getattr(node, 'names', [])) # import from
for node in a.body)
@ -201,7 +207,7 @@ def modules_via_ast() -> Iterable[HPIModule]:
name=m,
skip_reason=None,
doc=doc,
file=f.relative_to(my_root.parent), # todo not sure if should be relative
file=f.relative_to(my_root.parent), # todo not sure if should be relative
)

View file

@ -6,12 +6,13 @@ E.g. would be nice to propagate the warnings in the UI (it's even a subclass of
'''
import sys
from typing import Optional
import warnings
# just bring in the scope of this module for convenience
from warnings import warn
def _colorize(x: str, color=None) -> str:
def _colorize(x: str, color: Optional[str]=None) -> str:
if color is None:
return x
@ -28,8 +29,7 @@ def _colorize(x: str, color=None) -> str:
# todo log something?
return x
def _warn(message: str, *args, color=None, **kwargs) -> None:
def _warn(message: str, *args, color: Optional[str]=None, **kwargs) -> None:
stacklevel = kwargs.get('stacklevel', 1)
kwargs['stacklevel'] = stacklevel + 2 # +1 for this function, +1 for medium/high wrapper
warnings.warn(_colorize(message, color=color), *args, **kwargs)