HPI/my/core/denylist.py
karlicoss d3f9a8e8b6
core: migrate code to benefit from 3.9 stuff (#401)
for now keeping ruff on 3.8 target version, need to sort out modules as well
2024-10-19 20:55:09 +01:00

179 lines
5.7 KiB
Python

"""
A helper module for defining denylists for sources programmatically
(in lamens terms, this lets you remove some output from a module you don't want)
For docs, see doc/DENYLIST.md
"""
from __future__ import annotations
import functools
import json
import sys
from collections import defaultdict
from collections.abc import Iterator, Mapping
from pathlib import Path
from typing import Any, TypeVar
import click
from more_itertools import seekable
from .serialize import dumps
from .warnings import medium
T = TypeVar("T")
DenyMap = Mapping[str, set[Any]]
def _default_key_func(obj: T) -> str:
return str(obj)
class DenyList:
def __init__(self, denylist_file: Path | str) -> None:
self.file = Path(denylist_file).expanduser().absolute()
self._deny_raw_list: list[dict[str, Any]] = []
self._deny_map: DenyMap = defaultdict(set)
# deny cli, user can override these
self.fzf_path = None
self._fzf_options = ()
self._deny_cli_key_func = None
def _load(self) -> None:
if not self.file.exists():
medium(f"denylist file {self.file} does not exist")
return
deny_map: DenyMap = defaultdict(set)
data: list[dict[str, Any]] = json.loads(self.file.read_text())
self._deny_raw_list = data
for ignore in data:
for k, v in ignore.items():
deny_map[k].add(v)
self._deny_map = deny_map
def load(self) -> DenyMap:
self._load()
return self._deny_map
def write(self) -> None:
if not self._deny_raw_list:
medium("no denylist data to write")
return
self.file.write_text(json.dumps(self._deny_raw_list))
@classmethod
def _is_json_primitive(cls, val: Any) -> bool:
return isinstance(val, (str, int, float, bool, type(None)))
@classmethod
def _stringify_value(cls, val: Any) -> Any:
# if it's a primitive, just return it
if cls._is_json_primitive(val):
return val
# otherwise, stringify-and-back so we can compare to
# json data loaded from the denylist file
return json.loads(dumps(val))
@classmethod
def _allow(cls, obj: T, deny_map: DenyMap) -> bool:
for deny_key, deny_set in deny_map.items():
# this should be done separately and not as part of the getattr
# because 'null'/None could actually be a value in the denylist,
# and the user may define behavior to filter that out
if not hasattr(obj, deny_key):
return False
val = cls._stringify_value(getattr(obj, deny_key))
# this object doesn't have have the attribute in the denylist
if val in deny_set:
return False
# if we tried all the denylist keys and didn't return False,
# then this object is allowed
return True
def filter(
self,
itr: Iterator[T],
*,
invert: bool = False,
) -> Iterator[T]:
denyf = functools.partial(self._allow, deny_map=self.load())
if invert:
return filter(lambda x: not denyf(x), itr)
return filter(denyf, itr)
def deny(self, key: str, value: Any, *, write: bool = False) -> None:
'''
add a key/value pair to the denylist
'''
if not self._deny_raw_list:
self._load()
self._deny_raw({key: self._stringify_value(value)}, write=write)
def _deny_raw(self, data: dict[str, Any], *, write: bool = False) -> None:
self._deny_raw_list.append(data)
if write:
self.write()
def _prompt_keys(self, item: T) -> str:
import pprint
click.echo(pprint.pformat(item))
# TODO: extract keys from item by checking if its dataclass/NT etc.?
resp = click.prompt("Key to deny on").strip()
if not hasattr(item, resp):
click.echo(f"Could not find key '{resp}' on item", err=True)
return self._prompt_keys(item)
return resp
def _deny_cli_remember(
self,
items: Iterator[T],
mem: dict[str, T],
) -> Iterator[str]:
keyf = self._deny_cli_key_func or _default_key_func
# i.e., convert each item to a string, and map str -> item
for item in items:
key = keyf(item)
mem[key] = item
yield key
def deny_cli(self, itr: Iterator[T]) -> None:
try:
from pyfzf import FzfPrompt
except ImportError:
click.echo("pyfzf is required to use the denylist cli, run 'python3 -m pip install pyfzf_iter'", err=True)
sys.exit(1)
# wrap in seekable so we can use it multiple times
# progressively caches the items as we iterate over them
sit = seekable(itr)
prompt_continue = True
while prompt_continue:
# reset the iterator
sit.seek(0)
# so we can map the selected string from fzf back to the original objects
memory_map: dict[str, T] = {}
picker = FzfPrompt(executable_path=self.fzf_path, default_options="--no-multi")
picked_l = picker.prompt(
self._deny_cli_remember(itr, memory_map),
"--read0",
*self._fzf_options,
delimiter="\0",
)
assert isinstance(picked_l, list)
if picked_l:
picked: T = memory_map[picked_l[0]]
key = self._prompt_keys(picked)
self.deny(key, getattr(picked, key), write=True)
click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True)
else:
click.echo("No item selected", err=True)
prompt_continue = click.confirm("Continue?")