core: make discovery even more static, has_stats via ast + tests
This commit is contained in:
parent
f90599d7e4
commit
3b4a2a378f
2 changed files with 82 additions and 13 deletions
|
@ -16,7 +16,7 @@ NOT_HPI_MODULE_VAR = '__NOT_HPI_MODULE__'
|
||||||
###
|
###
|
||||||
|
|
||||||
import ast
|
import ast
|
||||||
from typing import Optional, Sequence, NamedTuple, Iterable
|
from typing import Optional, Sequence, NamedTuple, Iterable, cast, Any
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import re
|
import re
|
||||||
import logging
|
import logging
|
||||||
|
@ -45,6 +45,29 @@ def ignored(m: str) -> bool:
|
||||||
return re.match(f'^my.({exs})$', m) is not None
|
return re.match(f'^my.({exs})$', m) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def has_stats(src: Path) -> bool:
|
||||||
|
# todo make sure consistent with get_stats?
|
||||||
|
return _has_stats(src.read_text())
|
||||||
|
|
||||||
|
|
||||||
|
def _has_stats(code: str) -> bool:
|
||||||
|
a: ast.Module = ast.parse(code)
|
||||||
|
for x in a.body:
|
||||||
|
try: # maybe assign
|
||||||
|
[tg] = cast(Any, x).targets
|
||||||
|
if tg.id == 'stats':
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try: # maybe def?
|
||||||
|
name = cast(Any, x).name
|
||||||
|
if name == 'stats':
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _is_not_module_src(src: Path) -> bool:
|
def _is_not_module_src(src: Path) -> bool:
|
||||||
a: ast.Module = ast.parse(src.read_text())
|
a: ast.Module = ast.parse(src.read_text())
|
||||||
return _is_not_module_ast(a)
|
return _is_not_module_ast(a)
|
||||||
|
@ -165,3 +188,21 @@ def test_pure() -> None:
|
||||||
src = Path(__file__).read_text()
|
src = Path(__file__).read_text()
|
||||||
assert 'import ' + 'my' not in src
|
assert 'import ' + 'my' not in src
|
||||||
assert 'from ' + 'my' not in src
|
assert 'from ' + 'my' not in src
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_stats() -> None:
|
||||||
|
assert not _has_stats('')
|
||||||
|
assert not _has_stats('x = lambda : whatever')
|
||||||
|
|
||||||
|
assert _has_stats('''
|
||||||
|
def stats():
|
||||||
|
pass
|
||||||
|
''')
|
||||||
|
|
||||||
|
assert _has_stats('''
|
||||||
|
stats = lambda: "something"
|
||||||
|
''')
|
||||||
|
|
||||||
|
assert _has_stats('''
|
||||||
|
stats = other_function
|
||||||
|
''')
|
||||||
|
|
|
@ -7,7 +7,7 @@ import re
|
||||||
import sys
|
import sys
|
||||||
from typing import List, Iterable, Optional
|
from typing import List, Iterable, Optional
|
||||||
|
|
||||||
from .discovery_pure import HPIModule, ignored, _is_not_module_src
|
from .discovery_pure import HPIModule, ignored, _is_not_module_src, has_stats
|
||||||
|
|
||||||
|
|
||||||
def modules() -> Iterable[HPIModule]:
|
def modules() -> Iterable[HPIModule]:
|
||||||
|
@ -38,18 +38,18 @@ def is_not_hpi_module(module: str) -> Optional[str]:
|
||||||
import importlib
|
import importlib
|
||||||
path: Optional[str] = None
|
path: Optional[str] = None
|
||||||
try:
|
try:
|
||||||
# # TODO annoying, this can cause import of the parent module?
|
# TODO annoying, this can cause import of the parent module?
|
||||||
spec = importlib.util.find_spec(module)
|
spec = importlib.util.find_spec(module)
|
||||||
assert spec is not None
|
assert spec is not None
|
||||||
path = spec.origin
|
path = spec.origin
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# todo a bit misleading.. it actually shouldn't import in most cases, it's just the weird parent module import thing
|
||||||
return "import error (possibly missing config entry)" # todo add exc message?
|
return "import error (possibly missing config entry)" # todo add exc message?
|
||||||
assert path is not None # not sure if can happen?
|
assert path is not None # not sure if can happen?
|
||||||
if _is_not_module_src(Path(path)):
|
if _is_not_module_src(Path(path)):
|
||||||
return f"marked explicitly (via {NOT_HPI_MODULE_VAR})"
|
return f"marked explicitly (via {NOT_HPI_MODULE_VAR})"
|
||||||
|
|
||||||
stats = get_stats(module)
|
if not has_stats(Path(path)):
|
||||||
if stats is None:
|
|
||||||
return "has no 'stats()' function"
|
return "has no 'stats()' function"
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -191,16 +191,46 @@ def test_module_detection() -> None:
|
||||||
assert mods['my.lastfm'].skip_reason == "suppressed in the user config"
|
assert mods['my.lastfm'].skip_reason == "suppressed in the user config"
|
||||||
|
|
||||||
|
|
||||||
def test_bad_module(tmp_path: Path) -> None:
|
def test_good_modules(tmp_path: Path) -> None:
|
||||||
|
badp = tmp_path / 'good'
|
||||||
|
par = badp / 'my'
|
||||||
|
par.mkdir(parents=True)
|
||||||
|
|
||||||
|
(par / 'good.py').write_text('def stats(): pass')
|
||||||
|
(par / 'disabled.py').write_text('''
|
||||||
|
from my.core import __NOT_HPI_MODULE__
|
||||||
|
''')
|
||||||
|
(par / 'nostats.py').write_text('''
|
||||||
|
# no stats!
|
||||||
|
''')
|
||||||
|
|
||||||
|
import sys
|
||||||
|
orig_path = list(sys.path)
|
||||||
|
try:
|
||||||
|
sys.path.insert(0, str(badp))
|
||||||
|
good = is_not_hpi_module('my.good')
|
||||||
|
disabled = is_not_hpi_module('my.disabled')
|
||||||
|
nostats = is_not_hpi_module('my.nostats')
|
||||||
|
finally:
|
||||||
|
sys.path = orig_path
|
||||||
|
|
||||||
|
assert good is None # good module!
|
||||||
|
assert disabled is not None
|
||||||
|
assert 'marked explicitly' in disabled
|
||||||
|
assert nostats is not None
|
||||||
|
assert 'stats' in nostats
|
||||||
|
|
||||||
|
|
||||||
|
def test_bad_modules(tmp_path: Path) -> None:
|
||||||
xx = tmp_path / 'precious_data'
|
xx = tmp_path / 'precious_data'
|
||||||
xx.write_text('some precious data')
|
xx.write_text('some precious data')
|
||||||
badp = tmp_path / 'bad'
|
badp = tmp_path / 'bad'
|
||||||
par = badp / 'my'
|
par = badp / 'my'
|
||||||
par.mkdir(parents=True)
|
par.mkdir(parents=True)
|
||||||
|
|
||||||
(par / 'badmodule.py').write_text(f'''
|
(par / 'malicious.py').write_text(f'''
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
Path('{xx}').write_text('') # overwrite file
|
Path('{xx}').write_text('aaand your data is gone!')
|
||||||
|
|
||||||
raise RuntimeError("FAIL ON IMPORT! naughy.")
|
raise RuntimeError("FAIL ON IMPORT! naughy.")
|
||||||
|
|
||||||
|
@ -212,14 +242,12 @@ def stats():
|
||||||
orig_path = list(sys.path)
|
orig_path = list(sys.path)
|
||||||
try:
|
try:
|
||||||
sys.path.insert(0, str(badp))
|
sys.path.insert(0, str(badp))
|
||||||
res = is_not_hpi_module('my.badmodule')
|
res = is_not_hpi_module('my.malicious')
|
||||||
finally:
|
finally:
|
||||||
sys.path = orig_path
|
sys.path = orig_path
|
||||||
# shouldn't crash at least
|
# shouldn't crash at least
|
||||||
assert res is not None # bad indeed
|
assert res is None # good as far as discovery is concerned
|
||||||
# TODO atm it says 'no stats()' function...
|
assert xx.read_text() == 'some precious data' # make sure module wasn't evauluated
|
||||||
# assert 'import error' in res
|
|
||||||
# assert xx.read_text() == 'some precious data' # make sure module wasn't evauluated
|
|
||||||
|
|
||||||
|
|
||||||
### tests end
|
### tests end
|
||||||
|
|
Loading…
Add table
Reference in a new issue