178 lines
5.7 KiB
Python
178 lines
5.7 KiB
Python
"""
|
|
A helper module for defining denylists for sources programmatically
|
|
(in lamens terms, this lets you remove some output from a module you don't want)
|
|
|
|
For docs, see doc/DENYLIST.md
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import functools
|
|
from collections import defaultdict
|
|
from typing import TypeVar, Set, Any, Mapping, Iterator, Dict, List
|
|
from pathlib import Path
|
|
|
|
import click
|
|
from more_itertools import seekable
|
|
from my.core.serialize import dumps
|
|
from my.core.common import PathIsh
|
|
from my.core.warnings import medium
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
DenyMap = Mapping[str, Set[Any]]
|
|
|
|
|
|
def _default_key_func(obj: T) -> str:
|
|
return str(obj)
|
|
|
|
|
|
class DenyList:
|
|
def __init__(self, denylist_file: PathIsh):
|
|
self.file = Path(denylist_file).expanduser().absolute()
|
|
self._deny_raw_list: List[Dict[str, Any]] = []
|
|
self._deny_map: DenyMap = defaultdict(set)
|
|
|
|
# deny cli, user can override these
|
|
self.fzf_path = None
|
|
self._fzf_options = ()
|
|
self._deny_cli_key_func = None
|
|
|
|
def _load(self) -> None:
|
|
if not self.file.exists():
|
|
medium(f"denylist file {self.file} does not exist")
|
|
return
|
|
|
|
deny_map: DenyMap = defaultdict(set)
|
|
data: List[Dict[str, Any]]= json.loads(self.file.read_text())
|
|
self._deny_raw_list = data
|
|
|
|
for ignore in data:
|
|
for k, v in ignore.items():
|
|
deny_map[k].add(v)
|
|
|
|
self._deny_map = deny_map
|
|
|
|
def load(self) -> DenyMap:
|
|
self._load()
|
|
return self._deny_map
|
|
|
|
def write(self) -> None:
|
|
if not self._deny_raw_list:
|
|
medium("no denylist data to write")
|
|
return
|
|
self.file.write_text(json.dumps(self._deny_raw_list))
|
|
|
|
@classmethod
|
|
def _is_json_primitive(cls, val: Any) -> bool:
|
|
return isinstance(val, (str, int, float, bool, type(None)))
|
|
|
|
@classmethod
|
|
def _stringify_value(cls, val: Any) -> Any:
|
|
# if it's a primitive, just return it
|
|
if cls._is_json_primitive(val):
|
|
return val
|
|
# otherwise, stringify-and-back so we can compare to
|
|
# json data loaded from the denylist file
|
|
return json.loads(dumps(val))
|
|
|
|
@classmethod
|
|
def _allow(cls, obj: T, deny_map: DenyMap) -> bool:
|
|
for deny_key, deny_set in deny_map.items():
|
|
# this should be done separately and not as part of the getattr
|
|
# because 'null'/None could actually be a value in the denylist,
|
|
# and the user may define behavior to filter that out
|
|
if not hasattr(obj, deny_key):
|
|
return False
|
|
val = cls._stringify_value(getattr(obj, deny_key))
|
|
# this object doesn't have have the attribute in the denylist
|
|
if val in deny_set:
|
|
return False
|
|
# if we tried all the denylist keys and didn't return False,
|
|
# then this object is allowed
|
|
return True
|
|
|
|
def filter(
|
|
self,
|
|
itr: Iterator[T],
|
|
invert: bool = False,
|
|
) -> Iterator[T]:
|
|
denyf = functools.partial(self._allow, deny_map=self.load())
|
|
if invert:
|
|
return filter(lambda x: not denyf(x), itr)
|
|
return filter(denyf, itr)
|
|
|
|
def deny(self, key: str, value: Any, write: bool = False) -> None:
|
|
'''
|
|
add a key/value pair to the denylist
|
|
'''
|
|
if not self._deny_raw_list:
|
|
self._load()
|
|
self._deny_raw({key: self._stringify_value(value)}, write=write)
|
|
|
|
def _deny_raw(self, data: Dict[str, Any], write: bool = False) -> None:
|
|
self._deny_raw_list.append(data)
|
|
if write:
|
|
self.write()
|
|
|
|
def _prompt_keys(self, item: T) -> str:
|
|
import pprint
|
|
|
|
click.echo(pprint.pformat(item))
|
|
# TODO: extract keys from item by checking if its dataclass/NT etc.?
|
|
resp = click.prompt("Key to deny on").strip()
|
|
if not hasattr(item, resp):
|
|
click.echo(f"Could not find key '{resp}' on item", err=True)
|
|
return self._prompt_keys(item)
|
|
return resp
|
|
|
|
def _deny_cli_remember(
|
|
self,
|
|
items: Iterator[T],
|
|
mem: Dict[str, T],
|
|
) -> Iterator[str]:
|
|
keyf = self._deny_cli_key_func or _default_key_func
|
|
# i.e., convert each item to a string, and map str -> item
|
|
for item in items:
|
|
key = keyf(item)
|
|
mem[key] = item
|
|
yield key
|
|
|
|
def deny_cli(self, itr: Iterator[T]) -> None:
|
|
try:
|
|
from pyfzf import FzfPrompt
|
|
except ImportError:
|
|
click.echo("pyfzf is required to use the denylist cli, run 'python3 -m pip install pyfzf_iter'", err=True)
|
|
sys.exit(1)
|
|
|
|
# wrap in seekable so we can use it multiple times
|
|
# progressively caches the items as we iterate over them
|
|
sit = seekable(itr)
|
|
|
|
prompt_continue = True
|
|
|
|
while prompt_continue:
|
|
# reset the iterator
|
|
sit.seek(0)
|
|
# so we can map the selected string from fzf back to the original objects
|
|
memory_map: Dict[str, T] = {}
|
|
picker = FzfPrompt(
|
|
executable_path=self.fzf_path, default_options="--no-multi"
|
|
)
|
|
picked_l = picker.prompt(
|
|
self._deny_cli_remember(itr, memory_map),
|
|
"--read0",
|
|
*self._fzf_options,
|
|
delimiter="\0",
|
|
)
|
|
assert isinstance(picked_l, list)
|
|
if picked_l:
|
|
picked: T = memory_map[picked_l[0]]
|
|
key = self._prompt_keys(picked)
|
|
self.deny(key, getattr(picked, key), write=True)
|
|
click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True)
|
|
else:
|
|
click.echo("No item selected", err=True)
|
|
|
|
prompt_continue = click.confirm("Continue?")
|