""" A helper module for defining denylists for sources programmatically (in lamens terms, this lets you remove some output from a module you don't want) For docs, see doc/DENYLIST.md """ import functools import json import sys from collections import defaultdict from pathlib import Path from typing import Any, Dict, Iterator, List, Mapping, Set, TypeVar import click from more_itertools import seekable from my.core.common import PathIsh from my.core.serialize import dumps from my.core.warnings import medium T = TypeVar("T") DenyMap = Mapping[str, Set[Any]] def _default_key_func(obj: T) -> str: return str(obj) class DenyList: def __init__(self, denylist_file: PathIsh): self.file = Path(denylist_file).expanduser().absolute() self._deny_raw_list: List[Dict[str, Any]] = [] self._deny_map: DenyMap = defaultdict(set) # deny cli, user can override these self.fzf_path = None self._fzf_options = () self._deny_cli_key_func = None def _load(self) -> None: if not self.file.exists(): medium(f"denylist file {self.file} does not exist") return deny_map: DenyMap = defaultdict(set) data: List[Dict[str, Any]]= json.loads(self.file.read_text()) self._deny_raw_list = data for ignore in data: for k, v in ignore.items(): deny_map[k].add(v) self._deny_map = deny_map def load(self) -> DenyMap: self._load() return self._deny_map def write(self) -> None: if not self._deny_raw_list: medium("no denylist data to write") return self.file.write_text(json.dumps(self._deny_raw_list)) @classmethod def _is_json_primitive(cls, val: Any) -> bool: return isinstance(val, (str, int, float, bool, type(None))) @classmethod def _stringify_value(cls, val: Any) -> Any: # if it's a primitive, just return it if cls._is_json_primitive(val): return val # otherwise, stringify-and-back so we can compare to # json data loaded from the denylist file return json.loads(dumps(val)) @classmethod def _allow(cls, obj: T, deny_map: DenyMap) -> bool: for deny_key, deny_set in deny_map.items(): # this should be done separately and not as part of the getattr # because 'null'/None could actually be a value in the denylist, # and the user may define behavior to filter that out if not hasattr(obj, deny_key): return False val = cls._stringify_value(getattr(obj, deny_key)) # this object doesn't have have the attribute in the denylist if val in deny_set: return False # if we tried all the denylist keys and didn't return False, # then this object is allowed return True def filter( self, itr: Iterator[T], *, invert: bool = False, ) -> Iterator[T]: denyf = functools.partial(self._allow, deny_map=self.load()) if invert: return filter(lambda x: not denyf(x), itr) return filter(denyf, itr) def deny(self, key: str, value: Any, *, write: bool = False) -> None: ''' add a key/value pair to the denylist ''' if not self._deny_raw_list: self._load() self._deny_raw({key: self._stringify_value(value)}, write=write) def _deny_raw(self, data: Dict[str, Any], *, write: bool = False) -> None: self._deny_raw_list.append(data) if write: self.write() def _prompt_keys(self, item: T) -> str: import pprint click.echo(pprint.pformat(item)) # TODO: extract keys from item by checking if its dataclass/NT etc.? resp = click.prompt("Key to deny on").strip() if not hasattr(item, resp): click.echo(f"Could not find key '{resp}' on item", err=True) return self._prompt_keys(item) return resp def _deny_cli_remember( self, items: Iterator[T], mem: Dict[str, T], ) -> Iterator[str]: keyf = self._deny_cli_key_func or _default_key_func # i.e., convert each item to a string, and map str -> item for item in items: key = keyf(item) mem[key] = item yield key def deny_cli(self, itr: Iterator[T]) -> None: try: from pyfzf import FzfPrompt except ImportError: click.echo("pyfzf is required to use the denylist cli, run 'python3 -m pip install pyfzf_iter'", err=True) sys.exit(1) # wrap in seekable so we can use it multiple times # progressively caches the items as we iterate over them sit = seekable(itr) prompt_continue = True while prompt_continue: # reset the iterator sit.seek(0) # so we can map the selected string from fzf back to the original objects memory_map: Dict[str, T] = {} picker = FzfPrompt( executable_path=self.fzf_path, default_options="--no-multi" ) picked_l = picker.prompt( self._deny_cli_remember(itr, memory_map), "--read0", *self._fzf_options, delimiter="\0", ) assert isinstance(picked_l, list) if picked_l: picked: T = memory_map[picked_l[0]] key = self._prompt_keys(picked) self.deny(key, getattr(picked, key), write=True) click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True) else: click.echo("No item selected", err=True) prompt_continue = click.confirm("Continue?")