HPI/my/core/discovery_pure.py
Dima Gerasimov 64a4782f0e core/ci: fix windows-specific issues
- use portable separators
- paths should be prepended with r' (so backwards slash isn't treated as escaping)
- sqlite connections should be closed (otherwise windows fails to remove the underlying db file)
- workaround for emojis via PYTHONUTF8=1 test for now
- make ZipPath portable
- properly use tox python environment everywhere

  this was causing issues on Windows
  e.g.
      WARNING: test command found but not installed in testenv
        cmd: C:\hostedtoolcache\windows\Python\3.9.12\x64\python3.EXE
2022-05-03 10:16:01 +01:00

240 lines
6.5 KiB
Python

'''
The idea of this module is to avoid imports of external HPI modules and code evaluation via ast module etc.
This potentially allows it to be:
- robust: can discover modules that can't be imported, generally makes it foolproof
- faster: importing is slow and with tens of modules can be noticeable
- secure: can be executed in a sandbox & used during setup
It should be free of external modules, importlib, exec, etc. etc.
'''
# name of the module-level variable in which an HPI module declares its dependencies
REQUIRES = 'REQUIRES'
# name of the marker which, when defined in or imported into a file, excludes it from module discovery
NOT_HPI_MODULE_VAR = '__NOT_HPI_MODULE__'
###
import ast
import os
from typing import Optional, Sequence, List, NamedTuple, Iterable, cast, Any
from pathlib import Path
import re
import logging
'''
None means that requirements weren't defined (different from empty requirements)
'''
# type of the requirements extracted for a module: a sequence of strings, or None
Requires = Optional[Sequence[str]]
class HPIModule(NamedTuple):
    """Description of a single HPI module, obtained without importing it."""
    # dotted module name, e.g. 'my.demo'
    name: str
    # discovery in this file always sets it to None
    skip_reason: Optional[str]
    # module docstring, if the file has one
    doc: Optional[str] = None
    # location of the source file, relative to the parent directory of the package root
    file: Optional[Path] = None
    # requirements declared via REQUIRES; None when nothing was declared
    requires: Requires = None
def ignored(m: str) -> bool:
    """
    Tell whether module name ``m`` belongs to the legacy packages excluded from discovery.

    :param m: fully qualified dotted module name, e.g. ``my.core.common``
    :return: True if ``m`` sits under the ``my.`` prefix and matches one of the excluded patterns
    """
    excluded = [
        # legacy stuff left for backwards compatibility
        'core.*',
        'config.*',
    ]
    exs = '|'.join(excluded)
    # the dot after 'my' is escaped so that it only matches a literal package separator
    # (unescaped it matches any character, e.g. 'myXcore' or 'my_core.x')
    return re.match(rf'^my\.({exs})$', m) is not None
def has_stats(src: Path) -> bool:
    """
    Check whether the Python source file ``src`` defines ``stats`` (see ``_has_stats``).

    :param src: path to a Python source file
    :return: True if the file contains a top-level ``stats`` definition
    """
    # todo make sure consistent with get_stats?
    # Python source is UTF-8 by default, so decode it explicitly instead of relying
    # on the platform's locale encoding (which is typically not UTF-8 on Windows)
    return _has_stats(src.read_text(encoding='utf-8'))
def _has_stats(code: str) -> bool:
a: ast.Module = ast.parse(code)
for x in a.body:
try: # maybe assign
[tg] = cast(Any, x).targets
if tg.id == 'stats':
return True
except:
pass
try: # maybe def?
name = cast(Any, x).name
if name == 'stats':
return True
except:
pass
return False
def _is_not_module_src(src: Path) -> bool:
    """
    Parse the file ``src`` and report whether it is marked as not being an HPI module.

    :param src: path to a Python source file
    :return: result of ``_is_not_module_ast`` for the parsed file
    :raises SyntaxError: if the file is not valid Python (propagated by ``ast.parse``)
    """
    # hand raw bytes to the parser: it decodes them the way the interpreter would
    # (UTF-8 by default, honouring a BOM or coding declaration) instead of relying
    # on the platform's locale encoding
    a: ast.Module = ast.parse(src.read_bytes())
    return _is_not_module_ast(a)
def _is_not_module_ast(a: ast.Module) -> bool:
    """
    Report whether a parsed module carries the "not an HPI module" marker.

    A top-level statement counts if its own `name` equals the marker, or if any
    entry of its `names` has a `name` equal to the marker.
    """
    marker = NOT_HPI_MODULE_VAR
    for stmt in a.body:
        # direct definition
        if getattr(stmt, 'name', None) == marker:
            return True
        # import from
        for alias in getattr(stmt, 'names', []):
            if getattr(alias, 'name', None) == marker:
                return True
    return False
# todo should be defensive? not sure
def _extract_requirements(a: ast.Module) -> Requires:
    """
    Statically extract the dependencies a module declares via a top-level
    ``REQUIRES = [...]`` assignment.

    :param a: parsed module
    :return: tuple of requirement strings taken from the first suitable assignment,
        or None if there is no such assignment (which is different from an empty tuple)
    :raises RuntimeError: if the declaration contains anything other than string literals
    """
    # find the assignment..
    for x in a.body:
        if not isinstance(x, ast.Assign):
            continue
        if len(x.targets) != 1:
            continue
        # could be Subscript.. so best to keep dynamic
        if getattr(x.targets[0], 'id', None) != REQUIRES:
            continue
        # could be List/Tuple/Set?
        elts = getattr(x.value, 'elts', None)
        if elts is None:
            continue
        deps: List[str] = []
        for c in elts:
            if isinstance(c, ast.Constant):
                value = c.value
            else:
                # parsers older than 3.8 emit dedicated string nodes exposing the text as `.s`;
                # probing the attribute avoids touching the deprecated ast.Str class,
                # which no longer exists on recent Python versions
                value = getattr(c, 's', None)
            # numbers, bytes, None, names, calls etc. are all rejected here
            if not isinstance(value, str):
                raise RuntimeError(f"Expecting string contants only in {REQUIRES} declaration")
            deps.append(value)
        return tuple(deps)
    return None
# todo should probably be more defensive..
def all_modules() -> Iterable[HPIModule]:
    """
    Yield the modules found under every root of the 'my' namespace package.

    Note: roots are visited in my.__path__ order and each root is scanned
    independently. With several overlays (multiple items in my.__path__,
    some of them overriding modules) the same module name can therefore be
    yielded more than once. Since the traversal follows import order,
    module_by_name should still resolve to the correct module, but
    all_modules itself can contain duplicates.
    """
    for root in _iter_my_roots():
        for module in _modules_under_root(root):
            yield module
def _iter_my_roots() -> Iterable[Path]:
    """
    Yield every entry of ``my.__path__`` as a Path, in order.

    Raises RuntimeError (on first iteration) if ``my.__path__`` is empty.
    """
    import my  # doesn't import any code, because of namespace package

    roots: List[str] = list(my.__path__)  # type: ignore[attr-defined]
    if not roots:
        # should probably never happen?, if this code is running, it was imported
        # because something was added to __path__ to match this name
        raise RuntimeError("my.__path__ was empty, try re-installing HPI?")
    for root in roots:
        yield Path(root)
def _modules_under_root(my_root: Path) -> Iterable[HPIModule]:
    """
    Experimental version, which isn't importing the modules, making it more robust and safe.

    Visits every ``*.py`` file below ``my_root`` in sorted order (symlinks are skipped)
    and yields an HPIModule for each file that is neither ignored nor marked
    as a non-module.

    :param my_root: directory of the ``my`` package to scan
    :raises SyntaxError: if one of the files is not valid Python (propagated by ``ast.parse``)
    """
    for f in sorted(my_root.rglob('*.py')):
        if f.is_symlink():
            continue  # meh
        # relative to the parent of the package root, so the path starts with the package name
        rel = f.relative_to(my_root.parent)
        # a package is named after its directory, not after its __init__.py
        mp = rel.parent if rel.name == '__init__.py' else rel
        m = str(mp.with_suffix('')).replace(os.sep, '.')
        if ignored(m):
            continue
        # hand raw bytes to the parser: it decodes them the way the interpreter would
        # (UTF-8 by default, honouring a BOM or coding declaration) instead of relying
        # on the platform's locale encoding; the filename makes syntax errors traceable
        a: ast.Module = ast.parse(f.read_bytes(), filename=str(f))
        if _is_not_module_ast(a):
            continue
        doc = ast.get_docstring(a, clean=False)

        requires: Requires = None
        try:
            requires = _extract_requirements(a)
        except Exception as e:
            # a malformed REQUIRES declaration shouldn't prevent discovery of the module itself
            logging.exception(e)

        yield HPIModule(
            name=m,
            skip_reason=None,
            doc=doc,
            file=rel,
            requires=requires,
        )
def module_by_name(name: str) -> HPIModule:
    """
    Return the first discovered module whose name equals ``name``.

    Raises RuntimeError if no discovered module has that name.
    """
    found = next((mod for mod in all_modules() if mod.name == name), None)
    if found is None:
        raise RuntimeError(f'No such module: {name}')
    return found
### tests
def test() -> None:
    # TODO this should be a 'sanity check' or something
    discovered = list(all_modules())
    assert len(discovered) > 10  # kinda arbitrary
def test_demo() -> None:
    # the demo module has a docstring and declares no requirements
    mod = module_by_name('my.demo')
    expected_file = Path('my', 'demo.py')
    assert mod.doc is not None
    assert mod.file == expected_file
    assert mod.requires is None
def test_excluded() -> None:
    # nothing under my.core should ever be reported as a module
    assert all('my.core.' not in mod.name for mod in all_modules())
def test_requires() -> None:
    reqs = module_by_name('my.photos.main').requires
    assert reqs is not None
    assert len(reqs) == 2  # fragile, but ok for now
def test_pure() -> None:
    """
    Guard against this module pulling in other HPI code
    """
    own_source = Path(__file__).read_text()

    # both needles are assembled by concatenation, so that the text
    # of this test doesn't contain them and trip the checks below
    dotted_import = 'import ' + r'my\.\S+'
    from_import = 'from ' + 'my'

    # importing the bare namespace package is allowed,
    # importing anything inside of it is not
    assert re.findall(dotted_import, own_source, re.M) == []
    assert from_import not in own_source
def test_has_stats() -> None:
    # snippets which don't define stats
    without_stats = [
        '',
        'x = lambda : whatever',
    ]
    # snippets which define stats as a function or via an assignment
    with_stats = [
        '''
def stats():
    pass
''',
        '''
stats = lambda: "something"
''',
        '''
stats = other_function
''',
    ]
    for code in without_stats:
        assert not _has_stats(code)
    for code in with_stats:
        assert _has_stats(code)