From d3f9a8e8b69542361ad0838a1012a1ad10440b5b Mon Sep 17 00:00:00 2001 From: karlicoss Date: Sat, 19 Oct 2024 20:55:09 +0100 Subject: [PATCH] core: migrate code to benefit from 3.9 stuff (#401) for now keeping ruff on 3.8 target version, need to sort out modules as well --- my/core/__init__.py | 4 +- my/core/__main__.py | 97 +++++++++++++++++++++------------ my/core/_cpu_pool.py | 9 ++- my/core/_deprecated/kompress.py | 5 +- my/core/cachew.py | 29 +++++----- my/core/cfg.py | 20 +++++-- my/core/common.py | 36 ++++++------ my/core/compat.py | 7 ++- my/core/core_config.py | 33 ++++++----- my/core/denylist.py | 28 +++++----- my/core/discovery_pure.py | 19 ++++--- my/core/error.py | 43 ++++++++------- my/core/experimental.py | 6 +- my/core/freezer.py | 29 +++++----- my/core/hpi_compat.py | 13 +++-- my/core/influxdb.py | 29 +++++++--- my/core/init.py | 2 + my/core/kompress.py | 4 +- my/core/konsume.py | 39 +++++++------ my/core/mime.py | 11 ++-- my/core/orgmode.py | 8 ++- my/core/pandas.py | 7 +-- my/core/preinit.py | 1 + my/core/pytest.py | 4 +- my/core/query.py | 76 ++++++++++++-------------- my/core/query_range.py | 68 +++++++++++++---------- my/core/serialize.py | 20 ++++--- my/core/source.py | 12 +++- my/core/sqlite.py | 47 +++++++++------- my/core/stats.py | 34 +++++------- my/core/structure.py | 14 +++-- my/core/tests/auto_stats.py | 2 +- my/core/tests/common.py | 6 +- my/core/tests/denylist.py | 3 +- my/core/tests/test_cachew.py | 8 +-- my/core/tests/test_config.py | 2 +- my/core/time.py | 15 +++-- my/core/types.py | 13 +++-- my/core/util.py | 28 ++++++---- my/core/utils/concurrent.py | 7 ++- my/core/utils/imports.py | 14 ++--- my/core/utils/itertools.py | 59 +++++++++----------- my/core/warnings.py | 8 ++- 43 files changed, 515 insertions(+), 404 deletions(-) diff --git a/my/core/__init__.py b/my/core/__init__.py index ba633f6..cc549d5 100644 --- a/my/core/__init__.py +++ b/my/core/__init__.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from .cfg import make_config from .common import PathIsh, Paths, get_files from .compat import assert_never -from .error import Res, unwrap, notnone +from .error import Res, notnone, unwrap from .logging import ( make_logger, ) @@ -52,7 +52,7 @@ __all__ = [ # you could put _init_hook.py next to your private my/config # that way you can configure logging/warnings/env variables on every HPI import try: - import my._init_hook # type: ignore[import-not-found] + import my._init_hook # type: ignore[import-not-found] # noqa: F401 except: pass ## diff --git a/my/core/__main__.py b/my/core/__main__.py index 2777008..00ac4ee 100644 --- a/my/core/__main__.py +++ b/my/core/__main__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import functools import importlib import inspect @@ -7,17 +9,18 @@ import shutil import sys import tempfile import traceback +from collections.abc import Iterable, Sequence from contextlib import ExitStack from itertools import chain from pathlib import Path from subprocess import PIPE, CompletedProcess, Popen, check_call, run -from typing import Any, Callable, Iterable, List, Optional, Sequence, Type +from typing import Any, Callable import click @functools.lru_cache -def mypy_cmd() -> Optional[Sequence[str]]: +def mypy_cmd() -> Sequence[str] | None: try: # preferably, use mypy from current python env import mypy # noqa: F401 fine not to use it @@ -32,7 +35,7 @@ def mypy_cmd() -> Optional[Sequence[str]]: return None -def run_mypy(cfg_path: Path) -> Optional[CompletedProcess]: +def run_mypy(cfg_path: Path) -> CompletedProcess | None: # todo dunno maybe use the same mypy config in repository? # I'd need to install mypy.ini then?? env = {**os.environ} @@ -63,21 +66,27 @@ def eprint(x: str) -> None: # err=True prints to stderr click.echo(x, err=True) + def indent(x: str) -> str: + # todo use textwrap.indent? return ''.join(' ' + l for l in x.splitlines(keepends=True)) -OK = '✅' +OK = '✅' OFF = '🔲' + def info(x: str) -> None: eprint(OK + ' ' + x) + def error(x: str) -> None: eprint('❌ ' + x) + def warning(x: str) -> None: - eprint('❗ ' + x) # todo yellow? + eprint('❗ ' + x) # todo yellow? + def tb(e: Exception) -> None: tb = ''.join(traceback.format_exception(Exception, e, e.__traceback__)) @@ -86,6 +95,7 @@ def tb(e: Exception) -> None: def config_create() -> None: from .preinit import get_mycfg_dir + mycfg_dir = get_mycfg_dir() created = False @@ -94,7 +104,8 @@ def config_create() -> None: my_config = mycfg_dir / 'my' / 'config' / '__init__.py' my_config.parent.mkdir(parents=True) - my_config.write_text(''' + my_config.write_text( + ''' ### HPI personal config ## see # https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-modules @@ -117,7 +128,8 @@ class example: ### you can insert your own configuration below ### but feel free to delete the stuff above if you don't need ti -'''.lstrip()) +'''.lstrip() + ) info(f'created empty config: {my_config}') created = True else: @@ -130,12 +142,13 @@ class example: # todo return the config as a result? def config_ok() -> bool: - errors: List[Exception] = [] + errors: list[Exception] = [] # at this point 'my' should already be imported, so doesn't hurt to extract paths from it import my + try: - paths: List[str] = list(my.__path__) + paths: list[str] = list(my.__path__) except Exception as e: errors.append(e) error('failed to determine module import path') @@ -145,19 +158,23 @@ def config_ok() -> bool: # first try doing as much as possible without actually importing my.config from .preinit import get_mycfg_dir + cfg_path = get_mycfg_dir() # alternative is importing my.config and then getting cfg_path from its __file__/__path__ # not sure which is better tbh ## check we're not using stub config import my.core + try: core_pkg_path = str(Path(my.core.__path__[0]).parent) if str(cfg_path).startswith(core_pkg_path): - error(f''' + error( + f''' Seems that the stub config is used ({cfg_path}). This is likely not going to work. See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-modules for more information -'''.strip()) +'''.strip() + ) errors.append(RuntimeError('bad config path')) except Exception as e: errors.append(e) @@ -189,7 +206,7 @@ See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-module ## check types mypy_res = run_mypy(cfg_path) - if mypy_res is not None: # has mypy + if mypy_res is not None: # has mypy rc = mypy_res.returncode if rc == 0: info('mypy check : success') @@ -221,7 +238,7 @@ See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-module from .util import HPIModule, modules -def _modules(*, all: bool=False) -> Iterable[HPIModule]: +def _modules(*, all: bool = False) -> Iterable[HPIModule]: skipped = [] for m in modules(): if not all and m.skip_reason is not None: @@ -232,7 +249,7 @@ def _modules(*, all: bool=False) -> Iterable[HPIModule]: warning(f'Skipped {len(skipped)} modules: {skipped}. Pass --all if you want to see them.') -def modules_check(*, verbose: bool, list_all: bool, quick: bool, for_modules: List[str]) -> None: +def modules_check(*, verbose: bool, list_all: bool, quick: bool, for_modules: list[str]) -> None: if len(for_modules) > 0: # if you're checking specific modules, show errors # hopefully makes sense? @@ -256,7 +273,7 @@ def modules_check(*, verbose: bool, list_all: bool, quick: bool, for_modules: Li # todo add a --all argument to disregard is_active check? for mr in mods: skip = mr.skip_reason - m = mr.name + m = mr.name if skip is not None: eprint(f'{OFF} {click.style("SKIP", fg="yellow")}: {m:<50} {skip}') continue @@ -306,8 +323,8 @@ def list_modules(*, list_all: bool) -> None: tabulate_warnings() for mr in _modules(all=list_all): - m = mr.name - sr = mr.skip_reason + m = mr.name + sr = mr.skip_reason if sr is None: pre = OK suf = '' @@ -323,17 +340,20 @@ def tabulate_warnings() -> None: Helper to avoid visual noise in hpi modules/doctor ''' import warnings + orig = warnings.formatwarning def override(*args, **kwargs) -> str: res = orig(*args, **kwargs) return ''.join(' ' + x for x in res.splitlines(keepends=True)) + warnings.formatwarning = override # TODO loggers as well? def _requires(modules: Sequence[str]) -> Sequence[str]: from .discovery_pure import module_by_name + mods = [module_by_name(module) for module in modules] res = [] for mod in mods: @@ -360,7 +380,7 @@ def module_requires(*, module: Sequence[str]) -> None: click.echo(x) -def module_install(*, user: bool, module: Sequence[str], parallel: bool=False, break_system_packages: bool=False) -> None: +def module_install(*, user: bool, module: Sequence[str], parallel: bool = False, break_system_packages: bool = False) -> None: if isinstance(module, str): # legacy behavior, used to take a since argument module = [module] @@ -437,7 +457,7 @@ def _ui_getchar_pick(choices: Sequence[str], prompt: str = 'Select from: ') -> i return result_map[ch] -def _locate_functions_or_prompt(qualified_names: List[str], *, prompt: bool = True) -> Iterable[Callable[..., Any]]: +def _locate_functions_or_prompt(qualified_names: list[str], *, prompt: bool = True) -> Iterable[Callable[..., Any]]: from .query import QueryException, locate_qualified_function from .stats import is_data_provider @@ -487,6 +507,7 @@ def _locate_functions_or_prompt(qualified_names: List[str], *, prompt: bool = Tr def _warn_exceptions(exc: Exception) -> None: from my.core import make_logger + logger = make_logger('CLI', level='warning') logger.exception(f'hpi query: {exc}') @@ -498,14 +519,14 @@ def query_hpi_functions( *, output: str = 'json', stream: bool = False, - qualified_names: List[str], - order_key: Optional[str], - order_by_value_type: Optional[Type], + qualified_names: list[str], + order_key: str | None, + order_by_value_type: type | None, after: Any, before: Any, within: Any, reverse: bool = False, - limit: Optional[int], + limit: int | None, drop_unsorted: bool, wrap_unsorted: bool, warn_exceptions: bool, @@ -529,7 +550,8 @@ def query_hpi_functions( warn_exceptions=warn_exceptions, warn_func=_warn_exceptions, raise_exceptions=raise_exceptions, - drop_exceptions=drop_exceptions) + drop_exceptions=drop_exceptions, + ) if output == 'json': from .serialize import dumps @@ -563,7 +585,7 @@ def query_hpi_functions( # can ignore the mypy warning here, locations_to_gpx yields any errors # if you didnt pass it something that matches the LocationProtocol - for exc in locations_to_gpx(res, sys.stdout): # type: ignore[arg-type] + for exc in locations_to_gpx(res, sys.stdout): # type: ignore[arg-type] if warn_exceptions: _warn_exceptions(exc) elif raise_exceptions: @@ -580,6 +602,7 @@ def query_hpi_functions( except ModuleNotFoundError: eprint("'repl' typically uses ipython, install it with 'python3 -m pip install ipython'. falling back to stdlib...") import code + code.interact(local=locals()) else: IPython.embed() @@ -619,13 +642,13 @@ def main(*, debug: bool) -> None: @functools.lru_cache(maxsize=1) -def _all_mod_names() -> List[str]: +def _all_mod_names() -> list[str]: """Should include all modules, in case user is trying to diagnose issues""" # sort this, so that the order doesn't change while tabbing through return sorted([m.name for m in modules()]) -def _module_autocomplete(ctx: click.Context, args: Sequence[str], incomplete: str) -> List[str]: +def _module_autocomplete(ctx: click.Context, args: Sequence[str], incomplete: str) -> list[str]: return [m for m in _all_mod_names() if m.startswith(incomplete)] @@ -784,14 +807,14 @@ def query_cmd( function_name: Sequence[str], output: str, stream: bool, - order_key: Optional[str], - order_type: Optional[str], - after: Optional[str], - before: Optional[str], - within: Optional[str], - recent: Optional[str], + order_key: str | None, + order_type: str | None, + after: str | None, + before: str | None, + within: str | None, + recent: str | None, reverse: bool, - limit: Optional[int], + limit: int | None, drop_unsorted: bool, wrap_unsorted: bool, warn_exceptions: bool, @@ -827,7 +850,7 @@ def query_cmd( from datetime import date, datetime - chosen_order_type: Optional[Type] + chosen_order_type: type | None if order_type == "datetime": chosen_order_type = datetime elif order_type == "date": @@ -863,7 +886,8 @@ def query_cmd( wrap_unsorted=wrap_unsorted, warn_exceptions=warn_exceptions, raise_exceptions=raise_exceptions, - drop_exceptions=drop_exceptions) + drop_exceptions=drop_exceptions, + ) except QueryException as qe: eprint(str(qe)) sys.exit(1) @@ -878,6 +902,7 @@ def query_cmd( def test_requires() -> None: from click.testing import CliRunner + result = CliRunner().invoke(main, ['module', 'requires', 'my.github.ghexport', 'my.browser.export']) assert result.exit_code == 0 assert "github.com/karlicoss/ghexport" in result.output diff --git a/my/core/_cpu_pool.py b/my/core/_cpu_pool.py index 2369075..6b107a7 100644 --- a/my/core/_cpu_pool.py +++ b/my/core/_cpu_pool.py @@ -10,15 +10,18 @@ how many cores we want to dedicate to the DAL. Enabled by the env variable, specifying how many cores to dedicate e.g. "HPI_CPU_POOL=4 hpi query ..." """ + +from __future__ import annotations + import os from concurrent.futures import ProcessPoolExecutor -from typing import Optional, cast +from typing import cast _NOT_SET = cast(ProcessPoolExecutor, object()) -_INSTANCE: Optional[ProcessPoolExecutor] = _NOT_SET +_INSTANCE: ProcessPoolExecutor | None = _NOT_SET -def get_cpu_pool() -> Optional[ProcessPoolExecutor]: +def get_cpu_pool() -> ProcessPoolExecutor | None: global _INSTANCE if _INSTANCE is _NOT_SET: use_cpu_pool = os.environ.get('HPI_CPU_POOL') diff --git a/my/core/_deprecated/kompress.py b/my/core/_deprecated/kompress.py index cd27a7f..ce14fad 100644 --- a/my/core/_deprecated/kompress.py +++ b/my/core/_deprecated/kompress.py @@ -1,16 +1,17 @@ """ Various helpers for compression """ + # fmt: off from __future__ import annotations import io import pathlib -import sys +from collections.abc import Iterator, Sequence from datetime import datetime from functools import total_ordering from pathlib import Path -from typing import IO, Any, Iterator, Sequence, Union +from typing import IO, Any, Union PathIsh = Union[Path, str] diff --git a/my/core/cachew.py b/my/core/cachew.py index dc6ed79..9ccee09 100644 --- a/my/core/cachew.py +++ b/my/core/cachew.py @@ -1,16 +1,18 @@ -from .internal import assert_subpackage; assert_subpackage(__name__) +from __future__ import annotations + +from .internal import assert_subpackage + +assert_subpackage(__name__) import logging import sys +from collections.abc import Iterator from contextlib import contextmanager from pathlib import Path from typing import ( TYPE_CHECKING, Any, Callable, - Iterator, - Optional, - Type, TypeVar, Union, cast, @@ -21,7 +23,6 @@ import appdirs # type: ignore[import-untyped] from . import warnings - PathIsh = Union[str, Path] # avoid circular import from .common @@ -60,12 +61,12 @@ def _appdirs_cache_dir() -> Path: _CACHE_DIR_NONE_HACK = Path('/tmp/hpi/cachew_none_hack') -def cache_dir(suffix: Optional[PathIsh] = None) -> Path: +def cache_dir(suffix: PathIsh | None = None) -> Path: from . import core_config as CC cdir_ = CC.config.get_cache_dir() - sp: Optional[Path] = None + sp: Path | None = None if suffix is not None: sp = Path(suffix) # guess if you do need absolute, better path it directly instead of as suffix? @@ -144,21 +145,19 @@ if TYPE_CHECKING: # we need two versions due to @doublewrap # this is when we just annotate as @cachew without any args @overload # type: ignore[no-overload-impl] - def mcachew(fun: F) -> F: - ... + def mcachew(fun: F) -> F: ... @overload def mcachew( - cache_path: Optional[PathProvider] = ..., + cache_path: PathProvider | None = ..., *, force_file: bool = ..., - cls: Optional[Type] = ..., + cls: type | None = ..., depends_on: HashFunction = ..., - logger: Optional[logging.Logger] = ..., + logger: logging.Logger | None = ..., chunk_by: int = ..., - synthetic_key: Optional[str] = ..., - ) -> Callable[[F], F]: - ... + synthetic_key: str | None = ..., + ) -> Callable[[F], F]: ... else: mcachew = _mcachew_impl diff --git a/my/core/cfg.py b/my/core/cfg.py index a71a7e3..9851443 100644 --- a/my/core/cfg.py +++ b/my/core/cfg.py @@ -3,28 +3,32 @@ from __future__ import annotations import importlib import re import sys +from collections.abc import Iterator from contextlib import ExitStack, contextmanager -from typing import Any, Callable, Dict, Iterator, Optional, Type, TypeVar +from typing import Any, Callable, TypeVar -Attrs = Dict[str, Any] +Attrs = dict[str, Any] C = TypeVar('C') + # todo not sure about it, could be overthinking... # but short enough to change later # TODO document why it's necessary? -def make_config(cls: Type[C], migration: Callable[[Attrs], Attrs]=lambda x: x) -> C: +def make_config(cls: type[C], migration: Callable[[Attrs], Attrs] = lambda x: x) -> C: user_config = cls.__base__ old_props = { # NOTE: deliberately use gettatr to 'force' class properties here - k: getattr(user_config, k) for k in vars(user_config) + k: getattr(user_config, k) + for k in vars(user_config) } new_props = migration(old_props) from dataclasses import fields + params = { k: v for k, v in new_props.items() - if k in {f.name for f in fields(cls)} # type: ignore[arg-type] # see https://github.com/python/typing_extensions/issues/115 + if k in {f.name for f in fields(cls)} # type: ignore[arg-type] # see https://github.com/python/typing_extensions/issues/115 } # todo maybe return type here? return cls(**params) @@ -51,6 +55,8 @@ def _override_config(config: F) -> Iterator[F]: ModuleRegex = str + + @contextmanager def _reload_modules(modules: ModuleRegex) -> Iterator[None]: # need to use list here, otherwise reordering with set might mess things up @@ -81,13 +87,14 @@ def _reload_modules(modules: ModuleRegex) -> Iterator[None]: @contextmanager -def tmp_config(*, modules: Optional[ModuleRegex]=None, config=None): +def tmp_config(*, modules: ModuleRegex | None = None, config=None): if modules is None: assert config is None if modules is not None: assert config is not None import my.config + with ExitStack() as module_reload_stack, _override_config(my.config) as new_config: if config is not None: overrides = {k: v for k, v in vars(config).items() if not k.startswith('__')} @@ -102,6 +109,7 @@ def tmp_config(*, modules: Optional[ModuleRegex]=None, config=None): def test_tmp_config() -> None: class extra: data_path = '/path/to/data' + with tmp_config() as c: assert c.google != 'whatever' assert not hasattr(c, 'extra') diff --git a/my/core/common.py b/my/core/common.py index a2c2ad3..91fe9bd 100644 --- a/my/core/common.py +++ b/my/core/common.py @@ -1,20 +1,18 @@ +from __future__ import annotations + import os +from collections.abc import Iterable, Sequence from glob import glob as do_glob from pathlib import Path from typing import ( TYPE_CHECKING, Callable, Generic, - Iterable, - List, - Sequence, - Tuple, TypeVar, Union, ) -from . import compat -from . import warnings +from . import compat, warnings # some helper functions # TODO start deprecating this? soon we'd be able to use Path | str syntax which is shorter and more explicit @@ -24,20 +22,22 @@ Paths = Union[Sequence[PathIsh], PathIsh] DEFAULT_GLOB = '*' + + def get_files( pp: Paths, - glob: str=DEFAULT_GLOB, + glob: str = DEFAULT_GLOB, *, - sort: bool=True, - guess_compression: bool=True, -) -> Tuple[Path, ...]: + sort: bool = True, + guess_compression: bool = True, +) -> tuple[Path, ...]: """ Helper function to avoid boilerplate. Tuple as return type is a bit friendlier for hashing/caching, so hopefully makes sense """ # TODO FIXME mm, some wrapper to assert iterator isn't empty? - sources: List[Path] + sources: list[Path] if isinstance(pp, Path): sources = [pp] elif isinstance(pp, str): @@ -54,7 +54,7 @@ def get_files( # TODO ugh. very flaky... -3 because [, get_files(), ] return traceback.extract_stack()[-3].filename - paths: List[Path] = [] + paths: list[Path] = [] for src in sources: if src.parts[0] == '~': src = src.expanduser() @@ -64,7 +64,7 @@ def get_files( if glob != DEFAULT_GLOB: warnings.medium(f"{caller()}: treating {gs} as glob path. Explicit glob={glob} argument is ignored!") paths.extend(map(Path, do_glob(gs))) - elif os.path.isdir(str(src)): + elif os.path.isdir(str(src)): # noqa: PTH112 # NOTE: we're using os.path here on purpose instead of src.is_dir # the reason is is_dir for archives might return True and then # this clause would try globbing insize the archives @@ -234,16 +234,14 @@ if not TYPE_CHECKING: return types.asdict(*args, **kwargs) # todo wrap these in deprecated decorator as well? + # TODO hmm how to deprecate these in runtime? + # tricky cause they are actually classes/types + from typing import Literal # noqa: F401 + from .cachew import mcachew # noqa: F401 # this is kinda internal, should just use my.core.logging.setup_logger if necessary from .logging import setup_logger - - # TODO hmm how to deprecate these in runtime? - # tricky cause they are actually classes/types - - from typing import Literal # noqa: F401 - from .stats import Stats from .types import ( Json, diff --git a/my/core/compat.py b/my/core/compat.py index 3273ff4..8f719a8 100644 --- a/my/core/compat.py +++ b/my/core/compat.py @@ -3,6 +3,8 @@ Contains backwards compatibility helpers for different python versions. If something is relevant to HPI itself, please put it in .hpi_compat instead ''' +from __future__ import annotations + import sys from typing import TYPE_CHECKING @@ -29,6 +31,7 @@ if not TYPE_CHECKING: @deprecated('use .removesuffix method on string directly instead') def removesuffix(text: str, suffix: str) -> str: return text.removesuffix(suffix) + ## ## used to have compat function before 3.8 for these, keeping for runtime back compatibility @@ -46,13 +49,13 @@ else: # bisect_left doesn't have a 'key' parameter (which we use) # till python3.10 if sys.version_info[:2] <= (3, 9): - from typing import Any, Callable, List, Optional, TypeVar + from typing import Any, Callable, List, Optional, TypeVar # noqa: UP035 X = TypeVar('X') # copied from python src # fmt: off - def bisect_left(a: List[Any], x: Any, lo: int=0, hi: Optional[int]=None, *, key: Optional[Callable[..., Any]]=None) -> int: + def bisect_left(a: list[Any], x: Any, lo: int=0, hi: int | None=None, *, key: Callable[..., Any] | None=None) -> int: if lo < 0: raise ValueError('lo must be non-negative') if hi is None: diff --git a/my/core/core_config.py b/my/core/core_config.py index 9036971..3f26c03 100644 --- a/my/core/core_config.py +++ b/my/core/core_config.py @@ -2,18 +2,21 @@ Bindings for the 'core' HPI configuration ''' +from __future__ import annotations + import re +from collections.abc import Sequence from dataclasses import dataclass from pathlib import Path -from typing import Optional, Sequence -from . import PathIsh, warnings +from . import warnings try: from my.config import core as user_config # type: ignore[attr-defined] except Exception as e: try: from my.config import common as user_config # type: ignore[attr-defined] + warnings.high("'common' config section is deprecated. Please rename it to 'core'.") except Exception as e2: # make it defensive, because it's pretty commonly used and would be annoying if it breaks hpi doctor etc. @@ -24,6 +27,7 @@ except Exception as e: _HPI_CACHE_DIR_DEFAULT = '' + @dataclass class Config(user_config): ''' @@ -34,7 +38,7 @@ class Config(user_config): cache_dir = '/your/custom/cache/path' ''' - cache_dir: Optional[PathIsh] = _HPI_CACHE_DIR_DEFAULT + cache_dir: Path | str | None = _HPI_CACHE_DIR_DEFAULT ''' Base directory for cachew. - if None , means cache is disabled @@ -44,7 +48,7 @@ class Config(user_config): NOTE: you shouldn't use this attribute in HPI modules directly, use Config.get_cache_dir()/cachew.cache_dir() instead ''' - tmp_dir: Optional[PathIsh] = None + tmp_dir: Path | str | None = None ''' Path to a temporary directory. This can be used temporarily while extracting zipfiles etc... @@ -52,34 +56,36 @@ class Config(user_config): - otherwise , use the specified directory as the base temporary directory ''' - enabled_modules : Optional[Sequence[str]] = None + enabled_modules: Sequence[str] | None = None ''' list of regexes/globs - None means 'rely on disabled_modules' ''' - disabled_modules: Optional[Sequence[str]] = None + disabled_modules: Sequence[str] | None = None ''' list of regexes/globs - None means 'rely on enabled_modules' ''' - def get_cache_dir(self) -> Optional[Path]: + def get_cache_dir(self) -> Path | None: cdir = self.cache_dir if cdir is None: return None if cdir == _HPI_CACHE_DIR_DEFAULT: from .cachew import _appdirs_cache_dir + return _appdirs_cache_dir() else: return Path(cdir).expanduser() def get_tmp_dir(self) -> Path: - tdir: Optional[PathIsh] = self.tmp_dir + tdir: Path | str | None = self.tmp_dir tpath: Path # use tempfile if unset if tdir is None: import tempfile + tpath = Path(tempfile.gettempdir()) / 'HPI' else: tpath = Path(tdir) @@ -87,10 +93,10 @@ class Config(user_config): tpath.mkdir(parents=True, exist_ok=True) return tpath - def _is_module_active(self, module: str) -> Optional[bool]: + def _is_module_active(self, module: str) -> bool | None: # None means the config doesn't specify anything # todo might be nice to return the 'reason' too? e.g. which option has matched - def matches(specs: Sequence[str]) -> Optional[str]: + def matches(specs: Sequence[str]) -> str | None: for spec in specs: # not sure because . (packages separate) matches anything, but I guess unlikely to clash if re.match(spec, module): @@ -106,10 +112,10 @@ class Config(user_config): return None else: return False - else: # not None + else: # not None if off is None: return True - else: # not None + else: # not None # fallback onto the 'enable everything', then the user will notice warnings.medium(f"[module]: conflicting regexes '{on}' and '{off}' are set in the config. Please only use one of them.") return True @@ -121,8 +127,8 @@ config = make_config(Config) ### tests start +from collections.abc import Iterator from contextlib import contextmanager as ctx -from typing import Iterator @ctx @@ -163,4 +169,5 @@ def test_active_modules() -> None: assert cc._is_module_active("my.body.exercise") is True assert len(record_warnings) == 1 + ### tests end diff --git a/my/core/denylist.py b/my/core/denylist.py index 92faf2c..c92f9a0 100644 --- a/my/core/denylist.py +++ b/my/core/denylist.py @@ -5,23 +5,25 @@ A helper module for defining denylists for sources programmatically For docs, see doc/DENYLIST.md """ +from __future__ import annotations + import functools import json import sys from collections import defaultdict +from collections.abc import Iterator, Mapping from pathlib import Path -from typing import Any, Dict, Iterator, List, Mapping, Set, TypeVar +from typing import Any, TypeVar import click from more_itertools import seekable -from my.core.common import PathIsh -from my.core.serialize import dumps -from my.core.warnings import medium +from .serialize import dumps +from .warnings import medium T = TypeVar("T") -DenyMap = Mapping[str, Set[Any]] +DenyMap = Mapping[str, set[Any]] def _default_key_func(obj: T) -> str: @@ -29,9 +31,9 @@ def _default_key_func(obj: T) -> str: class DenyList: - def __init__(self, denylist_file: PathIsh): + def __init__(self, denylist_file: Path | str) -> None: self.file = Path(denylist_file).expanduser().absolute() - self._deny_raw_list: List[Dict[str, Any]] = [] + self._deny_raw_list: list[dict[str, Any]] = [] self._deny_map: DenyMap = defaultdict(set) # deny cli, user can override these @@ -45,7 +47,7 @@ class DenyList: return deny_map: DenyMap = defaultdict(set) - data: List[Dict[str, Any]]= json.loads(self.file.read_text()) + data: list[dict[str, Any]] = json.loads(self.file.read_text()) self._deny_raw_list = data for ignore in data: @@ -112,7 +114,7 @@ class DenyList: self._load() self._deny_raw({key: self._stringify_value(value)}, write=write) - def _deny_raw(self, data: Dict[str, Any], *, write: bool = False) -> None: + def _deny_raw(self, data: dict[str, Any], *, write: bool = False) -> None: self._deny_raw_list.append(data) if write: self.write() @@ -131,7 +133,7 @@ class DenyList: def _deny_cli_remember( self, items: Iterator[T], - mem: Dict[str, T], + mem: dict[str, T], ) -> Iterator[str]: keyf = self._deny_cli_key_func or _default_key_func # i.e., convert each item to a string, and map str -> item @@ -157,10 +159,8 @@ class DenyList: # reset the iterator sit.seek(0) # so we can map the selected string from fzf back to the original objects - memory_map: Dict[str, T] = {} - picker = FzfPrompt( - executable_path=self.fzf_path, default_options="--no-multi" - ) + memory_map: dict[str, T] = {} + picker = FzfPrompt(executable_path=self.fzf_path, default_options="--no-multi") picked_l = picker.prompt( self._deny_cli_remember(itr, memory_map), "--read0", diff --git a/my/core/discovery_pure.py b/my/core/discovery_pure.py index b753de8..18a19c4 100644 --- a/my/core/discovery_pure.py +++ b/my/core/discovery_pure.py @@ -10,6 +10,8 @@ This potentially allows it to be: It should be free of external modules, importlib, exec, etc. etc. ''' +from __future__ import annotations + REQUIRES = 'REQUIRES' NOT_HPI_MODULE_VAR = '__NOT_HPI_MODULE__' @@ -19,8 +21,9 @@ import ast import logging import os import re +from collections.abc import Iterable, Sequence from pathlib import Path -from typing import Any, Iterable, List, NamedTuple, Optional, Sequence, cast +from typing import Any, NamedTuple, Optional, cast ''' None means that requirements weren't defined (different from empty requirements) @@ -30,11 +33,11 @@ Requires = Optional[Sequence[str]] class HPIModule(NamedTuple): name: str - skip_reason: Optional[str] - doc: Optional[str] = None - file: Optional[Path] = None + skip_reason: str | None + doc: str | None = None + file: Path | None = None requires: Requires = None - legacy: Optional[str] = None # contains reason/deprecation warning + legacy: str | None = None # contains reason/deprecation warning def ignored(m: str) -> bool: @@ -55,13 +58,13 @@ def has_stats(src: Path) -> bool: def _has_stats(code: str) -> bool: a: ast.Module = ast.parse(code) for x in a.body: - try: # maybe assign + try: # maybe assign [tg] = cast(Any, x).targets if tg.id == 'stats': return True except: pass - try: # maybe def? + try: # maybe def? name = cast(Any, x).name if name == 'stats': return True @@ -144,7 +147,7 @@ def all_modules() -> Iterable[HPIModule]: def _iter_my_roots() -> Iterable[Path]: import my # doesn't import any code, because of namespace package - paths: List[str] = list(my.__path__) + paths: list[str] = list(my.__path__) if len(paths) == 0: # should probably never happen?, if this code is running, it was imported # because something was added to __path__ to match this name diff --git a/my/core/error.py b/my/core/error.py index ed26dda..b308869 100644 --- a/my/core/error.py +++ b/my/core/error.py @@ -3,19 +3,16 @@ Various error handling helpers See https://beepb00p.xyz/mypy-error-handling.html#kiss for more detail """ +from __future__ import annotations + import traceback +from collections.abc import Iterable, Iterator from datetime import datetime from itertools import tee from typing import ( Any, Callable, - Iterable, - Iterator, - List, Literal, - Optional, - Tuple, - Type, TypeVar, Union, cast, @@ -33,7 +30,7 @@ Res = ResT[T, Exception] ErrorPolicy = Literal["yield", "raise", "drop"] -def notnone(x: Optional[T]) -> T: +def notnone(x: T | None) -> T: assert x is not None return x @@ -60,13 +57,15 @@ def raise_exceptions(itr: Iterable[Res[T]]) -> Iterator[T]: yield o -def warn_exceptions(itr: Iterable[Res[T]], warn_func: Optional[Callable[[Exception], None]] = None) -> Iterator[T]: +def warn_exceptions(itr: Iterable[Res[T]], warn_func: Callable[[Exception], None] | None = None) -> Iterator[T]: # if not provided, use the 'warnings' module if warn_func is None: from my.core.warnings import medium + def _warn_func(e: Exception) -> None: # TODO: print traceback? but user could always --raise-exceptions as well medium(str(e)) + warn_func = _warn_func for o in itr: @@ -81,7 +80,7 @@ def echain(ex: E, cause: Exception) -> E: return ex -def split_errors(l: Iterable[ResT[T, E]], ET: Type[E]) -> Tuple[Iterable[T], Iterable[E]]: +def split_errors(l: Iterable[ResT[T, E]], ET: type[E]) -> tuple[Iterable[T], Iterable[E]]: # TODO would be nice to have ET=Exception default? but it causes some mypy complaints? vit, eit = tee(l) # TODO ugh, not sure if I can reconcile type checking and runtime and convince mypy that ET and E are the same type? @@ -99,7 +98,9 @@ def split_errors(l: Iterable[ResT[T, E]], ET: Type[E]) -> Tuple[Iterable[T], Ite K = TypeVar('K') -def sort_res_by(items: Iterable[Res[T]], key: Callable[[Any], K]) -> List[Res[T]]: + + +def sort_res_by(items: Iterable[Res[T]], key: Callable[[Any], K]) -> list[Res[T]]: """ Sort a sequence potentially interleaved with errors/entries on which the key can't be computed. The general idea is: the error sticks to the non-error entry that follows it @@ -107,7 +108,7 @@ def sort_res_by(items: Iterable[Res[T]], key: Callable[[Any], K]) -> List[Res[T] group = [] groups = [] for i in items: - k: Optional[K] + k: K | None try: k = key(i) except Exception: # error white computing key? dunno, might be nice to handle... @@ -117,10 +118,10 @@ def sort_res_by(items: Iterable[Res[T]], key: Callable[[Any], K]) -> List[Res[T] groups.append((k, group)) group = [] - results: List[Res[T]] = [] - for _v, grp in sorted(groups, key=lambda p: p[0]): # type: ignore[return-value, arg-type] # TODO SupportsLessThan?? + results: list[Res[T]] = [] + for _v, grp in sorted(groups, key=lambda p: p[0]): # type: ignore[return-value, arg-type] # TODO SupportsLessThan?? results.extend(grp) - results.extend(group) # handle last group (it will always be errors only) + results.extend(group) # handle last group (it will always be errors only) return results @@ -162,20 +163,20 @@ def test_sort_res_by() -> None: # helpers to associate timestamps with the errors (so something meaningful could be displayed on the plots, for example) # todo document it under 'patterns' somewhere... # todo proper typevar? -def set_error_datetime(e: Exception, dt: Optional[datetime]) -> None: +def set_error_datetime(e: Exception, dt: datetime | None) -> None: if dt is None: return e.args = (*e.args, dt) # todo not sure if should return new exception? -def attach_dt(e: Exception, *, dt: Optional[datetime]) -> Exception: +def attach_dt(e: Exception, *, dt: datetime | None) -> Exception: set_error_datetime(e, dt) return e # todo it might be problematic because might mess with timezones (when it's converted to string, it's converted to a shift) -def extract_error_datetime(e: Exception) -> Optional[datetime]: +def extract_error_datetime(e: Exception) -> datetime | None: import re for x in reversed(e.args): @@ -201,10 +202,10 @@ MODULE_SETUP_URL = 'https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#p def warn_my_config_import_error( - err: Union[ImportError, AttributeError], + err: ImportError | AttributeError, *, - help_url: Optional[str] = None, - module_name: Optional[str] = None, + help_url: str | None = None, + module_name: str | None = None, ) -> bool: """ If the user tried to import something from my.config but it failed, @@ -265,7 +266,7 @@ def test_datetime_errors() -> None: import pytz # noqa: I001 dt_notz = datetime.now() - dt_tz = datetime.now(tz=pytz.timezone('Europe/Amsterdam')) + dt_tz = datetime.now(tz=pytz.timezone('Europe/Amsterdam')) for dt in [dt_tz, dt_notz]: e1 = RuntimeError('whatever') assert extract_error_datetime(e1) is None diff --git a/my/core/experimental.py b/my/core/experimental.py index 1a78272..0a1c3b4 100644 --- a/my/core/experimental.py +++ b/my/core/experimental.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import sys import types -from typing import Any, Dict, Optional +from typing import Any # The idea behind this one is to support accessing "overlaid/shadowed" modules from namespace packages @@ -20,7 +22,7 @@ def import_original_module( file: str, *, star: bool = False, - globals: Optional[Dict[str, Any]] = None, + globals: dict[str, Any] | None = None, ) -> types.ModuleType: module_to_restore = sys.modules[module_name] diff --git a/my/core/freezer.py b/my/core/freezer.py index 93bceb7..4fb0e25 100644 --- a/my/core/freezer.py +++ b/my/core/freezer.py @@ -1,29 +1,29 @@ -from .internal import assert_subpackage; assert_subpackage(__name__) +from __future__ import annotations -import dataclasses as dcl +from .internal import assert_subpackage + +assert_subpackage(__name__) + +import dataclasses import inspect -from typing import Any, Type, TypeVar +from typing import Any, Generic, TypeVar D = TypeVar('D') -def _freeze_dataclass(Orig: Type[D]): - ofields = [(f.name, f.type, f) for f in dcl.fields(Orig)] # type: ignore[arg-type] # see https://github.com/python/typing_extensions/issues/115 +def _freeze_dataclass(Orig: type[D]): + ofields = [(f.name, f.type, f) for f in dataclasses.fields(Orig)] # type: ignore[arg-type] # see https://github.com/python/typing_extensions/issues/115 # extract properties along with their types - props = list(inspect.getmembers(Orig, lambda o: isinstance(o, property))) + props = list(inspect.getmembers(Orig, lambda o: isinstance(o, property))) pfields = [(name, inspect.signature(getattr(prop, 'fget')).return_annotation) for name, prop in props] # FIXME not sure about name? # NOTE: sadly passing bases=[Orig] won't work, python won't let us override properties with fields - RRR = dcl.make_dataclass('RRR', fields=[*ofields, *pfields]) + RRR = dataclasses.make_dataclass('RRR', fields=[*ofields, *pfields]) # todo maybe even declare as slots? return props, RRR -# todo need some decorator thingie? -from typing import Generic - - class Freezer(Generic[D]): ''' Some magic which converts dataclass properties into fields. @@ -31,13 +31,13 @@ class Freezer(Generic[D]): For now only supports dataclasses. ''' - def __init__(self, Orig: Type[D]) -> None: + def __init__(self, Orig: type[D]) -> None: self.Orig = Orig self.props, self.Frozen = _freeze_dataclass(Orig) def freeze(self, value: D) -> D: pvalues = {name: getattr(value, name) for name, _ in self.props} - return self.Frozen(**dcl.asdict(value), **pvalues) # type: ignore[call-overload] # see https://github.com/python/typing_extensions/issues/115 + return self.Frozen(**dataclasses.asdict(value), **pvalues) # type: ignore[call-overload] # see https://github.com/python/typing_extensions/issues/115 ### tests @@ -45,7 +45,7 @@ class Freezer(Generic[D]): # this needs to be defined here to prevent a mypy bug # see https://github.com/python/mypy/issues/7281 -@dcl.dataclass +@dataclasses.dataclass class _A: x: Any @@ -71,6 +71,7 @@ def test_freezer() -> None: assert fd['typed'] == 123 assert fd['untyped'] == [1, 2, 3] + ### # TODO shit. what to do with exceptions? diff --git a/my/core/hpi_compat.py b/my/core/hpi_compat.py index 949046d..3687483 100644 --- a/my/core/hpi_compat.py +++ b/my/core/hpi_compat.py @@ -3,11 +3,14 @@ Contains various backwards compatibility/deprecation helpers relevant to HPI its (as opposed to .compat module which implements compatibility between python versions) """ +from __future__ import annotations + import inspect import os import re +from collections.abc import Iterator, Sequence from types import ModuleType -from typing import Iterator, List, Optional, Sequence, TypeVar +from typing import TypeVar from . import warnings @@ -15,7 +18,7 @@ from . import warnings def handle_legacy_import( parent_module_name: str, legacy_submodule_name: str, - parent_module_path: List[str], + parent_module_path: list[str], ) -> bool: ### # this is to trick mypy into treating this as a proper namespace package @@ -122,8 +125,8 @@ class always_supports_sequence(Iterator[V]): def __init__(self, it: Iterator[V]) -> None: self._it = it - self._list: Optional[List[V]] = None - self._lit: Optional[Iterator[V]] = None + self._list: list[V] | None = None + self._lit: Iterator[V] | None = None def __iter__(self) -> Iterator[V]: # noqa: PYI034 if self._list is not None: @@ -142,7 +145,7 @@ class always_supports_sequence(Iterator[V]): return getattr(self._it, name) @property - def _aslist(self) -> List[V]: + def _aslist(self) -> list[V]: if self._list is None: qualname = getattr(self._it, '__qualname__', '') # defensive just in case warnings.medium(f'Using {qualname} as list is deprecated. Migrate to iterative processing or call list() explicitly.') diff --git a/my/core/influxdb.py b/my/core/influxdb.py index 25eeba1..78a439a 100644 --- a/my/core/influxdb.py +++ b/my/core/influxdb.py @@ -2,9 +2,14 @@ TODO doesn't really belong to 'core' morally, but can think of moving out later ''' -from .internal import assert_subpackage; assert_subpackage(__name__) +from __future__ import annotations -from typing import Any, Dict, Iterable, Optional +from .internal import assert_subpackage + +assert_subpackage(__name__) + +from collections.abc import Iterable +from typing import Any import click @@ -21,7 +26,7 @@ class config: RESET_DEFAULT = False -def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_col: str='dt') -> None: +def fill(it: Iterable[Any], *, measurement: str, reset: bool = RESET_DEFAULT, dt_col: str = 'dt') -> None: # todo infer dt column automatically, reuse in stat? # it doesn't like dots, ends up some syntax error? measurement = measurement.replace('.', '_') @@ -30,6 +35,7 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_c db = config.db from influxdb import InfluxDBClient # type: ignore + client = InfluxDBClient() # todo maybe create if not exists? # client.create_database(db) @@ -40,7 +46,7 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_c client.delete_series(database=db, measurement=measurement) # TODO need to take schema here... - cache: Dict[str, bool] = {} + cache: dict[str, bool] = {} def good(f, v) -> bool: c = cache.get(f) @@ -59,9 +65,9 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_c def dit() -> Iterable[Json]: for i in it: d = asdict(i) - tags: Optional[Json] = None - tags_ = d.get('tags') # meh... handle in a more robust manner - if tags_ is not None and isinstance(tags_, dict): # FIXME meh. + tags: Json | None = None + tags_ = d.get('tags') # meh... handle in a more robust manner + if tags_ is not None and isinstance(tags_, dict): # FIXME meh. del d['tags'] tags = tags_ @@ -84,6 +90,7 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_c } from more_itertools import chunked + # "The optimal batch size is 5000 lines of line protocol." # some chunking is def necessary, otherwise it fails inserted = 0 @@ -97,9 +104,9 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_c # todo "Specify timestamp precision when writing to InfluxDB."? -def magic_fill(it, *, name: Optional[str]=None, reset: bool=RESET_DEFAULT) -> None: +def magic_fill(it, *, name: str | None = None, reset: bool = RESET_DEFAULT) -> None: if name is None: - assert callable(it) # generators have no name/module + assert callable(it) # generators have no name/module name = f'{it.__module__}:{it.__name__}' assert name is not None @@ -109,6 +116,7 @@ def magic_fill(it, *, name: Optional[str]=None, reset: bool=RESET_DEFAULT) -> No from itertools import tee from more_itertools import first, one + it, x = tee(it) f = first(x, default=None) if f is None: @@ -118,9 +126,11 @@ def magic_fill(it, *, name: Optional[str]=None, reset: bool=RESET_DEFAULT) -> No # TODO can we reuse pandas code or something? # from .pandas import _as_columns + schema = _as_columns(type(f)) from datetime import datetime + dtex = RuntimeError(f'expected single datetime field. schema: {schema}') dtf = one((f for f, t in schema.items() if t == datetime), too_short=dtex, too_long=dtex) @@ -137,6 +147,7 @@ def main() -> None: @click.argument('FUNCTION_NAME', type=str, required=True) def populate(*, function_name: str, reset: bool) -> None: from .__main__ import _locate_functions_or_prompt + [provider] = list(_locate_functions_or_prompt([function_name])) # todo could have a non-interactive version which populates from all data sources for the provider? magic_fill(provider, reset=reset) diff --git a/my/core/init.py b/my/core/init.py index 7a30955..644c7b4 100644 --- a/my/core/init.py +++ b/my/core/init.py @@ -19,6 +19,7 @@ def setup_config() -> None: from pathlib import Path from .preinit import get_mycfg_dir + mycfg_dir = get_mycfg_dir() if not mycfg_dir.exists(): @@ -43,6 +44,7 @@ See https://github.com/karlicoss/HPI/blob/master/doc/SETUP.org#setting-up-the-mo except ImportError as ex: # just in case... who knows what crazy setup users have import logging + logging.exception(ex) warnings.warn(f""" Importing 'my.config' failed! (error: {ex}). This is likely to result in issues. diff --git a/my/core/kompress.py b/my/core/kompress.py index 7cbf310..8accb2d 100644 --- a/my/core/kompress.py +++ b/my/core/kompress.py @@ -1,4 +1,6 @@ -from .internal import assert_subpackage; assert_subpackage(__name__) +from .internal import assert_subpackage + +assert_subpackage(__name__) from . import warnings diff --git a/my/core/konsume.py b/my/core/konsume.py index 0e4a2fe..6d24167 100644 --- a/my/core/konsume.py +++ b/my/core/konsume.py @@ -5,17 +5,21 @@ This can potentially allow both for safer defensive parsing, and let you know if TODO perhaps need to get some inspiration from linear logic to decide on a nice API... ''' +from __future__ import annotations + from collections import OrderedDict -from typing import Any, List +from typing import Any def ignore(w, *keys): for k in keys: w[k].ignore() + def zoom(w, *keys): return [w[k].zoom() for k in keys] + # TODO need to support lists class Zoomable: def __init__(self, parent, *args, **kwargs) -> None: @@ -40,7 +44,7 @@ class Zoomable: assert self.parent is not None self.parent._remove(self) - def zoom(self) -> 'Zoomable': + def zoom(self) -> Zoomable: self.consume() return self @@ -63,6 +67,7 @@ class Wdict(Zoomable, OrderedDict): def this_consumed(self): return len(self) == 0 + # TODO specify mypy type for the index special method? @@ -77,6 +82,7 @@ class Wlist(Zoomable, list): def this_consumed(self): return len(self) == 0 + class Wvalue(Zoomable): def __init__(self, parent, value: Any) -> None: super().__init__(parent) @@ -87,23 +93,20 @@ class Wvalue(Zoomable): return [] def this_consumed(self): - return True # TODO not sure.. + return True # TODO not sure.. def __repr__(self): return 'WValue{' + repr(self.value) + '}' -from typing import Tuple - - -def _wrap(j, parent=None) -> Tuple[Zoomable, List[Zoomable]]: +def _wrap(j, parent=None) -> tuple[Zoomable, list[Zoomable]]: res: Zoomable - cc: List[Zoomable] + cc: list[Zoomable] if isinstance(j, dict): res = Wdict(parent) cc = [res] for k, v in j.items(): - vv, c = _wrap(v, parent=res) + vv, c = _wrap(v, parent=res) res[k] = vv cc.extend(c) return res, cc @@ -122,13 +125,14 @@ def _wrap(j, parent=None) -> Tuple[Zoomable, List[Zoomable]]: raise RuntimeError(f'Unexpected type: {type(j)} {j}') +from collections.abc import Iterator from contextlib import contextmanager -from typing import Iterator class UnconsumedError(Exception): pass + # TODO think about error policy later... @contextmanager def wrap(j, *, throw=True) -> Iterator[Zoomable]: @@ -137,7 +141,7 @@ def wrap(j, *, throw=True) -> Iterator[Zoomable]: yield w for c in children: - if not c.this_consumed(): # TODO hmm. how does it figure out if it's consumed??? + if not c.this_consumed(): # TODO hmm. how does it figure out if it's consumed??? if throw: # TODO need to keep a full path or something... raise UnconsumedError(f''' @@ -153,6 +157,7 @@ from typing import cast def test_unconsumed() -> None: import pytest + with pytest.raises(UnconsumedError): with wrap({'a': 1234}) as w: w = cast(Wdict, w) @@ -163,6 +168,7 @@ def test_unconsumed() -> None: w = cast(Wdict, w) d = w['c']['d'].zoom() + def test_consumed() -> None: with wrap({'a': 1234}) as w: w = cast(Wdict, w) @@ -173,6 +179,7 @@ def test_consumed() -> None: c = w['c'].zoom() d = c['d'].zoom() + def test_types() -> None: # (string, number, object, array, boolean or nul with wrap({'string': 'string', 'number': 3.14, 'boolean': True, 'null': None, 'list': [1, 2, 3]}) as w: @@ -181,9 +188,10 @@ def test_types() -> None: w['number'].consume() w['boolean'].zoom() w['null'].zoom() - for x in list(w['list'].zoom()): # TODO eh. how to avoid the extra list thing? + for x in list(w['list'].zoom()): # TODO eh. how to avoid the extra list thing? x.consume() + def test_consume_all() -> None: with wrap({'aaa': {'bbb': {'hi': 123}}}) as w: w = cast(Wdict, w) @@ -193,11 +201,9 @@ def test_consume_all() -> None: def test_consume_few() -> None: import pytest + pytest.skip('Will think about it later..') - with wrap({ - 'important': 123, - 'unimportant': 'whatever' - }) as w: + with wrap({'important': 123, 'unimportant': 'whatever'}) as w: w = cast(Wdict, w) w['important'].zoom() w.consume_all() @@ -206,6 +212,7 @@ def test_consume_few() -> None: def test_zoom() -> None: import pytest + with wrap({'aaa': 'whatever'}) as w: w = cast(Wdict, w) with pytest.raises(KeyError): diff --git a/my/core/mime.py b/my/core/mime.py index cf5bdf5..8235960 100644 --- a/my/core/mime.py +++ b/my/core/mime.py @@ -2,11 +2,14 @@ Utils for mime/filetype handling """ -from .internal import assert_subpackage; assert_subpackage(__name__) +from __future__ import annotations + +from .internal import assert_subpackage + +assert_subpackage(__name__) import functools - -from .common import PathIsh +from pathlib import Path @functools.lru_cache(1) @@ -23,7 +26,7 @@ import mimetypes # todo do I need init()? # todo wtf? fastermime thinks it's mime is application/json even if the extension is xz?? # whereas magic detects correctly: application/x-zstd and application/x-xz -def fastermime(path: PathIsh) -> str: +def fastermime(path: Path | str) -> str: paths = str(path) # mimetypes is faster, so try it first (mime, _) = mimetypes.guess_type(paths) diff --git a/my/core/orgmode.py b/my/core/orgmode.py index 979f288..96c09a4 100644 --- a/my/core/orgmode.py +++ b/my/core/orgmode.py @@ -1,6 +1,7 @@ """ Various helpers for reading org-mode data """ + from datetime import datetime @@ -22,17 +23,20 @@ def parse_org_datetime(s: str) -> datetime: # TODO I guess want to borrow inspiration from bs4? element type <-> tag; and similar logic for find_one, find_all -from typing import Callable, Iterable, TypeVar +from collections.abc import Iterable +from typing import Callable, TypeVar from orgparse import OrgNode V = TypeVar('V') + def collect(n: OrgNode, cfun: Callable[[OrgNode], Iterable[V]]) -> Iterable[V]: yield from cfun(n) for c in n.children: yield from collect(c, cfun) + from more_itertools import one from orgparse.extra import Table @@ -46,7 +50,7 @@ class TypedTable(Table): tt = super().__new__(TypedTable) tt.__dict__ = orig.__dict__ blocks = list(orig.blocks) - header = blocks[0] # fist block is schema + header = blocks[0] # fist block is schema if len(header) == 2: # TODO later interpret first line as types header = header[1:] diff --git a/my/core/pandas.py b/my/core/pandas.py index 8f5fd29..d444965 100644 --- a/my/core/pandas.py +++ b/my/core/pandas.py @@ -7,17 +7,14 @@ from __future__ import annotations # todo not sure if belongs to 'core'. It's certainly 'more' core than actual modules, but still not essential # NOTE: this file is meant to be importable without Pandas installed import dataclasses +from collections.abc import Iterable, Iterator from datetime import datetime, timezone from pprint import pformat from typing import ( TYPE_CHECKING, Any, Callable, - Dict, - Iterable, - Iterator, Literal, - Type, TypeVar, ) @@ -178,7 +175,7 @@ def _to_jsons(it: Iterable[Res[Any]]) -> Iterable[Json]: Schema = Any -def _as_columns(s: Schema) -> Dict[str, Type]: +def _as_columns(s: Schema) -> dict[str, type]: # todo would be nice to extract properties; add tests for this as well if dataclasses.is_dataclass(s): return {f.name: f.type for f in dataclasses.fields(s)} # type: ignore[misc] # ugh, why mypy thinks f.type can return str?? diff --git a/my/core/preinit.py b/my/core/preinit.py index be5477b..eb3a34f 100644 --- a/my/core/preinit.py +++ b/my/core/preinit.py @@ -8,6 +8,7 @@ def get_mycfg_dir() -> Path: import os import appdirs # type: ignore[import-untyped] + # not sure if that's necessary, i.e. could rely on PYTHONPATH instead # on the other hand, by using MY_CONFIG we are guaranteed to load it from the desired path? mvar = os.environ.get('MY_CONFIG') diff --git a/my/core/pytest.py b/my/core/pytest.py index e514957..ad9e7d7 100644 --- a/my/core/pytest.py +++ b/my/core/pytest.py @@ -2,7 +2,9 @@ Helpers to prevent depending on pytest in runtime """ -from .internal import assert_subpackage; assert_subpackage(__name__) +from .internal import assert_subpackage + +assert_subpackage(__name__) import sys import typing diff --git a/my/core/query.py b/my/core/query.py index 45806fb..50724a7 100644 --- a/my/core/query.py +++ b/my/core/query.py @@ -5,23 +5,20 @@ The main entrypoint to this library is the 'select' function below; try: python3 -c "from my.core.query import select; help(select)" """ +from __future__ import annotations + import dataclasses import importlib import inspect import itertools +from collections.abc import Iterable, Iterator from datetime import datetime from typing import ( Any, Callable, - Dict, - Iterable, - Iterator, - List, NamedTuple, Optional, - Tuple, TypeVar, - Union, ) import more_itertools @@ -51,6 +48,7 @@ class Unsortable(NamedTuple): class QueryException(ValueError): """Used to differentiate query-related errors, so the CLI interface is more expressive""" + pass @@ -63,7 +61,7 @@ def locate_function(module_name: str, function_name: str) -> Callable[[], Iterab """ try: mod = importlib.import_module(module_name) - for (fname, f) in inspect.getmembers(mod, inspect.isfunction): + for fname, f in inspect.getmembers(mod, inspect.isfunction): if fname == function_name: return f # in case the function is defined dynamically, @@ -83,10 +81,10 @@ def locate_qualified_function(qualified_name: str) -> Callable[[], Iterable[ET]] if "." not in qualified_name: raise QueryException("Could not find a '.' in the function name, e.g. my.reddit.rexport.comments") rdot_index = qualified_name.rindex(".") - return locate_function(qualified_name[:rdot_index], qualified_name[rdot_index + 1:]) + return locate_function(qualified_name[:rdot_index], qualified_name[rdot_index + 1 :]) -def attribute_func(obj: T, where: Where, default: Optional[U] = None) -> Optional[OrderFunc]: +def attribute_func(obj: T, where: Where, default: U | None = None) -> OrderFunc | None: """ Attempts to find an attribute which matches the 'where_function' on the object, using some getattr/dict checks. Returns a function which when called with @@ -133,11 +131,11 @@ def attribute_func(obj: T, where: Where, default: Optional[U] = None) -> Optiona def _generate_order_by_func( obj_res: Res[T], *, - key: Optional[str] = None, - where_function: Optional[Where] = None, - default: Optional[U] = None, + key: str | None = None, + where_function: Where | None = None, + default: U | None = None, force_unsortable: bool = False, -) -> Optional[OrderFunc]: +) -> OrderFunc | None: """ Accepts an object Res[T] (Instance of some class or Exception) @@ -202,7 +200,7 @@ pass 'drop_exceptions' to ignore exceptions""") # user must provide either a key or a where predicate if where_function is not None: - func: Optional[OrderFunc] = attribute_func(obj, where_function, default) + func: OrderFunc | None = attribute_func(obj, where_function, default) if func is not None: return func @@ -218,8 +216,6 @@ pass 'drop_exceptions' to ignore exceptions""") return None # couldn't compute a OrderFunc for this class/instance - - # currently using the 'key set' as a proxy for 'this is the same type of thing' def _determine_order_by_value_key(obj_res: ET) -> Any: """ @@ -244,7 +240,7 @@ def _drop_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Iterator[ET]: # try getting the first value from the iterator # similar to my.core.common.warn_if_empty? this doesn't go through the whole iterator though -def _peek_iter(itr: Iterator[ET]) -> Tuple[Optional[ET], Iterator[ET]]: +def _peek_iter(itr: Iterator[ET]) -> tuple[ET | None, Iterator[ET]]: itr = more_itertools.peekable(itr) try: first_item = itr.peek() @@ -255,9 +251,9 @@ def _peek_iter(itr: Iterator[ET]) -> Tuple[Optional[ET], Iterator[ET]]: # similar to 'my.core.error.sort_res_by'? -def _wrap_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Tuple[Iterator[Unsortable], Iterator[ET]]: - unsortable: List[Unsortable] = [] - sortable: List[ET] = [] +def _wrap_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> tuple[Iterator[Unsortable], Iterator[ET]]: + unsortable: list[Unsortable] = [] + sortable: list[ET] = [] for o in itr: # if input to select was another select if isinstance(o, Unsortable): @@ -279,7 +275,7 @@ def _handle_unsorted( orderfunc: OrderFunc, drop_unsorted: bool, wrap_unsorted: bool -) -> Tuple[Iterator[Unsortable], Iterator[ET]]: +) -> tuple[Iterator[Unsortable], Iterator[ET]]: # prefer drop_unsorted to wrap_unsorted, if both were present if drop_unsorted: return iter([]), _drop_unsorted(itr, orderfunc) @@ -294,16 +290,16 @@ def _handle_unsorted( # different types. ***This consumes the iterator***, so # you should definitely itertoolts.tee it beforehand # as to not exhaust the values -def _generate_order_value_func(itr: Iterator[ET], order_value: Where, default: Optional[U] = None) -> OrderFunc: +def _generate_order_value_func(itr: Iterator[ET], order_value: Where, default: U | None = None) -> OrderFunc: # TODO: add a kwarg to force lookup for every item? would sort of be like core.common.guess_datetime then - order_by_lookup: Dict[Any, OrderFunc] = {} + order_by_lookup: dict[Any, OrderFunc] = {} # need to go through a copy of the whole iterator here to # pre-generate functions to support sorting mixed types for obj_res in itr: key: Any = _determine_order_by_value_key(obj_res) if key not in order_by_lookup: - keyfunc: Optional[OrderFunc] = _generate_order_by_func( + keyfunc: OrderFunc | None = _generate_order_by_func( obj_res, where_function=order_value, default=default, @@ -324,12 +320,12 @@ def _generate_order_value_func(itr: Iterator[ET], order_value: Where, default: O def _handle_generate_order_by( itr, *, - order_by: Optional[OrderFunc] = None, - order_key: Optional[str] = None, - order_value: Optional[Where] = None, - default: Optional[U] = None, -) -> Tuple[Optional[OrderFunc], Iterator[ET]]: - order_by_chosen: Optional[OrderFunc] = order_by # if the user just supplied a function themselves + order_by: OrderFunc | None = None, + order_key: str | None = None, + order_value: Where | None = None, + default: U | None = None, +) -> tuple[OrderFunc | None, Iterator[ET]]: + order_by_chosen: OrderFunc | None = order_by # if the user just supplied a function themselves if order_by is not None: return order_by, itr if order_key is not None: @@ -354,19 +350,19 @@ def _handle_generate_order_by( def select( - src: Union[Iterable[ET], Callable[[], Iterable[ET]]], + src: Iterable[ET] | Callable[[], Iterable[ET]], *, - where: Optional[Where] = None, - order_by: Optional[OrderFunc] = None, - order_key: Optional[str] = None, - order_value: Optional[Where] = None, - default: Optional[U] = None, + where: Where | None = None, + order_by: OrderFunc | None = None, + order_key: str | None = None, + order_value: Where | None = None, + default: U | None = None, reverse: bool = False, - limit: Optional[int] = None, + limit: int | None = None, drop_unsorted: bool = False, wrap_unsorted: bool = True, warn_exceptions: bool = False, - warn_func: Optional[Callable[[Exception], None]] = None, + warn_func: Callable[[Exception], None] | None = None, drop_exceptions: bool = False, raise_exceptions: bool = False, ) -> Iterator[ET]: @@ -617,7 +613,7 @@ class _B(NamedTuple): # move these to tests/? They are re-used so much in the tests below, # not sure where the best place for these is -def _mixed_iter() -> Iterator[Union[_A, _B]]: +def _mixed_iter() -> Iterator[_A | _B]: yield _A(x=datetime(year=2009, month=5, day=10, hour=4, minute=10, second=1), y=5, z=10) yield _B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1)) yield _A(x=datetime(year=2005, month=5, day=10, hour=4, minute=10, second=1), y=10, z=2) @@ -626,7 +622,7 @@ def _mixed_iter() -> Iterator[Union[_A, _B]]: yield _A(x=datetime(year=2005, month=4, day=10, hour=4, minute=10, second=1), y=2, z=-5) -def _mixed_iter_errors() -> Iterator[Res[Union[_A, _B]]]: +def _mixed_iter_errors() -> Iterator[Res[_A | _B]]: m = _mixed_iter() yield from itertools.islice(m, 0, 3) yield RuntimeError("Unhandled error!") diff --git a/my/core/query_range.py b/my/core/query_range.py index 1f4a7ff..2a8d7bd 100644 --- a/my/core/query_range.py +++ b/my/core/query_range.py @@ -7,11 +7,14 @@ filtered iterator See the select_range function below """ +from __future__ import annotations + import re import time +from collections.abc import Iterator from datetime import date, datetime, timedelta -from functools import lru_cache -from typing import Any, Callable, Iterator, NamedTuple, Optional, Type +from functools import cache +from typing import Any, Callable, NamedTuple import more_itertools @@ -25,7 +28,9 @@ from .query import ( select, ) -timedelta_regex = re.compile(r"^((?P[\.\d]+?)w)?((?P[\.\d]+?)d)?((?P[\.\d]+?)h)?((?P[\.\d]+?)m)?((?P[\.\d]+?)s)?$") +timedelta_regex = re.compile( + r"^((?P[\.\d]+?)w)?((?P[\.\d]+?)d)?((?P[\.\d]+?)h)?((?P[\.\d]+?)m)?((?P[\.\d]+?)s)?$" +) # https://stackoverflow.com/a/51916936 @@ -88,7 +93,7 @@ def parse_datetime_float(date_str: str) -> float: # dateparser is a bit more lenient than the above, lets you type # all sorts of dates as inputs # https://github.com/scrapinghub/dateparser#how-to-use - res: Optional[datetime] = dateparser.parse(ds, settings={"DATE_ORDER": "YMD"}) + res: datetime | None = dateparser.parse(ds, settings={"DATE_ORDER": "YMD"}) if res is not None: return res.timestamp() @@ -98,7 +103,7 @@ def parse_datetime_float(date_str: str) -> float: # probably DateLike input? but a user could specify an order_key # which is an epoch timestamp or a float value which they # expect to be converted to a datetime to compare -@lru_cache(maxsize=None) +@cache def _datelike_to_float(dl: Any) -> float: if isinstance(dl, datetime): return dl.timestamp() @@ -130,11 +135,12 @@ class RangeTuple(NamedTuple): of the timeframe -- 'before' - before and after - anything after 'after' and before 'before', acts as a time range """ + # technically doesn't need to be Optional[Any], # just to make it more clear these can be None - after: Optional[Any] - before: Optional[Any] - within: Optional[Any] + after: Any | None + before: Any | None + within: Any | None Converter = Callable[[Any], Any] @@ -145,9 +151,9 @@ def _parse_range( unparsed_range: RangeTuple, end_parser: Converter, within_parser: Converter, - parsed_range: Optional[RangeTuple] = None, - error_message: Optional[str] = None -) -> Optional[RangeTuple]: + parsed_range: RangeTuple | None = None, + error_message: str | None = None, +) -> RangeTuple | None: if parsed_range is not None: return parsed_range @@ -176,11 +182,11 @@ def _create_range_filter( end_parser: Converter, within_parser: Converter, attr_func: Where, - parsed_range: Optional[RangeTuple] = None, - default_before: Optional[Any] = None, - value_coercion_func: Optional[Converter] = None, - error_message: Optional[str] = None, -) -> Optional[Where]: + parsed_range: RangeTuple | None = None, + default_before: Any | None = None, + value_coercion_func: Converter | None = None, + error_message: str | None = None, +) -> Where | None: """ Handles: - parsing the user input into values that are comparable to items the iterable returns @@ -272,17 +278,17 @@ def _create_range_filter( def select_range( itr: Iterator[ET], *, - where: Optional[Where] = None, - order_key: Optional[str] = None, - order_value: Optional[Where] = None, - order_by_value_type: Optional[Type] = None, - unparsed_range: Optional[RangeTuple] = None, + where: Where | None = None, + order_key: str | None = None, + order_value: Where | None = None, + order_by_value_type: type | None = None, + unparsed_range: RangeTuple | None = None, reverse: bool = False, - limit: Optional[int] = None, + limit: int | None = None, drop_unsorted: bool = False, wrap_unsorted: bool = False, warn_exceptions: bool = False, - warn_func: Optional[Callable[[Exception], None]] = None, + warn_func: Callable[[Exception], None] | None = None, drop_exceptions: bool = False, raise_exceptions: bool = False, ) -> Iterator[ET]: @@ -317,9 +323,10 @@ def select_range( drop_exceptions=drop_exceptions, raise_exceptions=raise_exceptions, warn_exceptions=warn_exceptions, - warn_func=warn_func) + warn_func=warn_func, + ) - order_by_chosen: Optional[OrderFunc] = None + order_by_chosen: OrderFunc | None = None # if the user didn't specify an attribute to order value, but specified a type # we should search for on each value in the iterator @@ -345,7 +352,7 @@ Specify a type or a key to order the value by""") # force drop_unsorted=True so we can use _create_range_filter # sort the iterable by the generated order_by_chosen function itr = select(itr, order_by=order_by_chosen, drop_unsorted=True) - filter_func: Optional[Where] + filter_func: Where | None if order_by_value_type in [datetime, date]: filter_func = _create_range_filter( unparsed_range=unparsed_range, @@ -353,7 +360,8 @@ Specify a type or a key to order the value by""") within_parser=parse_timedelta_float, attr_func=order_by_chosen, # type: ignore[arg-type] default_before=time.time(), - value_coercion_func=_datelike_to_float) + value_coercion_func=_datelike_to_float, + ) elif order_by_value_type in [int, float]: # allow primitives to be converted using the default int(), float() callables filter_func = _create_range_filter( @@ -362,7 +370,8 @@ Specify a type or a key to order the value by""") within_parser=order_by_value_type, attr_func=order_by_chosen, # type: ignore[arg-type] default_before=None, - value_coercion_func=order_by_value_type) + value_coercion_func=order_by_value_type, + ) else: # TODO: add additional kwargs to let the user sort by other values, by specifying the parsers? # would need to allow passing the end_parser, within parser, default before and value_coercion_func... @@ -470,7 +479,7 @@ def test_range_predicate() -> None: # filter from 0 to 5 rn: RangeTuple = RangeTuple("0", "5", None) - zero_to_five_filter: Optional[Where] = int_filter_func(unparsed_range=rn) + zero_to_five_filter: Where | None = int_filter_func(unparsed_range=rn) assert zero_to_five_filter is not None # this is just a Where function, given some input it return True/False if the value is allowed assert zero_to_five_filter(3) is True @@ -483,6 +492,7 @@ def test_range_predicate() -> None: rn = RangeTuple(None, 3, "3.5") assert list(filter(int_filter_func(unparsed_range=rn, attr_func=identity), src())) == ["0", "1", "2"] + def test_parse_range() -> None: from functools import partial diff --git a/my/core/serialize.py b/my/core/serialize.py index ab11a20..e36da8f 100644 --- a/my/core/serialize.py +++ b/my/core/serialize.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import datetime from dataclasses import asdict, is_dataclass from decimal import Decimal -from functools import lru_cache +from functools import cache from pathlib import Path -from typing import Any, Callable, NamedTuple, Optional +from typing import Any, Callable, NamedTuple from .error import error_to_json from .pytest import parametrize @@ -57,12 +59,12 @@ def _default_encode(obj: Any) -> Any: # could possibly run multiple times/raise warning if you provide different 'default' # functions or change the kwargs? The alternative is to maintain all of this at the module # level, which is just as annoying -@lru_cache(maxsize=None) +@cache def _dumps_factory(**kwargs) -> Callable[[Any], str]: use_default: DefaultEncoder = _default_encode # if the user passed an additional 'default' parameter, # try using that to serialize before before _default_encode - _additional_default: Optional[DefaultEncoder] = kwargs.get("default") + _additional_default: DefaultEncoder | None = kwargs.get("default") if _additional_default is not None and callable(_additional_default): def wrapped_default(obj: Any) -> Any: @@ -78,9 +80,9 @@ def _dumps_factory(**kwargs) -> Callable[[Any], str]: kwargs["default"] = use_default - prefer_factory: Optional[str] = kwargs.pop('_prefer_factory', None) + prefer_factory: str | None = kwargs.pop('_prefer_factory', None) - def orjson_factory() -> Optional[Dumps]: + def orjson_factory() -> Dumps | None: try: import orjson except ModuleNotFoundError: @@ -95,7 +97,7 @@ def _dumps_factory(**kwargs) -> Callable[[Any], str]: return _orjson_dumps - def simplejson_factory() -> Optional[Dumps]: + def simplejson_factory() -> Dumps | None: try: from simplejson import dumps as simplejson_dumps except ModuleNotFoundError: @@ -115,7 +117,7 @@ def _dumps_factory(**kwargs) -> Callable[[Any], str]: return _simplejson_dumps - def stdlib_factory() -> Optional[Dumps]: + def stdlib_factory() -> Dumps | None: import json from .warnings import high @@ -150,7 +152,7 @@ def _dumps_factory(**kwargs) -> Callable[[Any], str]: def dumps( obj: Any, - default: Optional[DefaultEncoder] = None, + default: DefaultEncoder | None = None, **kwargs, ) -> str: """ diff --git a/my/core/source.py b/my/core/source.py index 52c58c1..a309d13 100644 --- a/my/core/source.py +++ b/my/core/source.py @@ -3,9 +3,12 @@ Decorator to gracefully handle importing a data source, or warning and yielding nothing (or a default) when its not available """ +from __future__ import annotations + import warnings +from collections.abc import Iterable, Iterator from functools import wraps -from typing import Any, Callable, Iterable, Iterator, Optional, TypeVar +from typing import Any, Callable, TypeVar from .warnings import medium @@ -26,8 +29,8 @@ _DEFAULT_ITR = () def import_source( *, default: Iterable[T] = _DEFAULT_ITR, - module_name: Optional[str] = None, - help_url: Optional[str] = None, + module_name: str | None = None, + help_url: str | None = None, ) -> Callable[..., Callable[..., Iterator[T]]]: """ doesn't really play well with types, but is used to catch @@ -50,6 +53,7 @@ def import_source( except (ImportError, AttributeError) as err: from . import core_config as CC from .error import warn_my_config_import_error + suppressed_in_conf = False if module_name is not None and CC.config._is_module_active(module_name) is False: suppressed_in_conf = True @@ -72,5 +76,7 @@ class core: if not matched_config_err and isinstance(err, AttributeError): raise err yield from default + return wrapper + return decorator diff --git a/my/core/sqlite.py b/my/core/sqlite.py index 08a80e5..aa41ab3 100644 --- a/my/core/sqlite.py +++ b/my/core/sqlite.py @@ -1,12 +1,16 @@ -from .internal import assert_subpackage; assert_subpackage(__name__) +from __future__ import annotations +from .internal import assert_subpackage # noqa: I001 + +assert_subpackage(__name__) import shutil import sqlite3 +from collections.abc import Iterator from contextlib import contextmanager from pathlib import Path from tempfile import TemporaryDirectory -from typing import Any, Callable, Iterator, Literal, Optional, Tuple, Union, overload +from typing import Any, Callable, Literal, Union, overload from .common import PathIsh from .compat import assert_never @@ -22,6 +26,7 @@ def test_sqlite_connect_immutable(tmp_path: Path) -> None: conn.execute('CREATE TABLE testtable (col)') import pytest + with pytest.raises(sqlite3.OperationalError, match='readonly database'): with sqlite_connect_immutable(db) as conn: conn.execute('DROP TABLE testtable') @@ -33,6 +38,7 @@ def test_sqlite_connect_immutable(tmp_path: Path) -> None: SqliteRowFactory = Callable[[sqlite3.Cursor, sqlite3.Row], Any] + def dict_factory(cursor, row): fields = [column[0] for column in cursor.description] return dict(zip(fields, row)) @@ -40,8 +46,9 @@ def dict_factory(cursor, row): Factory = Union[SqliteRowFactory, Literal['row', 'dict']] + @contextmanager -def sqlite_connection(db: PathIsh, *, immutable: bool=False, row_factory: Optional[Factory]=None) -> Iterator[sqlite3.Connection]: +def sqlite_connection(db: PathIsh, *, immutable: bool = False, row_factory: Factory | None = None) -> Iterator[sqlite3.Connection]: dbp = f'file:{db}' # https://www.sqlite.org/draft/uri.html#uriimmutable if immutable: @@ -97,30 +104,32 @@ def sqlite_copy_and_open(db: PathIsh) -> sqlite3.Connection: # and then the return type ends up as Iterator[Tuple[str, ...]], which isn't desirable :( # a bit annoying to have this copy-pasting, but hopefully not a big issue +# fmt: off @overload -def select(cols: Tuple[str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any ]]: ... +def select(cols: tuple[str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any ]]: ... @overload -def select(cols: Tuple[str, str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any ]]: ... +def select(cols: tuple[str, str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any ]]: ... @overload -def select(cols: Tuple[str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any, Any ]]: ... +def select(cols: tuple[str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any, Any ]]: ... @overload -def select(cols: Tuple[str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any, Any, Any ]]: ... +def select(cols: tuple[str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any, Any, Any ]]: ... @overload -def select(cols: Tuple[str, str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any, Any, Any, Any ]]: ... +def select(cols: tuple[str, str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any, Any, Any, Any ]]: ... @overload -def select(cols: Tuple[str, str, str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any, Any, Any, Any, Any ]]: ... +def select(cols: tuple[str, str, str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any, Any, Any, Any, Any ]]: ... @overload -def select(cols: Tuple[str, str, str, str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any, Any, Any, Any, Any, Any ]]: ... +def select(cols: tuple[str, str, str, str, str, str, str ], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any, Any, Any, Any, Any, Any ]]: ... @overload -def select(cols: Tuple[str, str, str, str, str, str, str, str], rest: str, *, db: sqlite3.Connection) -> \ - Iterator[Tuple[Any, Any, Any, Any, Any, Any, Any, Any]]: ... +def select(cols: tuple[str, str, str, str, str, str, str, str], rest: str, *, db: sqlite3.Connection) -> \ + Iterator[tuple[Any, Any, Any, Any, Any, Any, Any, Any]]: ... +# fmt: on def select(cols, rest, *, db): # db arg is last cause that results in nicer code formatting.. diff --git a/my/core/stats.py b/my/core/stats.py index 674a8d1..a553db3 100644 --- a/my/core/stats.py +++ b/my/core/stats.py @@ -2,10 +2,13 @@ Helpers for hpi doctor/stats functionality. ''' +from __future__ import annotations + import collections.abc import importlib import inspect import typing +from collections.abc import Iterable, Iterator, Sequence from contextlib import contextmanager from datetime import datetime from pathlib import Path @@ -13,20 +16,13 @@ from types import ModuleType from typing import ( Any, Callable, - Dict, - Iterable, - Iterator, - List, - Optional, Protocol, - Sequence, - Union, cast, ) from .types import asdict -Stats = Dict[str, Any] +Stats = dict[str, Any] class StatsFun(Protocol): @@ -55,10 +51,10 @@ def quick_stats(): def stat( - func: Union[Callable[[], Iterable[Any]], Iterable[Any]], + func: Callable[[], Iterable[Any]] | Iterable[Any], *, quick: bool = False, - name: Optional[str] = None, + name: str | None = None, ) -> Stats: """ Extracts various statistics from a passed iterable/callable, e.g.: @@ -153,8 +149,8 @@ def test_stat() -> None: # -def get_stats(module_name: str, *, guess: bool = False) -> Optional[StatsFun]: - stats: Optional[StatsFun] = None +def get_stats(module_name: str, *, guess: bool = False) -> StatsFun | None: + stats: StatsFun | None = None try: module = importlib.import_module(module_name) except Exception: @@ -167,7 +163,7 @@ def get_stats(module_name: str, *, guess: bool = False) -> Optional[StatsFun]: # TODO maybe could be enough to annotate OUTPUTS or something like that? # then stats could just use them as hints? -def guess_stats(module: ModuleType) -> Optional[StatsFun]: +def guess_stats(module: ModuleType) -> StatsFun | None: """ If the module doesn't have explicitly defined 'stat' function, this is used to try to guess what could be included in stats automatically @@ -206,7 +202,7 @@ def test_guess_stats() -> None: } -def _guess_data_providers(module: ModuleType) -> Dict[str, Callable]: +def _guess_data_providers(module: ModuleType) -> dict[str, Callable]: mfunctions = inspect.getmembers(module, inspect.isfunction) return {k: v for k, v in mfunctions if is_data_provider(v)} @@ -263,7 +259,7 @@ def test_is_data_provider() -> None: lam = lambda: [1, 2] assert not idp(lam) - def has_extra_args(count) -> List[int]: + def has_extra_args(count) -> list[int]: return list(range(count)) assert not idp(has_extra_args) @@ -340,10 +336,10 @@ def test_type_is_iterable() -> None: assert not fun(None) assert not fun(int) assert not fun(Any) - assert not fun(Dict[int, int]) + assert not fun(dict[int, int]) - assert fun(List[int]) - assert fun(Sequence[Dict[str, str]]) + assert fun(list[int]) + assert fun(Sequence[dict[str, str]]) assert fun(Iterable[Any]) @@ -434,7 +430,7 @@ def test_stat_iterable() -> None: # experimental, not sure about it.. -def _guess_datetime(x: Any) -> Optional[datetime]: +def _guess_datetime(x: Any) -> datetime | None: # todo hmm implement without exception.. try: d = asdict(x) diff --git a/my/core/structure.py b/my/core/structure.py index fa26532..bb049e4 100644 --- a/my/core/structure.py +++ b/my/core/structure.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import atexit import os import shutil @@ -5,9 +7,9 @@ import sys import tarfile import tempfile import zipfile +from collections.abc import Generator, Sequence from contextlib import contextmanager from pathlib import Path -from typing import Generator, List, Sequence, Tuple, Union from .logging import make_logger @@ -42,10 +44,10 @@ TARGZ_EXT = {".tar.gz"} @contextmanager def match_structure( base: Path, - expected: Union[str, Sequence[str]], + expected: str | Sequence[str], *, partial: bool = False, -) -> Generator[Tuple[Path, ...], None, None]: +) -> Generator[tuple[Path, ...], None, None]: """ Given a 'base' directory or archive (zip/tar.gz), recursively search for one or more paths that match the pattern described in 'expected'. That can be a single string, or a list @@ -140,8 +142,8 @@ def match_structure( if not searchdir.is_dir(): raise NotADirectoryError(f"Expected either a zip/tar.gz archive or a directory, received {searchdir}") - matches: List[Path] = [] - possible_targets: List[Path] = [searchdir] + matches: list[Path] = [] + possible_targets: list[Path] = [searchdir] while len(possible_targets) > 0: p = possible_targets.pop(0) @@ -172,7 +174,7 @@ def warn_leftover_files() -> None: from . import core_config as CC base_tmp: Path = CC.config.get_tmp_dir() - leftover: List[Path] = list(base_tmp.iterdir()) + leftover: list[Path] = list(base_tmp.iterdir()) if leftover: logger.debug(f"at exit warning: Found leftover files in temporary directory '{leftover}'. this may be because you have multiple hpi processes running -- if so this can be ignored") diff --git a/my/core/tests/auto_stats.py b/my/core/tests/auto_stats.py index d10d4c4..fc49e03 100644 --- a/my/core/tests/auto_stats.py +++ b/my/core/tests/auto_stats.py @@ -2,11 +2,11 @@ Helper 'module' for test_guess_stats """ +from collections.abc import Iterable, Iterator, Sequence from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timedelta from pathlib import Path -from typing import Iterable, Iterator, Sequence @dataclass diff --git a/my/core/tests/common.py b/my/core/tests/common.py index 22a74d7..073ea5f 100644 --- a/my/core/tests/common.py +++ b/my/core/tests/common.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import os +from collections.abc import Iterator from contextlib import contextmanager -from typing import Iterator, Optional import pytest @@ -15,7 +17,7 @@ skip_if_uses_optional_deps = pytest.mark.skipif( # TODO maybe move to hpi core? @contextmanager -def tmp_environ_set(key: str, value: Optional[str]) -> Iterator[None]: +def tmp_environ_set(key: str, value: str | None) -> Iterator[None]: prev_value = os.environ.get(key) if value is None: os.environ.pop(key, None) diff --git a/my/core/tests/denylist.py b/my/core/tests/denylist.py index 2688319..73c3165 100644 --- a/my/core/tests/denylist.py +++ b/my/core/tests/denylist.py @@ -1,8 +1,9 @@ import json import warnings +from collections.abc import Iterator from datetime import datetime from pathlib import Path -from typing import Iterator, NamedTuple +from typing import NamedTuple from ..denylist import DenyList diff --git a/my/core/tests/test_cachew.py b/my/core/tests/test_cachew.py index 70ac76f..a0d2267 100644 --- a/my/core/tests/test_cachew.py +++ b/my/core/tests/test_cachew.py @@ -1,6 +1,6 @@ -from .common import skip_if_uses_optional_deps as pytestmark +from __future__ import annotations -from typing import List +from .common import skip_if_uses_optional_deps as pytestmark # TODO ugh, this is very messy.. need to sort out config overriding here @@ -16,7 +16,7 @@ def test_cachew() -> None: # TODO ugh. need doublewrap or something to avoid having to pass parens @mcachew() - def cf() -> List[int]: + def cf() -> list[int]: nonlocal called called += 1 return [1, 2, 3] @@ -43,7 +43,7 @@ def test_cachew_dir_none() -> None: called = 0 @mcachew(cache_path=cache_dir() / 'ctest') - def cf() -> List[int]: + def cf() -> list[int]: nonlocal called called += 1 return [called, called, called] diff --git a/my/core/tests/test_config.py b/my/core/tests/test_config.py index 78d1a62..f6d12ba 100644 --- a/my/core/tests/test_config.py +++ b/my/core/tests/test_config.py @@ -2,8 +2,8 @@ Various tests that are checking behaviour of user config wrt to various things """ -import sys import os +import sys from pathlib import Path import pytest diff --git a/my/core/time.py b/my/core/time.py index fa20a7c..a9b180d 100644 --- a/my/core/time.py +++ b/my/core/time.py @@ -1,5 +1,7 @@ -from functools import lru_cache -from typing import Dict, Sequence +from __future__ import annotations + +from collections.abc import Sequence +from functools import cache, lru_cache import pytz @@ -11,6 +13,7 @@ def user_forced() -> Sequence[str]: # https://stackoverflow.com/questions/36067621/python-all-possible-timezone-abbreviations-for-given-timezone-name-and-vise-ve try: from my.config import time as user_config + return user_config.tz.force_abbreviations # type: ignore[attr-defined] # noqa: TRY300 # note: noqa since we're catching case where config doesn't have attribute here as well except: @@ -19,15 +22,15 @@ def user_forced() -> Sequence[str]: @lru_cache(1) -def _abbr_to_timezone_map() -> Dict[str, pytz.BaseTzInfo]: +def _abbr_to_timezone_map() -> dict[str, pytz.BaseTzInfo]: # also force UTC to always correspond to utc # this makes more sense than Zulu it ends up by default timezones = [*pytz.all_timezones, 'UTC', *user_forced()] - res: Dict[str, pytz.BaseTzInfo] = {} + res: dict[str, pytz.BaseTzInfo] = {} for tzname in timezones: tz = pytz.timezone(tzname) - infos = getattr(tz, '_tzinfos', []) # not sure if can rely on attr always present? + infos = getattr(tz, '_tzinfos', []) # not sure if can rely on attr always present? for info in infos: abbr = info[-1] # todo could support this with a better error handling strategy? @@ -43,7 +46,7 @@ def _abbr_to_timezone_map() -> Dict[str, pytz.BaseTzInfo]: return res -@lru_cache(maxsize=None) +@cache def abbr_to_timezone(abbr: str) -> pytz.BaseTzInfo: return _abbr_to_timezone_map()[abbr] diff --git a/my/core/types.py b/my/core/types.py index b1cf103..dc19c19 100644 --- a/my/core/types.py +++ b/my/core/types.py @@ -1,14 +1,15 @@ -from .internal import assert_subpackage; assert_subpackage(__name__) +from __future__ import annotations + +from .internal import assert_subpackage + +assert_subpackage(__name__) from dataclasses import asdict as dataclasses_asdict from dataclasses import is_dataclass from datetime import datetime -from typing import ( - Any, - Dict, -) +from typing import Any -Json = Dict[str, Any] +Json = dict[str, Any] # for now just serves documentation purposes... but one day might make it statically verifiable where possible? diff --git a/my/core/util.py b/my/core/util.py index a247f81..74e71e1 100644 --- a/my/core/util.py +++ b/my/core/util.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import os import pkgutil import sys +from collections.abc import Iterable from itertools import chain from pathlib import Path from types import ModuleType -from typing import Iterable, List, Optional from .discovery_pure import HPIModule, _is_not_module_src, has_stats, ignored @@ -20,13 +22,14 @@ from .discovery_pure import NOT_HPI_MODULE_VAR assert NOT_HPI_MODULE_VAR in globals() # check name consistency -def is_not_hpi_module(module: str) -> Optional[str]: + +def is_not_hpi_module(module: str) -> str | None: ''' None if a module, otherwise returns reason ''' import importlib.util - path: Optional[str] = None + path: str | None = None try: # TODO annoying, this can cause import of the parent module? spec = importlib.util.find_spec(module) @@ -35,7 +38,7 @@ def is_not_hpi_module(module: str) -> Optional[str]: except Exception as e: # todo a bit misleading.. it actually shouldn't import in most cases, it's just the weird parent module import thing return "import error (possibly missing config entry)" # todo add exc message? - assert path is not None # not sure if can happen? + assert path is not None # not sure if can happen? if _is_not_module_src(Path(path)): return f"marked explicitly (via {NOT_HPI_MODULE_VAR})" @@ -57,9 +60,10 @@ def _iter_all_importables(pkg: ModuleType) -> Iterable[HPIModule]: def _discover_path_importables(pkg_pth: Path, pkg_name: str) -> Iterable[HPIModule]: - from .core_config import config - """Yield all importables under a given path and package.""" + + from .core_config import config # noqa: F401 + for dir_path, dirs, file_names in os.walk(pkg_pth): file_names.sort() # NOTE: sorting dirs in place is intended, it's the way you're supposed to do it with os.walk @@ -82,6 +86,7 @@ def _discover_path_importables(pkg_pth: Path, pkg_name: str) -> Iterable[HPIModu # TODO might need to make it defensive and yield Exception (otherwise hpi doctor might fail for no good reason) # use onerror=? + # ignored explicitly -> not HPI # if enabled in config -> HPI # if disabled in config -> HPI @@ -90,7 +95,7 @@ def _discover_path_importables(pkg_pth: Path, pkg_name: str) -> Iterable[HPIModu # TODO when do we need to recurse? -def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterable[HPIModule]: +def _walk_packages(path: Iterable[str], prefix: str = '', onerror=None) -> Iterable[HPIModule]: """ Modified version of https://github.com/python/cpython/blob/d50a0700265536a20bcce3fb108c954746d97625/Lib/pkgutil.py#L53, to avoid importing modules that are skipped @@ -153,8 +158,9 @@ def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterabl path = [p for p in path if not seen(p)] yield from _walk_packages(path, mname + '.', onerror) + # deprecate? -def get_modules() -> List[HPIModule]: +def get_modules() -> list[HPIModule]: return list(modules()) @@ -169,14 +175,14 @@ def test_module_detection() -> None: with reset() as cc: cc.disabled_modules = ['my.location.*', 'my.body.*', 'my.workouts.*', 'my.private.*'] mods = {m.name: m for m in modules()} - assert mods['my.demo'] .skip_reason == "has no 'stats()' function" + assert mods['my.demo'].skip_reason == "has no 'stats()' function" with reset() as cc: cc.disabled_modules = ['my.location.*', 'my.body.*', 'my.workouts.*', 'my.private.*', 'my.lastfm'] - cc.enabled_modules = ['my.demo'] + cc.enabled_modules = ['my.demo'] mods = {m.name: m for m in modules()} - assert mods['my.demo'] .skip_reason is None # not skipped + assert mods['my.demo'].skip_reason is None # not skipped assert mods['my.lastfm'].skip_reason == "suppressed in the user config" diff --git a/my/core/utils/concurrent.py b/my/core/utils/concurrent.py index 73944ec..515c3f1 100644 --- a/my/core/utils/concurrent.py +++ b/my/core/utils/concurrent.py @@ -1,6 +1,7 @@ -import sys +from __future__ import annotations + from concurrent.futures import Executor, Future -from typing import Any, Callable, Optional, TypeVar +from typing import Any, Callable, TypeVar from ..compat import ParamSpec @@ -15,7 +16,7 @@ class DummyExecutor(Executor): but also want to provide an option to run the code serially (e.g. for debugging) """ - def __init__(self, max_workers: Optional[int] = 1) -> None: + def __init__(self, max_workers: int | None = 1) -> None: self._shutdown = False self._max_workers = max_workers diff --git a/my/core/utils/imports.py b/my/core/utils/imports.py index 4666a5e..e0fb01d 100644 --- a/my/core/utils/imports.py +++ b/my/core/utils/imports.py @@ -1,27 +1,27 @@ +from __future__ import annotations + import importlib import importlib.util import sys from pathlib import Path from types import ModuleType -from typing import Optional - -from ..common import PathIsh # TODO only used in tests? not sure if useful at all. -def import_file(p: PathIsh, name: Optional[str] = None) -> ModuleType: +def import_file(p: Path | str, name: str | None = None) -> ModuleType: p = Path(p) if name is None: name = p.stem spec = importlib.util.spec_from_file_location(name, p) assert spec is not None, f"Fatal error; Could not create module spec from {name} {p}" foo = importlib.util.module_from_spec(spec) - loader = spec.loader; assert loader is not None + loader = spec.loader + assert loader is not None loader.exec_module(foo) return foo -def import_from(path: PathIsh, name: str) -> ModuleType: +def import_from(path: Path | str, name: str) -> ModuleType: path = str(path) sys.path.append(path) try: @@ -30,7 +30,7 @@ def import_from(path: PathIsh, name: str) -> ModuleType: sys.path.remove(path) -def import_dir(path: PathIsh, extra: str = '') -> ModuleType: +def import_dir(path: Path | str, extra: str = '') -> ModuleType: p = Path(path) if p.parts[0] == '~': p = p.expanduser() # TODO eh. not sure about this.. diff --git a/my/core/utils/itertools.py b/my/core/utils/itertools.py index ae9402d..501ebbe 100644 --- a/my/core/utils/itertools.py +++ b/my/core/utils/itertools.py @@ -4,17 +4,13 @@ Various helpers/transforms of iterators Ideally this should be as small as possible and we should rely on stdlib itertools or more_itertools """ +from __future__ import annotations + import warnings -from collections.abc import Hashable +from collections.abc import Hashable, Iterable, Iterator, Sized from typing import ( TYPE_CHECKING, Callable, - Dict, - Iterable, - Iterator, - List, - Optional, - Sized, TypeVar, Union, cast, @@ -23,9 +19,8 @@ from typing import ( import more_itertools from decorator import decorator -from ..compat import ParamSpec from .. import warnings as core_warnings - +from ..compat import ParamSpec T = TypeVar('T') K = TypeVar('K') @@ -39,7 +34,7 @@ def _identity(v: T) -> V: # type: ignore[type-var] # ugh. nothing in more_itertools? # perhaps duplicates_everseen? but it doesn't yield non-unique elements? def ensure_unique(it: Iterable[T], *, key: Callable[[T], K]) -> Iterable[T]: - key2item: Dict[K, T] = {} + key2item: dict[K, T] = {} for i in it: k = key(i) pi = key2item.get(k, None) @@ -72,10 +67,10 @@ def make_dict( key: Callable[[T], K], # TODO make value optional instead? but then will need a typing override for it? value: Callable[[T], V] = _identity, -) -> Dict[K, V]: +) -> dict[K, V]: with_keys = ((key(i), i) for i in it) uniques = ensure_unique(with_keys, key=lambda p: p[0]) - res: Dict[K, V] = {} + res: dict[K, V] = {} for k, i in uniques: res[k] = i if value is None else value(i) return res @@ -93,8 +88,8 @@ def test_make_dict() -> None: d = make_dict(it, key=lambda i: i % 2, value=lambda i: i) # check type inference - d2: Dict[str, int] = make_dict(it, key=lambda i: str(i)) - d3: Dict[str, bool] = make_dict(it, key=lambda i: str(i), value=lambda i: i % 2 == 0) + d2: dict[str, int] = make_dict(it, key=lambda i: str(i)) + d3: dict[str, bool] = make_dict(it, key=lambda i: str(i), value=lambda i: i % 2 == 0) LFP = ParamSpec('LFP') @@ -102,7 +97,7 @@ LV = TypeVar('LV') @decorator -def _listify(func: Callable[LFP, Iterable[LV]], *args: LFP.args, **kwargs: LFP.kwargs) -> List[LV]: +def _listify(func: Callable[LFP, Iterable[LV]], *args: LFP.args, **kwargs: LFP.kwargs) -> list[LV]: """ Wraps a function's return value in wrapper (e.g. list) Useful when an algorithm can be expressed more cleanly as a generator @@ -115,7 +110,7 @@ def _listify(func: Callable[LFP, Iterable[LV]], *args: LFP.args, **kwargs: LFP.k # so seems easiest to just use specialize instantiations of decorator instead if TYPE_CHECKING: - def listify(func: Callable[LFP, Iterable[LV]]) -> Callable[LFP, List[LV]]: ... # noqa: ARG001 + def listify(func: Callable[LFP, Iterable[LV]]) -> Callable[LFP, list[LV]]: ... # noqa: ARG001 else: listify = _listify @@ -130,7 +125,7 @@ def test_listify() -> None: yield 2 res = it() - assert_type(res, List[int]) + assert_type(res, list[int]) assert res == [1, 2] @@ -201,24 +196,24 @@ def test_warn_if_empty_list() -> None: ll = [1, 2, 3] @warn_if_empty - def nonempty() -> List[int]: + def nonempty() -> list[int]: return ll with warnings.catch_warnings(record=True) as w: res1 = nonempty() assert len(w) == 0 - assert_type(res1, List[int]) + assert_type(res1, list[int]) assert isinstance(res1, list) assert res1 is ll # object should be unchanged! @warn_if_empty - def empty() -> List[str]: + def empty() -> list[str]: return [] with warnings.catch_warnings(record=True) as w: res2 = empty() assert len(w) == 1 - assert_type(res2, List[str]) + assert_type(res2, list[str]) assert isinstance(res2, list) assert res2 == [] @@ -242,7 +237,7 @@ def check_if_hashable(iterable: Iterable[_HT]) -> Iterable[_HT]: """ NOTE: Despite Hashable bound, typing annotation doesn't guarantee runtime safety Consider hashable type X, and Y that inherits from X, but not hashable - Then l: List[X] = [Y(...)] is a valid expression, and type checks against Hashable, + Then l: list[X] = [Y(...)] is a valid expression, and type checks against Hashable, but isn't runtime hashable """ # Sadly this doesn't work 100% correctly with dataclasses atm... @@ -268,28 +263,27 @@ def check_if_hashable(iterable: Iterable[_HT]) -> Iterable[_HT]: # TODO different policies -- error/warn/ignore? def test_check_if_hashable() -> None: from dataclasses import dataclass - from typing import Set, Tuple import pytest from ..compat import assert_type - x1: List[int] = [1, 2] + x1: list[int] = [1, 2] r1 = check_if_hashable(x1) assert_type(r1, Iterable[int]) assert r1 is x1 - x2: Iterator[Union[int, str]] = iter((123, 'aba')) + x2: Iterator[int | str] = iter((123, 'aba')) r2 = check_if_hashable(x2) assert_type(r2, Iterable[Union[int, str]]) assert list(r2) == [123, 'aba'] - x3: Tuple[object, ...] = (789, 'aba') + x3: tuple[object, ...] = (789, 'aba') r3 = check_if_hashable(x3) assert_type(r3, Iterable[object]) assert r3 is x3 # object should be unchanged - x4: List[Set[int]] = [{1, 2, 3}, {4, 5, 6}] + x4: list[set[int]] = [{1, 2, 3}, {4, 5, 6}] with pytest.raises(Exception): # should be rejected by mypy sice set isn't Hashable, but also throw at runtime r4 = check_if_hashable(x4) # type: ignore[type-var] @@ -307,7 +301,7 @@ def test_check_if_hashable() -> None: class X: a: int - x6: List[X] = [X(a=123)] + x6: list[X] = [X(a=123)] r6 = check_if_hashable(x6) assert x6 is r6 @@ -316,7 +310,7 @@ def test_check_if_hashable() -> None: class Y(X): b: str - x7: List[Y] = [Y(a=123, b='aba')] + x7: list[Y] = [Y(a=123, b='aba')] with pytest.raises(Exception): # ideally that would also be rejected by mypy, but currently there is a bug # which treats all dataclasses as hashable: https://github.com/python/mypy/issues/11463 @@ -331,11 +325,8 @@ _UEU = TypeVar('_UEU') # instead of just iterator # TODO maybe deprecated Callable support? not sure def unique_everseen( - fun: Union[ - Callable[[], Iterable[_UET]], - Iterable[_UET] - ], - key: Optional[Callable[[_UET], _UEU]] = None, + fun: Callable[[], Iterable[_UET]] | Iterable[_UET], + key: Callable[[_UET], _UEU] | None = None, ) -> Iterator[_UET]: import os diff --git a/my/core/warnings.py b/my/core/warnings.py index 2ffc3e4..d67ec7d 100644 --- a/my/core/warnings.py +++ b/my/core/warnings.py @@ -5,14 +5,16 @@ since who looks at the terminal output? E.g. would be nice to propagate the warnings in the UI (it's even a subclass of Exception!) ''' +from __future__ import annotations + import sys import warnings -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING import click -def _colorize(x: str, color: Optional[str] = None) -> str: +def _colorize(x: str, color: str | None = None) -> str: if color is None: return x @@ -24,7 +26,7 @@ def _colorize(x: str, color: Optional[str] = None) -> str: return click.style(x, fg=color) -def _warn(message: str, *args, color: Optional[str] = None, **kwargs) -> None: +def _warn(message: str, *args, color: str | None = None, **kwargs) -> None: stacklevel = kwargs.get('stacklevel', 1) kwargs['stacklevel'] = stacklevel + 2 # +1 for this function, +1 for medium/high wrapper warnings.warn(_colorize(message, color=color), *args, **kwargs) # noqa: B028