core: add discovery_pure module to get modules and their dependencies via ast module
This commit is contained in:
parent
4ad4f34cda
commit
97650adf3b
3 changed files with 173 additions and 62 deletions
158
my/core/discovery_pure.py
Normal file
158
my/core/discovery_pure.py
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
'''
|
||||||
|
The idea of this module is to avoid imports of external HPI modules and code evaluation via ast module etc.
|
||||||
|
|
||||||
|
This potentially allows it to be:
|
||||||
|
|
||||||
|
- robust: can discover modules that can't be imported, generally makes it foolproof
|
||||||
|
- faster: importing is slow and with tens of modules can be noteiceable
|
||||||
|
- secure: can be executed in a sandbox & used during setup
|
||||||
|
'''
|
||||||
|
|
||||||
|
REQUIRES = 'REQUIRES'
|
||||||
|
NOT_HPI_MODULE_VAR = '__NOT_HPI_MODULE__'
|
||||||
|
|
||||||
|
###
|
||||||
|
|
||||||
|
from typing import Optional, Sequence, NamedTuple, Iterable
|
||||||
|
from pathlib import Path
|
||||||
|
import re
|
||||||
|
import logging
|
||||||
|
|
||||||
|
'''
|
||||||
|
None means that requirements weren't defined (different from empty requirements)
|
||||||
|
'''
|
||||||
|
Requires = Optional[Sequence[str]]
|
||||||
|
|
||||||
|
|
||||||
|
class HPIModule(NamedTuple):
|
||||||
|
name: str
|
||||||
|
skip_reason: Optional[str]
|
||||||
|
doc: Optional[str] = None
|
||||||
|
file: Optional[Path] = None
|
||||||
|
requires: Requires = None
|
||||||
|
|
||||||
|
|
||||||
|
def ignored(m: str) -> bool:
|
||||||
|
excluded = [
|
||||||
|
# legacy stuff left for backwards compatibility
|
||||||
|
'core.*',
|
||||||
|
'config.*',
|
||||||
|
]
|
||||||
|
exs = '|'.join(excluded)
|
||||||
|
return re.match(f'^my.({exs})$', m) is not None
|
||||||
|
|
||||||
|
|
||||||
|
import ast
|
||||||
|
|
||||||
|
# todo should be defensive? not sure
|
||||||
|
def _extract_requirements(a: ast.Module) -> Requires:
|
||||||
|
# find the assignment..
|
||||||
|
for x in a.body:
|
||||||
|
if not isinstance(x, ast.Assign):
|
||||||
|
continue
|
||||||
|
tg = x.targets
|
||||||
|
if len(tg) != 1:
|
||||||
|
continue
|
||||||
|
t = tg[0]
|
||||||
|
# could be Subscript.. so best to keep dynamic
|
||||||
|
id_ = getattr(t, 'id', None)
|
||||||
|
if id_ != REQUIRES:
|
||||||
|
continue
|
||||||
|
vals = x.value
|
||||||
|
# could be List/Tuple/Set?
|
||||||
|
elts = getattr(vals, 'elts', None)
|
||||||
|
if elts is None:
|
||||||
|
continue
|
||||||
|
deps = []
|
||||||
|
for c in elts:
|
||||||
|
if isinstance(c, ast.Constant):
|
||||||
|
deps.append(c.value)
|
||||||
|
elif isinstance(c, ast.Str):
|
||||||
|
deps.append(c.s)
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f"Expecting string contants only in {REQUIRES} declaration")
|
||||||
|
return tuple(deps)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# todo should probably be more defensive..
|
||||||
|
def all_modules() -> Iterable[HPIModule]:
|
||||||
|
"""
|
||||||
|
Experimental version, which isn't importing the modules, making it more robust and safe.
|
||||||
|
"""
|
||||||
|
my_root = Path(__file__).absolute().parent.parent
|
||||||
|
for f in sorted(my_root.rglob('*.py')):
|
||||||
|
if f.is_symlink():
|
||||||
|
continue # meh
|
||||||
|
mp = f.relative_to(my_root.parent)
|
||||||
|
if mp.name == '__init__.py':
|
||||||
|
mp = mp.parent
|
||||||
|
m = str(mp.with_suffix('')).replace('/', '.')
|
||||||
|
if ignored(m):
|
||||||
|
continue
|
||||||
|
a: ast.Module = ast.parse(f.read_text())
|
||||||
|
is_not_module = any(
|
||||||
|
getattr(node, 'name', None) == NOT_HPI_MODULE_VAR # direct definition
|
||||||
|
or any(getattr(n, 'name', None) == NOT_HPI_MODULE_VAR for n in getattr(node, 'names', [])) # import from
|
||||||
|
for node in a.body
|
||||||
|
)
|
||||||
|
if is_not_module:
|
||||||
|
continue
|
||||||
|
doc = ast.get_docstring(a, clean=False)
|
||||||
|
|
||||||
|
requires: Requires = None
|
||||||
|
try:
|
||||||
|
requires = _extract_requirements(a)
|
||||||
|
except Exception as e:
|
||||||
|
logging.exception(e)
|
||||||
|
|
||||||
|
yield HPIModule(
|
||||||
|
name=m,
|
||||||
|
skip_reason=None,
|
||||||
|
doc=doc,
|
||||||
|
file=f.relative_to(my_root.parent),
|
||||||
|
requires=requires,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def module_by_name(name: str) -> HPIModule:
|
||||||
|
for m in all_modules():
|
||||||
|
if m.name == name:
|
||||||
|
return m
|
||||||
|
raise RuntimeError(f'No such module: {name}')
|
||||||
|
|
||||||
|
|
||||||
|
### tests
|
||||||
|
|
||||||
|
|
||||||
|
def test() -> None:
|
||||||
|
# TODO this should be a 'sanity check' or something
|
||||||
|
assert len(list(all_modules())) > 10 # kinda arbitrary
|
||||||
|
|
||||||
|
|
||||||
|
def test_demo() -> None:
|
||||||
|
demo = module_by_name('my.demo')
|
||||||
|
assert demo.doc is not None
|
||||||
|
assert str(demo.file) == 'my/demo.py'
|
||||||
|
assert demo.requires is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_excluded() -> None:
|
||||||
|
for m in all_modules():
|
||||||
|
assert 'my.core.' not in m.name
|
||||||
|
|
||||||
|
|
||||||
|
def test_requires() -> None:
|
||||||
|
photos = module_by_name('my.photos.main')
|
||||||
|
r = photos.requires
|
||||||
|
assert r is not None
|
||||||
|
assert len(r) == 2 # fragile, but ok for now
|
||||||
|
|
||||||
|
|
||||||
|
def test_pure() -> None:
|
||||||
|
"""
|
||||||
|
We want to keep this module clean of other HPI imports
|
||||||
|
"""
|
||||||
|
src = Path(__file__).read_text()
|
||||||
|
assert 'import ' + 'my' not in src
|
||||||
|
assert 'from ' + 'my' not in src
|
|
@ -5,14 +5,9 @@ import os
|
||||||
import pkgutil
|
import pkgutil
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
from typing import List, Iterable, NamedTuple, Optional
|
from typing import List, Iterable, Optional
|
||||||
|
|
||||||
|
from .discovery_pure import HPIModule, ignored # legacy
|
||||||
class HPIModule(NamedTuple):
|
|
||||||
name: str
|
|
||||||
skip_reason: Optional[str]
|
|
||||||
doc: Optional[str] = None
|
|
||||||
file: Optional[Path] = None
|
|
||||||
|
|
||||||
|
|
||||||
def modules() -> Iterable[HPIModule]:
|
def modules() -> Iterable[HPIModule]:
|
||||||
|
@ -21,16 +16,6 @@ def modules() -> Iterable[HPIModule]:
|
||||||
yield m
|
yield m
|
||||||
|
|
||||||
|
|
||||||
def ignored(m: str) -> bool:
|
|
||||||
excluded = [
|
|
||||||
# legacy stuff left for backwards compatibility
|
|
||||||
'core.*',
|
|
||||||
'config.*',
|
|
||||||
]
|
|
||||||
exs = '|'.join(excluded)
|
|
||||||
return re.match(f'^my.({exs})$', m) is not None
|
|
||||||
|
|
||||||
|
|
||||||
from .common import StatsFun
|
from .common import StatsFun
|
||||||
def get_stats(module: str) -> Optional[StatsFun]:
|
def get_stats(module: str) -> Optional[StatsFun]:
|
||||||
# todo detect via ast?
|
# todo detect via ast?
|
||||||
|
@ -43,6 +28,8 @@ def get_stats(module: str) -> Optional[StatsFun]:
|
||||||
|
|
||||||
|
|
||||||
__NOT_HPI_MODULE__ = 'Import this to mark a python file as a helper, not an actual HPI module'
|
__NOT_HPI_MODULE__ = 'Import this to mark a python file as a helper, not an actual HPI module'
|
||||||
|
from .discovery_pure import NOT_HPI_MODULE_VAR
|
||||||
|
assert NOT_HPI_MODULE_VAR in globals() # check name consistency
|
||||||
|
|
||||||
def has_not_module_flag(module: str) -> bool:
|
def has_not_module_flag(module: str) -> bool:
|
||||||
# if module == 'my.books.kobo':
|
# if module == 'my.books.kobo':
|
||||||
|
@ -113,10 +100,10 @@ def _discover_path_importables(pkg_pth: Path, pkg_name: str) -> Iterable[HPIModu
|
||||||
|
|
||||||
|
|
||||||
def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterable[HPIModule]:
|
def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterable[HPIModule]:
|
||||||
'''
|
"""
|
||||||
Modified version of https://github.com/python/cpython/blob/d50a0700265536a20bcce3fb108c954746d97625/Lib/pkgutil.py#L53,
|
Modified version of https://github.com/python/cpython/blob/d50a0700265536a20bcce3fb108c954746d97625/Lib/pkgutil.py#L53,
|
||||||
to alvoid importing modules that are skipped
|
to alvoid importing modules that are skipped
|
||||||
'''
|
"""
|
||||||
from .core_config import config
|
from .core_config import config
|
||||||
|
|
||||||
def seen(p, m={}):
|
def seen(p, m={}):
|
||||||
|
@ -144,7 +131,7 @@ def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterabl
|
||||||
if is_not_module is not None:
|
if is_not_module is not None:
|
||||||
skip_reason = is_not_module
|
skip_reason = is_not_module
|
||||||
|
|
||||||
else: # active is True
|
else: # active is True
|
||||||
# nothing to do, enabled explicitly
|
# nothing to do, enabled explicitly
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -175,54 +162,19 @@ def _walk_packages(path: Iterable[str], prefix: str='', onerror=None) -> Iterabl
|
||||||
path = [p for p in path if not seen(p)]
|
path = [p for p in path if not seen(p)]
|
||||||
yield from _walk_packages(path, mname+'.', onerror)
|
yield from _walk_packages(path, mname+'.', onerror)
|
||||||
|
|
||||||
|
|
||||||
def modules_via_ast() -> Iterable[HPIModule]:
|
|
||||||
'''
|
|
||||||
Experimental version, which isn't importing the modules, making it more robust and safe.
|
|
||||||
'''
|
|
||||||
import ast
|
|
||||||
|
|
||||||
my_root = Path(__file__).absolute().parent.parent
|
|
||||||
for f in sorted(my_root.rglob('*.py')):
|
|
||||||
if f.is_symlink():
|
|
||||||
continue # meh
|
|
||||||
mp = f.relative_to(my_root.parent)
|
|
||||||
if mp.name == '__init__.py':
|
|
||||||
mp = mp.parent
|
|
||||||
m = str(mp.with_suffix('')).replace('/', '.')
|
|
||||||
if ignored(m):
|
|
||||||
continue
|
|
||||||
a = ast.parse(f.read_text())
|
|
||||||
NM = '__NOT_HPI_MODULE__'
|
|
||||||
is_not_module = any(
|
|
||||||
getattr(node, 'name', None) == NM # direct definition
|
|
||||||
or
|
|
||||||
any(getattr(n, 'name', None) == NM for n in getattr(node, 'names', [])) # import from
|
|
||||||
for node in a.body)
|
|
||||||
if is_not_module:
|
|
||||||
continue
|
|
||||||
doc = ast.get_docstring(a, clean=False)
|
|
||||||
|
|
||||||
yield HPIModule(
|
|
||||||
name=m,
|
|
||||||
skip_reason=None,
|
|
||||||
doc=doc,
|
|
||||||
file=f.relative_to(my_root.parent), # todo not sure if should be relative
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# deprecate?
|
# deprecate?
|
||||||
def get_modules() -> List[HPIModule]:
|
def get_modules() -> List[HPIModule]:
|
||||||
return list(modules())
|
return list(modules())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### tests start
|
### tests start
|
||||||
|
|
||||||
## FIXME: add test when there is an import error -- should be defensive and yield exception
|
## FIXME: add test when there is an import error -- should be defensive and yield exception
|
||||||
|
|
||||||
|
|
||||||
def test_module_detection() -> None:
|
def test_module_detection() -> None:
|
||||||
from .core_config import _reset_config as reset
|
from .core_config import _reset_config as reset
|
||||||
|
|
||||||
with reset() as cc:
|
with reset() as cc:
|
||||||
cc.disabled_modules = ['my.location.*', 'my.body.*', 'my.workouts.*', 'my.private.*']
|
cc.disabled_modules = ['my.location.*', 'my.body.*', 'my.workouts.*', 'my.private.*']
|
||||||
mods = {m.name: m for m in modules()}
|
mods = {m.name: m for m in modules()}
|
||||||
|
@ -233,7 +185,7 @@ def test_module_detection() -> None:
|
||||||
cc.enabled_modules = ['my.demo']
|
cc.enabled_modules = ['my.demo']
|
||||||
mods = {m.name: m for m in modules()}
|
mods = {m.name: m for m in modules()}
|
||||||
|
|
||||||
assert mods['my.demo'] .skip_reason is None # not skipped
|
assert mods['my.demo'] .skip_reason is None # not skipped
|
||||||
assert mods['my.lastfm'].skip_reason == "suppressed in the user config"
|
assert mods['my.lastfm'].skip_reason == "suppressed in the user config"
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,8 @@ we can run against the tests in my.core directly.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
from my.core.core_config import *
|
from my.core.common import *
|
||||||
from my.core.error import *
|
from my.core.core_config import *
|
||||||
from my.core.util import *
|
from my.core.error import *
|
||||||
from my.core.common import *
|
from my.core.util import *
|
||||||
|
from my.core.discovery_pure import *
|
||||||
|
|
Loading…
Add table
Reference in a new issue