""" TODO: move this to doc/DENYLIST ? A helper module for defining denylists for sources programatically (in lamens terms, this lets you remove some output from a module you don't want) Lets you specify a class, an attribute to match on, and a json file containing a list of values to deny/filter out As an example, for a class like this: class IP(NamedTuple): ip: str dt: datetime A possible denylist file would contain: [ { "ip": "192.168.1.1", }, { "dt": "2020-06-02T03:12:00+00:00", } ] Note that if the value being compared to is not a single (non-array/object) JSON primitive (str, int, float, bool, None), it will be converted to a string before comparison To use this in code: ``` from my.ip.all import ips filtered = DenyList("~/data/ip_denylist.json").filter(ips()) ``` To add items to the denylist, in python (in a one-off script): ``` from my.ip.all import ips from my.core.denylist import DenyList d = DenyList("~/data/ip_denylist.json") for ip in ips(): # some custom code you define if ip.ip == ...: d.deny(key="ip", value=ip.ip) d.write() ``` ... or interactively, which requires `fzf` to be installed, after running ``` from my.ip.all import ips from my.core.denylist import DenyList d = DenyList("~/data/ip_denylist.json") d.deny_cli(ips()) d.write() ``` This is meant for relatively simple filters, where you want to filter out based on a single attribute of a namedtuple/dataclass. If you want to do something more complex, I would recommend overriding the all.py file for that source and writing your own filter function there. For more info on all.py: https://github.com/karlicoss/HPI/blob/master/doc/MODULE_DESIGN.org#allpy This would typically be used in an overriden all.py file, or in a one-off script which you may want to filter out some items from a source, progressively adding more items to the denylist as you go. A potential my/ip/all.py file might look like: ``` from typing import Iterator from my.ip.common import IP # type: ignore[import] from my.core.denylist import DenyList deny = DenyList("~/data/ip_denylist.json") def ips() -> Iterator[IP]: from my.ip import discord yield from deny.filter(discord.ips()) ``` To add items to the denylist, you could create a __main__.py file, or: ``` python3 -c 'from my.ip import all; all.deny.deny_cli(all.ips())' ``` Sidenote: the reason why we want to specifically override the all.py and not just create a script that filters out the items you're not interested in is because we want to be able to import from `my.ip.all` or `my.location.all` from other modules and get the filtered results, without having to mix data filtering logic with parsing/loading/caching (the stuff HPI does) """ import json import functools from collections import defaultdict from typing import TypeVar, Set, Any, Mapping, Iterator, Dict, List from pathlib import Path import click from more_itertools import seekable from my.core.serialize import dumps from my.core.common import PathIsh from my.core.warnings import medium T = TypeVar("T") DenyMap = Mapping[str, Set[Any]] def _default_key_func(obj: T) -> str: return str(obj) class DenyList: def __init__(self, denylist_file: PathIsh): self.file = Path(denylist_file).expanduser().absolute() self._deny_raw_list: List[Dict[str, Any]] = [] self._deny_map: DenyMap = defaultdict(set) # deny cli, user can override these self.fzf_path = None self._fzf_options = () self._deny_cli_key_func = None def _load(self) -> None: if not self.file.exists(): medium(f"denylist file {self.file} does not exist") return deny_map: DenyMap = defaultdict(set) data: List[Dict[str, Any]]= json.loads(self.file.read_text()) self._deny_raw_list = data for ignore in data: for k, v in ignore.items(): deny_map[k].add(v) self._deny_map = deny_map def load(self) -> DenyMap: self._load() return self._deny_map def write(self) -> None: if not self._deny_raw_list: medium("no denylist data to write") return self.file.write_text(json.dumps(self._deny_raw_list)) @classmethod def _is_json_primitive(cls, val: Any) -> bool: return isinstance(val, (str, int, float, bool, type(None))) @classmethod def _stringify_value(cls, val: Any) -> Any: # if it's a primitive, just return it if cls._is_json_primitive(val): return val # otherwise, stringify-and-back so we can compare to # json data loaded from the denylist file return json.loads(dumps(val)) @classmethod def _allow(cls, obj: T, deny_map: DenyMap) -> bool: for deny_key, deny_set in deny_map.items(): # this should be done separately and not as part of the getattr # because 'null'/None could actually be a value in the denylist, # and the user may define behavior to filter that out if not hasattr(obj, deny_key): return False val = cls._stringify_value(getattr(obj, deny_key)) # this object doesn't have have the attribute in the denylist if val in deny_set: return False # if we tried all the denylist keys and didn't return False, # then this object is allowed return True def filter( self, itr: Iterator[T], invert: bool = False, ) -> Iterator[T]: denyf = functools.partial(self._allow, deny_map=self.load()) if invert: return filter(lambda x: not denyf(x), itr) return filter(denyf, itr) def deny(self, key: str, value: Any, write: bool = False) -> None: ''' add a key/value pair to the denylist ''' if not self._deny_raw_list: self._load() self._deny_raw({key: self._stringify_value(value)}, write=write) def _deny_raw(self, data: Dict[str, Any], write: bool = False) -> None: self._deny_raw_list.append(data) if write: self.write() def _prompt_keys(self, item: T) -> str: import pprint click.echo(pprint.pformat(item)) # TODO: extract keys from item by checking if its dataclass/NT etc.? resp = click.prompt("Key to deny on").strip() if not hasattr(item, resp): click.echo(f"Could not find key '{resp}' on item", err=True) return self._prompt_keys(item) return resp def _deny_cli_remember( self, items: Iterator[T], mem: Dict[str, T], ) -> Iterator[str]: keyf = self._deny_cli_key_func or _default_key_func # i.e., convert each item to a string, and map str -> item for item in items: key = keyf(item) mem[key] = item yield key def deny_cli(self, itr: Iterator[T]) -> None: from pyfzf import FzfPrompt # wrap in seekable so we can use it multiple times # progressively caches the items as we iterate over them sit = seekable(itr) prompt_continue = True while prompt_continue: # reset the iterator sit.seek(0) # so we can map the selected string from fzf back to the original objects memory_map: Dict[str, T] = {} picker = FzfPrompt( executable_path=self.fzf_path, default_options="--no-multi" ) picked_l = picker.prompt( self._deny_cli_remember(itr, memory_map), "--read0", *self._fzf_options, delimiter="\0", ) assert isinstance(picked_l, list) if picked_l: picked: T = memory_map[picked_l[0]] key = self._prompt_keys(picked) self.deny(key, getattr(picked, key), write=True) click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True) else: click.echo("No item selected", err=True) prompt_continue = click.confirm("Continue?")