diff --git a/my/core/discovery_pure.py b/my/core/discovery_pure.py new file mode 100644 index 0000000..0c3cab5 --- /dev/null +++ b/my/core/discovery_pure.py @@ -0,0 +1,158 @@ +''' +The idea of this module is to avoid imports of external HPI modules and code evaluation via ast module etc. + +This potentially allows it to be: + +- robust: can discover modules that can't be imported, generally makes it foolproof +- faster: importing is slow and with tens of modules can be noteiceable +- secure: can be executed in a sandbox & used during setup +''' + +REQUIRES = 'REQUIRES' +NOT_HPI_MODULE_VAR = '__NOT_HPI_MODULE__' + +### + +from typing import Optional, Sequence, NamedTuple, Iterable +from pathlib import Path +import re +import logging + +''' +None means that requirements weren't defined (different from empty requirements) +''' +Requires = Optional[Sequence[str]] + + +class HPIModule(NamedTuple): + name: str + skip_reason: Optional[str] + doc: Optional[str] = None + file: Optional[Path] = None + requires: Requires = None + + +def ignored(m: str) -> bool: + excluded = [ + # legacy stuff left for backwards compatibility + 'core.*', + 'config.*', + ] + exs = '|'.join(excluded) + return re.match(f'^my.({exs})$', m) is not None + + +import ast + +# todo should be defensive? not sure +def _extract_requirements(a: ast.Module) -> Requires: + # find the assignment.. + for x in a.body: + if not isinstance(x, ast.Assign): + continue + tg = x.targets + if len(tg) != 1: + continue + t = tg[0] + # could be Subscript.. so best to keep dynamic + id_ = getattr(t, 'id', None) + if id_ != REQUIRES: + continue + vals = x.value + # could be List/Tuple/Set? + elts = getattr(vals, 'elts', None) + if elts is None: + continue + deps = [] + for c in elts: + if isinstance(c, ast.Constant): + deps.append(c.value) + elif isinstance(c, ast.Str): + deps.append(c.s) + else: + raise RuntimeError(f"Expecting string contants only in {REQUIRES} declaration") + return tuple(deps) + return None + + +# todo should probably be more defensive.. +def all_modules() -> Iterable[HPIModule]: + """ + Experimental version, which isn't importing the modules, making it more robust and safe. + """ + my_root = Path(__file__).absolute().parent.parent + for f in sorted(my_root.rglob('*.py')): + if f.is_symlink(): + continue # meh + mp = f.relative_to(my_root.parent) + if mp.name == '__init__.py': + mp = mp.parent + m = str(mp.with_suffix('')).replace('/', '.') + if ignored(m): + continue + a: ast.Module = ast.parse(f.read_text()) + is_not_module = any( + getattr(node, 'name', None) == NOT_HPI_MODULE_VAR # direct definition + or any(getattr(n, 'name', None) == NOT_HPI_MODULE_VAR for n in getattr(node, 'names', [])) # import from + for node in a.body + ) + if is_not_module: + continue + doc = ast.get_docstring(a, clean=False) + + requires: Requires = None + try: + requires = _extract_requirements(a) + except Exception as e: + logging.exception(e) + + yield HPIModule( + name=m, + skip_reason=None, + doc=doc, + file=f.relative_to(my_root.parent), + requires=requires, + ) + + +def module_by_name(name: str) -> HPIModule: + for m in all_modules(): + if m.name == name: + return m + raise RuntimeError(f'No such module: {name}') + + +### tests + + +def test() -> None: + # TODO this should be a 'sanity check' or something + assert len(list(all_modules())) > 10 # kinda arbitrary + + +def test_demo() -> None: + demo = module_by_name('my.demo') + assert demo.doc is not None + assert str(demo.file) == 'my/demo.py' + assert demo.requires is None + + +def test_excluded() -> None: + for m in all_modules(): + assert 'my.core.' not in m.name + + +def test_requires() -> None: + photos = module_by_name('my.photos.main') + r = photos.requires + assert r is not None + assert len(r) == 2 # fragile, but ok for now + + +def test_pure() -> None: + """ + We want to keep this module clean of other HPI imports + """ + src = Path(__file__).read_text() + assert 'import ' + 'my' not in src + assert 'from ' + 'my' not in src diff --git a/my/core/util.py b/my/core/util.py index f6a4ea1..c3ee343 100644 --- a/my/core/util.py +++ b/my/core/util.py @@ -5,14 +5,9 @@ import os import pkgutil import re import sys -from typing import List, Iterable, NamedTuple, Optional +from typing import List, Iterable, Optional - -class HPIModule(NamedTuple): - name: str - skip_reason: Optional[str] - doc: Optional[str] = None - file: Optional[Path] = None +from .discovery_pure import HPIModule, ignored # legacy def modules() -> Iterable[HPIModule]: @@ -21,16 +16,6 @@ def modules() -> Iterable[HPIModule]: yield m -def ignored(m: str) -> bool: - excluded = [ - # legacy stuff left for backwards compatibility - 'core.*', - 'config.*', - ] - exs = '|'.join(excluded) - return re.match(f'^my.({exs})$', m) is not None - - from .common import StatsFun def get_stats(module: str) -> Optional[StatsFun]: # todo detect via ast? @@ -43,6 +28,8 @@ def get_stats(module: str) -> Optional[StatsFun]: __NOT_HPI_MODULE__ = 'Import this to mark a python file as a helper, not an actual HPI module' +from .discovery_pure import NOT_HPI_MODULE_VAR +assert NOT_HPI_MODULE_VAR in globals() # check name consistency def has_not_module_flag(module: str) -> bool: # if module == 'my.books.kobo': @@ -113,10 +100,10 @@ def _discover_path_importables(pkg_pth: Path, pkg_name: str) -> Iterable[HPIModu def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterable[HPIModule]: - ''' + """ Modified version of https://github.com/python/cpython/blob/d50a0700265536a20bcce3fb108c954746d97625/Lib/pkgutil.py#L53, to alvoid importing modules that are skipped - ''' + """ from .core_config import config def seen(p, m={}): @@ -144,7 +131,7 @@ def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterabl if is_not_module is not None: skip_reason = is_not_module - else: # active is True + else: # active is True # nothing to do, enabled explicitly pass @@ -175,54 +162,19 @@ def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterabl path = [p for p in path if not seen(p)] yield from _walk_packages(path, mname+'.', onerror) - -def modules_via_ast() -> Iterable[HPIModule]: - ''' - Experimental version, which isn't importing the modules, making it more robust and safe. - ''' - import ast - - my_root = Path(__file__).absolute().parent.parent - for f in sorted(my_root.rglob('*.py')): - if f.is_symlink(): - continue # meh - mp = f.relative_to(my_root.parent) - if mp.name == '__init__.py': - mp = mp.parent - m = str(mp.with_suffix('')).replace('/', '.') - if ignored(m): - continue - a = ast.parse(f.read_text()) - NM = '__NOT_HPI_MODULE__' - is_not_module = any( - getattr(node, 'name', None) == NM # direct definition - or - any(getattr(n, 'name', None) == NM for n in getattr(node, 'names', [])) # import from - for node in a.body) - if is_not_module: - continue - doc = ast.get_docstring(a, clean=False) - - yield HPIModule( - name=m, - skip_reason=None, - doc=doc, - file=f.relative_to(my_root.parent), # todo not sure if should be relative - ) - - # deprecate? def get_modules() -> List[HPIModule]: return list(modules()) - ### tests start ## FIXME: add test when there is an import error -- should be defensive and yield exception + def test_module_detection() -> None: from .core_config import _reset_config as reset + with reset() as cc: cc.disabled_modules = ['my.location.*', 'my.body.*', 'my.workouts.*', 'my.private.*'] mods = {m.name: m for m in modules()} @@ -233,7 +185,7 @@ def test_module_detection() -> None: cc.enabled_modules = ['my.demo'] mods = {m.name: m for m in modules()} - assert mods['my.demo'] .skip_reason is None # not skipped + assert mods['my.demo'] .skip_reason is None # not skipped assert mods['my.lastfm'].skip_reason == "suppressed in the user config" diff --git a/tests/core.py b/tests/core.py index 4aa8173..b50b309 100644 --- a/tests/core.py +++ b/tests/core.py @@ -10,7 +10,8 @@ we can run against the tests in my.core directly. ''' -from my.core.core_config import * -from my.core.error import * -from my.core.util import * -from my.core.common import * +from my.core.common import * +from my.core.core_config import * +from my.core.error import * +from my.core.util import * +from my.core.discovery_pure import *