HPI/my/core/denylist.py
2023-02-21 16:11:39 -08:00

275 lines
8.3 KiB
Python

"""
TODO: move this to doc/DENYLIST ?
A helper module for defining denylists for sources programatically
(in lamens terms, this lets you remove some output from a module you don't want)
Lets you specify a class, an attribute to match on,
and a json file containing a list of values to deny/filter out
As an example, for a class like this:
class IP(NamedTuple):
ip: str
dt: datetime
A possible denylist file would contain:
[
{
"ip": "192.168.1.1",
},
{
"dt": "2020-06-02T03:12:00+00:00",
}
]
Note that if the value being compared to is not a single (non-array/object) JSON primitive
(str, int, float, bool, None), it will be converted to a string before comparison
To use this in code:
```
from my.ip.all import ips
filtered = DenyList("~/data/ip_denylist.json").filter(ips())
```
To add items to the denylist, in python (in a one-off script):
```
from my.ip.all import ips
from my.core.denylist import DenyList
d = DenyList("~/data/ip_denylist.json")
for ip in ips():
# some custom code you define
if ip.ip == ...:
d.deny(key="ip", value=ip.ip)
d.write()
```
... or interactively, which requires `fzf` to be installed, after running
```
from my.ip.all import ips
from my.core.denylist import DenyList
d = DenyList("~/data/ip_denylist.json")
d.deny_cli(ips())
d.write()
```
This is meant for relatively simple filters, where you want to filter out
based on a single attribute of a namedtuple/dataclass. If you want to do something
more complex, I would recommend overriding the all.py file for that source and
writing your own filter function there.
For more info on all.py:
https://github.com/karlicoss/HPI/blob/master/doc/MODULE_DESIGN.org#allpy
This would typically be used in an overriden all.py file, or in a one-off script
which you may want to filter out some items from a source, progressively adding more
items to the denylist as you go.
A potential my/ip/all.py file might look like:
```
from typing import Iterator
from my.ip.common import IP # type: ignore[import]
from my.core.denylist import DenyList
deny = DenyList("~/data/ip_denylist.json")
def ips() -> Iterator[IP]:
from my.ip import discord
yield from deny.filter(discord.ips())
```
To add items to the denylist, you could create a __main__.py file, or:
```
python3 -c 'from my.ip import all; all.deny.deny_cli(all.ips())'
```
Sidenote: the reason why we want to specifically override
the all.py and not just create a script that filters out the items you're
not interested in is because we want to be able to import from `my.ip.all`
or `my.location.all` from other modules and get the filtered results, without
having to mix data filtering logic with parsing/loading/caching (the stuff HPI does)
"""
import sys
import json
import functools
from collections import defaultdict
from typing import TypeVar, Set, Any, Mapping, Iterator, Dict, List
from pathlib import Path
import click
from more_itertools import seekable
from my.core.serialize import dumps
from my.core.common import PathIsh
from my.core.warnings import medium
T = TypeVar("T")
DenyMap = Mapping[str, Set[Any]]
def _default_key_func(obj: T) -> str:
return str(obj)
class DenyList:
def __init__(self, denylist_file: PathIsh):
self.file = Path(denylist_file).expanduser().absolute()
self._deny_raw_list: List[Dict[str, Any]] = []
self._deny_map: DenyMap = defaultdict(set)
# deny cli, user can override these
self.fzf_path = None
self._fzf_options = ()
self._deny_cli_key_func = None
def _load(self) -> None:
if not self.file.exists():
medium(f"denylist file {self.file} does not exist")
return
deny_map: DenyMap = defaultdict(set)
data: List[Dict[str, Any]]= json.loads(self.file.read_text())
self._deny_raw_list = data
for ignore in data:
for k, v in ignore.items():
deny_map[k].add(v)
self._deny_map = deny_map
def load(self) -> DenyMap:
self._load()
return self._deny_map
def write(self) -> None:
if not self._deny_raw_list:
medium("no denylist data to write")
return
self.file.write_text(json.dumps(self._deny_raw_list))
@classmethod
def _is_json_primitive(cls, val: Any) -> bool:
return isinstance(val, (str, int, float, bool, type(None)))
@classmethod
def _stringify_value(cls, val: Any) -> Any:
# if it's a primitive, just return it
if cls._is_json_primitive(val):
return val
# otherwise, stringify-and-back so we can compare to
# json data loaded from the denylist file
return json.loads(dumps(val))
@classmethod
def _allow(cls, obj: T, deny_map: DenyMap) -> bool:
for deny_key, deny_set in deny_map.items():
# this should be done separately and not as part of the getattr
# because 'null'/None could actually be a value in the denylist,
# and the user may define behavior to filter that out
if not hasattr(obj, deny_key):
return False
val = cls._stringify_value(getattr(obj, deny_key))
# this object doesn't have have the attribute in the denylist
if val in deny_set:
return False
# if we tried all the denylist keys and didn't return False,
# then this object is allowed
return True
def filter(
self,
itr: Iterator[T],
invert: bool = False,
) -> Iterator[T]:
denyf = functools.partial(self._allow, deny_map=self.load())
if invert:
return filter(lambda x: not denyf(x), itr)
return filter(denyf, itr)
def deny(self, key: str, value: Any, write: bool = False) -> None:
'''
add a key/value pair to the denylist
'''
if not self._deny_raw_list:
self._load()
self._deny_raw({key: self._stringify_value(value)}, write=write)
def _deny_raw(self, data: Dict[str, Any], write: bool = False) -> None:
self._deny_raw_list.append(data)
if write:
self.write()
def _prompt_keys(self, item: T) -> str:
import pprint
click.echo(pprint.pformat(item))
# TODO: extract keys from item by checking if its dataclass/NT etc.?
resp = click.prompt("Key to deny on").strip()
if not hasattr(item, resp):
click.echo(f"Could not find key '{resp}' on item", err=True)
return self._prompt_keys(item)
return resp
def _deny_cli_remember(
self,
items: Iterator[T],
mem: Dict[str, T],
) -> Iterator[str]:
keyf = self._deny_cli_key_func or _default_key_func
# i.e., convert each item to a string, and map str -> item
for item in items:
key = keyf(item)
mem[key] = item
yield key
def deny_cli(self, itr: Iterator[T]) -> None:
try:
from pyfzf import FzfPrompt
except ImportError:
click.echo("pyfzf is required to use the denylist cli, run 'python3 -m pip install pyfzf_iter'", err=True)
sys.exit(1)
# wrap in seekable so we can use it multiple times
# progressively caches the items as we iterate over them
sit = seekable(itr)
prompt_continue = True
while prompt_continue:
# reset the iterator
sit.seek(0)
# so we can map the selected string from fzf back to the original objects
memory_map: Dict[str, T] = {}
picker = FzfPrompt(
executable_path=self.fzf_path, default_options="--no-multi"
)
picked_l = picker.prompt(
self._deny_cli_remember(itr, memory_map),
"--read0",
*self._fzf_options,
delimiter="\0",
)
assert isinstance(picked_l, list)
if picked_l:
picked: T = memory_map[picked_l[0]]
key = self._prompt_keys(picked)
self.deny(key, getattr(picked, key), write=True)
click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True)
else:
click.echo("No item selected", err=True)
prompt_continue = click.confirm("Continue?")