core: migrate code to benefit from 3.9 stuff

for now keeping ruff on 3.8 target version, need to sort out modules as well
This commit is contained in:
Dima Gerasimov 2024-10-19 20:19:07 +01:00
parent d1511929a8
commit 721fd98dca
37 changed files with 413 additions and 302 deletions

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import functools
import importlib
import inspect
@ -12,13 +14,13 @@ from contextlib import ExitStack
from itertools import chain
from pathlib import Path
from subprocess import PIPE, CompletedProcess, Popen, check_call, run
from typing import Any, Callable, List, Optional, Type
from typing import Any, Callable
import click
@functools.lru_cache
def mypy_cmd() -> Optional[Sequence[str]]:
def mypy_cmd() -> Sequence[str] | None:
try:
# preferably, use mypy from current python env
import mypy # noqa: F401 fine not to use it
@ -33,7 +35,7 @@ def mypy_cmd() -> Optional[Sequence[str]]:
return None
def run_mypy(cfg_path: Path) -> Optional[CompletedProcess]:
def run_mypy(cfg_path: Path) -> CompletedProcess | None:
# todo dunno maybe use the same mypy config in repository?
# I'd need to install mypy.ini then??
env = {**os.environ}
@ -64,21 +66,27 @@ def eprint(x: str) -> None:
# err=True prints to stderr
click.echo(x, err=True)
def indent(x: str) -> str:
# todo use textwrap.indent?
return ''.join(' ' + l for l in x.splitlines(keepends=True))
OK = ''
OK = ''
OFF = '🔲'
def info(x: str) -> None:
eprint(OK + ' ' + x)
def error(x: str) -> None:
eprint('' + x)
def warning(x: str) -> None:
eprint('' + x) # todo yellow?
eprint('' + x) # todo yellow?
def tb(e: Exception) -> None:
tb = ''.join(traceback.format_exception(Exception, e, e.__traceback__))
@ -87,6 +95,7 @@ def tb(e: Exception) -> None:
def config_create() -> None:
from .preinit import get_mycfg_dir
mycfg_dir = get_mycfg_dir()
created = False
@ -95,7 +104,8 @@ def config_create() -> None:
my_config = mycfg_dir / 'my' / 'config' / '__init__.py'
my_config.parent.mkdir(parents=True)
my_config.write_text('''
my_config.write_text(
'''
### HPI personal config
## see
# https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-modules
@ -118,7 +128,8 @@ class example:
### you can insert your own configuration below
### but feel free to delete the stuff above if you don't need ti
'''.lstrip())
'''.lstrip()
)
info(f'created empty config: {my_config}')
created = True
else:
@ -131,12 +142,13 @@ class example:
# todo return the config as a result?
def config_ok() -> bool:
errors: List[Exception] = []
errors: list[Exception] = []
# at this point 'my' should already be imported, so doesn't hurt to extract paths from it
import my
try:
paths: List[str] = list(my.__path__)
paths: list[str] = list(my.__path__)
except Exception as e:
errors.append(e)
error('failed to determine module import path')
@ -146,19 +158,23 @@ def config_ok() -> bool:
# first try doing as much as possible without actually importing my.config
from .preinit import get_mycfg_dir
cfg_path = get_mycfg_dir()
# alternative is importing my.config and then getting cfg_path from its __file__/__path__
# not sure which is better tbh
## check we're not using stub config
import my.core
try:
core_pkg_path = str(Path(my.core.__path__[0]).parent)
if str(cfg_path).startswith(core_pkg_path):
error(f'''
error(
f'''
Seems that the stub config is used ({cfg_path}). This is likely not going to work.
See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-modules for more information
'''.strip())
'''.strip()
)
errors.append(RuntimeError('bad config path'))
except Exception as e:
errors.append(e)
@ -190,7 +206,7 @@ See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-module
## check types
mypy_res = run_mypy(cfg_path)
if mypy_res is not None: # has mypy
if mypy_res is not None: # has mypy
rc = mypy_res.returncode
if rc == 0:
info('mypy check : success')
@ -222,7 +238,7 @@ See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-module
from .util import HPIModule, modules
def _modules(*, all: bool=False) -> Iterable[HPIModule]:
def _modules(*, all: bool = False) -> Iterable[HPIModule]:
skipped = []
for m in modules():
if not all and m.skip_reason is not None:
@ -233,7 +249,7 @@ def _modules(*, all: bool=False) -> Iterable[HPIModule]:
warning(f'Skipped {len(skipped)} modules: {skipped}. Pass --all if you want to see them.')
def modules_check(*, verbose: bool, list_all: bool, quick: bool, for_modules: List[str]) -> None:
def modules_check(*, verbose: bool, list_all: bool, quick: bool, for_modules: list[str]) -> None:
if len(for_modules) > 0:
# if you're checking specific modules, show errors
# hopefully makes sense?
@ -257,7 +273,7 @@ def modules_check(*, verbose: bool, list_all: bool, quick: bool, for_modules: Li
# todo add a --all argument to disregard is_active check?
for mr in mods:
skip = mr.skip_reason
m = mr.name
m = mr.name
if skip is not None:
eprint(f'{OFF} {click.style("SKIP", fg="yellow")}: {m:<50} {skip}')
continue
@ -307,8 +323,8 @@ def list_modules(*, list_all: bool) -> None:
tabulate_warnings()
for mr in _modules(all=list_all):
m = mr.name
sr = mr.skip_reason
m = mr.name
sr = mr.skip_reason
if sr is None:
pre = OK
suf = ''
@ -324,17 +340,20 @@ def tabulate_warnings() -> None:
Helper to avoid visual noise in hpi modules/doctor
'''
import warnings
orig = warnings.formatwarning
def override(*args, **kwargs) -> str:
res = orig(*args, **kwargs)
return ''.join(' ' + x for x in res.splitlines(keepends=True))
warnings.formatwarning = override
# TODO loggers as well?
def _requires(modules: Sequence[str]) -> Sequence[str]:
from .discovery_pure import module_by_name
mods = [module_by_name(module) for module in modules]
res = []
for mod in mods:
@ -361,7 +380,7 @@ def module_requires(*, module: Sequence[str]) -> None:
click.echo(x)
def module_install(*, user: bool, module: Sequence[str], parallel: bool=False, break_system_packages: bool=False) -> None:
def module_install(*, user: bool, module: Sequence[str], parallel: bool = False, break_system_packages: bool = False) -> None:
if isinstance(module, str):
# legacy behavior, used to take a since argument
module = [module]
@ -438,7 +457,7 @@ def _ui_getchar_pick(choices: Sequence[str], prompt: str = 'Select from: ') -> i
return result_map[ch]
def _locate_functions_or_prompt(qualified_names: List[str], *, prompt: bool = True) -> Iterable[Callable[..., Any]]:
def _locate_functions_or_prompt(qualified_names: list[str], *, prompt: bool = True) -> Iterable[Callable[..., Any]]:
from .query import QueryException, locate_qualified_function
from .stats import is_data_provider
@ -488,6 +507,7 @@ def _locate_functions_or_prompt(qualified_names: List[str], *, prompt: bool = Tr
def _warn_exceptions(exc: Exception) -> None:
from my.core import make_logger
logger = make_logger('CLI', level='warning')
logger.exception(f'hpi query: {exc}')
@ -499,14 +519,14 @@ def query_hpi_functions(
*,
output: str = 'json',
stream: bool = False,
qualified_names: List[str],
order_key: Optional[str],
order_by_value_type: Optional[Type],
qualified_names: list[str],
order_key: str | None,
order_by_value_type: type | None,
after: Any,
before: Any,
within: Any,
reverse: bool = False,
limit: Optional[int],
limit: int | None,
drop_unsorted: bool,
wrap_unsorted: bool,
warn_exceptions: bool,
@ -530,7 +550,8 @@ def query_hpi_functions(
warn_exceptions=warn_exceptions,
warn_func=_warn_exceptions,
raise_exceptions=raise_exceptions,
drop_exceptions=drop_exceptions)
drop_exceptions=drop_exceptions,
)
if output == 'json':
from .serialize import dumps
@ -564,7 +585,7 @@ def query_hpi_functions(
# can ignore the mypy warning here, locations_to_gpx yields any errors
# if you didnt pass it something that matches the LocationProtocol
for exc in locations_to_gpx(res, sys.stdout): # type: ignore[arg-type]
for exc in locations_to_gpx(res, sys.stdout): # type: ignore[arg-type]
if warn_exceptions:
_warn_exceptions(exc)
elif raise_exceptions:
@ -581,6 +602,7 @@ def query_hpi_functions(
except ModuleNotFoundError:
eprint("'repl' typically uses ipython, install it with 'python3 -m pip install ipython'. falling back to stdlib...")
import code
code.interact(local=locals())
else:
IPython.embed()
@ -620,13 +642,13 @@ def main(*, debug: bool) -> None:
@functools.lru_cache(maxsize=1)
def _all_mod_names() -> List[str]:
def _all_mod_names() -> list[str]:
"""Should include all modules, in case user is trying to diagnose issues"""
# sort this, so that the order doesn't change while tabbing through
return sorted([m.name for m in modules()])
def _module_autocomplete(ctx: click.Context, args: Sequence[str], incomplete: str) -> List[str]:
def _module_autocomplete(ctx: click.Context, args: Sequence[str], incomplete: str) -> list[str]:
return [m for m in _all_mod_names() if m.startswith(incomplete)]
@ -785,14 +807,14 @@ def query_cmd(
function_name: Sequence[str],
output: str,
stream: bool,
order_key: Optional[str],
order_type: Optional[str],
after: Optional[str],
before: Optional[str],
within: Optional[str],
recent: Optional[str],
order_key: str | None,
order_type: str | None,
after: str | None,
before: str | None,
within: str | None,
recent: str | None,
reverse: bool,
limit: Optional[int],
limit: int | None,
drop_unsorted: bool,
wrap_unsorted: bool,
warn_exceptions: bool,
@ -828,7 +850,7 @@ def query_cmd(
from datetime import date, datetime
chosen_order_type: Optional[Type]
chosen_order_type: type | None
if order_type == "datetime":
chosen_order_type = datetime
elif order_type == "date":
@ -864,7 +886,8 @@ def query_cmd(
wrap_unsorted=wrap_unsorted,
warn_exceptions=warn_exceptions,
raise_exceptions=raise_exceptions,
drop_exceptions=drop_exceptions)
drop_exceptions=drop_exceptions,
)
except QueryException as qe:
eprint(str(qe))
sys.exit(1)
@ -879,6 +902,7 @@ def query_cmd(
def test_requires() -> None:
from click.testing import CliRunner
result = CliRunner().invoke(main, ['module', 'requires', 'my.github.ghexport', 'my.browser.export'])
assert result.exit_code == 0
assert "github.com/karlicoss/ghexport" in result.output