add denylist implementation
This commit is contained in:
parent
ce2a42c45a
commit
2fa0b5cef8
5 changed files with 279 additions and 5 deletions
273
my/core/denylist.py
Normal file
273
my/core/denylist.py
Normal file
|
@ -0,0 +1,273 @@
|
|||
"""
|
||||
TODO: move this to doc/DENYLIST ?
|
||||
|
||||
A helper module for defining denylists for sources programmatically
|
||||
(in layman's terms, this lets you remove some output from a module you don't want)
|
||||
|
||||
Lets you specify a class, an attribute to match on,
|
||||
and a json file containing a list of values to deny/filter out
|
||||
|
||||
As an example, for a class like this:
|
||||
|
||||
class IP(NamedTuple):
|
||||
ip: str
|
||||
dt: datetime
|
||||
|
||||
A possible denylist file would contain:
|
||||
|
||||
[
|
||||
{
|
||||
"ip": "192.168.1.1"
|
||||
},
|
||||
{
|
||||
"dt": "2020-06-02T03:12:00+00:00"
|
||||
}
|
||||
]
|
||||
|
||||
Note that if the value being compared to is not a single (non-array/object) JSON primitive
|
||||
(str, int, float, bool, None), it will be converted to a string before comparison
|
||||
|
||||
To use this in code:
|
||||
|
||||
```
|
||||
from my.ip.all import ips
|
||||
filtered = DenyList("~/data/ip_denylist.json").filter(ips())
|
||||
```
|
||||
|
||||
To add items to the denylist, in python (in a one-off script):
|
||||
|
||||
```
|
||||
from my.ip.all import ips
|
||||
from my.core.denylist import DenyList
|
||||
|
||||
d = DenyList("~/data/ip_denylist.json")
|
||||
|
||||
for ip in ips():
|
||||
# some custom code you define
|
||||
if ip.ip == ...:
|
||||
d.deny(key="ip", value=ip.ip)
|
||||
d.write()
|
||||
```
|
||||
|
||||
... or interactively, which requires `fzf` to be installed, after running
|
||||
|
||||
```
|
||||
from my.ip.all import ips
|
||||
from my.core.denylist import DenyList
|
||||
|
||||
d = DenyList("~/data/ip_denylist.json")
|
||||
d.deny_cli(ips())
|
||||
d.write()
|
||||
```
|
||||
|
||||
This is meant for relatively simple filters, where you want to filter out
|
||||
based on a single attribute of a namedtuple/dataclass. If you want to do something
|
||||
more complex, I would recommend overriding the all.py file for that source and
|
||||
writing your own filter function there.
|
||||
|
||||
For more info on all.py:
|
||||
https://github.com/karlicoss/HPI/blob/master/doc/MODULE_DESIGN.org#allpy
|
||||
|
||||
This would typically be used in an overridden all.py file, or in a one-off script
|
||||
in which you may want to filter out some items from a source, progressively adding more
|
||||
items to the denylist as you go.
|
||||
|
||||
A potential my/ip/all.py file might look like:
|
||||
|
||||
```
|
||||
from typing import Iterator
|
||||
|
||||
from my.ip.common import IP # type: ignore[import]
|
||||
from my.core.denylist import DenyList
|
||||
|
||||
deny = DenyList("~/data/ip_denylist.json")
|
||||
|
||||
def ips() -> Iterator[IP]:
|
||||
from my.ip import discord
|
||||
|
||||
yield from deny.filter(discord.ips())
|
||||
```
|
||||
|
||||
|
||||
To add items to the denylist, you could create a __main__.py file, or:
|
||||
|
||||
```
|
||||
python3 -c 'from my.ip import all; all.deny.deny_cli(all.ips())'
|
||||
```
|
||||
|
||||
Sidenote: the reason why we want to specifically override
|
||||
the all.py and not just create a script that filters out the items you're
|
||||
not interested in is because we want to be able to import from `my.ip.all`
|
||||
or `my.location.all` from other modules and get the filtered results, without
|
||||
having to mix data filtering logic with parsing/loading/caching (the stuff HPI does)
|
||||
"""
|
||||
|
||||
# https://github.com/seanbreckenridge/pyfzf
|
||||
# extra package needed by this module; presumably what provides the `pyfzf`
# import used by the interactive `deny_cli` -- TODO confirm
REQUIRES = ["pyfzf_iter"]
|
||||
|
||||
import json
|
||||
import functools
|
||||
from collections import defaultdict
|
||||
from typing import TypeVar, Set, Any, Mapping, Iterator, Dict, List
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
from more_itertools import seekable
|
||||
from my.core.serialize import dumps
|
||||
from my.core.common import PathIsh
|
||||
from my.core.warnings import medium
|
||||
|
||||
|
||||
# generic type of the items passed through a denylist (filtering keeps the item type)
T = TypeVar("T")
|
||||
|
||||
# maps an attribute name to the set of values that are denied for that attribute
DenyMap = Mapping[str, Set[Any]]
|
||||
|
||||
|
||||
def _default_key_func(obj: T) -> str:
    """
    Fallback used to turn an item into the text shown in the interactive picker:
    simply the item's ``str()`` form.
    """
    text = str(obj)
    return text
|
||||
|
||||
|
||||
class DenyList:
    """
    A denylist backed by a JSON file, used to filter items out of an iterator.

    The file contains a list of objects mapping attribute names to values.
    An item is filtered out when one of its attributes equals a value that
    is denied for that attribute name.
    """

    def __init__(self, denylist_file: PathIsh):
        # resolved (user-expanded, absolute) location of the JSON denylist file
        self.file = Path(denylist_file).expanduser().absolute()
        # entries as stored in the file, in order; this is what write() serializes
        self._deny_raw_list: List[Dict[str, Any]] = []
        # attribute name -> set of denied values; rebuilt from the file by _load()
        self._deny_map: DenyMap = defaultdict(set)

        # deny cli, user can override these
        # passed to FzfPrompt as executable_path
        self.fzf_path = None
        # extra positional options forwarded to the fzf prompt
        self._fzf_options = ()
        # item -> str function for the picker; falls back to _default_key_func
        self._deny_cli_key_func = None

    def _load(self) -> None:
        """
        Read the denylist file and rebuild the raw list and the deny map from it.

        If the file does not exist, a warning is emitted and the current
        in-memory state is left untouched.
        """
        if not self.file.exists():
            medium(f"denylist file {self.file} does not exist")
            return

        deny_map: DenyMap = defaultdict(set)
        data: List[Dict[str, Any]] = json.loads(self.file.read_text())
        # note: this replaces any in-memory entries that were not written yet
        self._deny_raw_list = data

        for ignore in data:
            for k, v in ignore.items():
                deny_map[k].add(v)

        self._deny_map = deny_map

    def load(self) -> DenyMap:
        """
        Reload the denylist from disk and return the attribute -> denied values map.
        """
        self._load()
        return self._deny_map

    def write(self) -> None:
        """
        Serialize the raw denylist entries to the denylist file as JSON.

        Does nothing (apart from emitting a warning) when there are no entries.
        """
        if not self._deny_raw_list:
            medium("no denylist data to write")
            return
        self.file.write_text(json.dumps(self._deny_raw_list))

    @classmethod
    def _is_json_primitive(cls, val: Any) -> bool:
        """
        Return True if val is a str, int, float, bool or None.
        """
        return isinstance(val, (str, int, float, bool, type(None)))

    @classmethod
    def _stringify_value(cls, val: Any) -> Any:
        """
        Convert an attribute value into a form comparable with values
        loaded from the JSON denylist file.
        """
        # if it's a primitive, just return it
        if cls._is_json_primitive(val):
            return val
        # otherwise, stringify-and-back so we can compare to
        # json data loaded from the denylist file
        # NOTE(review): if `dumps` renders val as a JSON array/object, this
        # returns a list/dict, which cannot be looked up in a set -- confirm
        return json.loads(dumps(val))

    @classmethod
    def _allow(cls, obj: T, deny_map: DenyMap) -> bool:
        """
        Return True if obj passes the denylist, False if it should be dropped.

        An object is dropped when it lacks any attribute named in deny_map,
        or when one of those attributes has a denied value.
        """
        for deny_key, deny_set in deny_map.items():
            # this should be done separately and not as part of the getattr
            # because 'null'/None could actually be a value in the denylist,
            # and the user may define behavior to filter that out
            if not hasattr(obj, deny_key):
                # this object doesn't have the attribute in the denylist
                return False
            val = cls._stringify_value(getattr(obj, deny_key))
            if val in deny_set:
                return False
        # if we tried all the denylist keys and didn't return False,
        # then this object is allowed
        return True

    def filter(
        self,
        itr: Iterator[T],
        invert: bool = False,
    ) -> Iterator[T]:
        """
        Lazily filter itr through the denylist.

        The denylist is reloaded from the file when this is called (not while
        iterating), which replaces in-memory entries that were not written yet.

        With invert=True, yields only the items that would have been dropped.
        """
        denyf = functools.partial(self._allow, deny_map=self.load())
        if invert:
            return filter(lambda x: not denyf(x), itr)
        return filter(denyf, itr)

    def deny(self, key: str, value: Any, write: bool = False) -> None:
        """
        Add a key/value pair to the denylist.

        The existing file is loaded first if nothing is in memory yet, so
        that a later write() does not discard what the file already contains.
        If write is True, the denylist file is updated immediately.
        """
        if not self._deny_raw_list:
            self._load()
        self._deny_raw({key: self._stringify_value(value)}, write=write)

    def _deny_raw(self, data: Dict[str, Any], write: bool = False) -> None:
        """
        Append an already-converted entry to the raw list, optionally writing the file.
        """
        self._deny_raw_list.append(data)
        if write:
            self.write()

    def _prompt_keys(self, item: T) -> str:
        """
        Show item and ask which attribute to deny on, re-asking until
        the answer names an attribute that exists on item.
        """
        import pprint

        # loop instead of recursing, so repeated invalid answers cannot
        # exhaust the recursion limit
        while True:
            click.echo(pprint.pformat(item))
            # TODO: extract keys from item by checking if its dataclass/NT etc.?
            resp = click.prompt("Key to deny on").strip()
            if hasattr(item, resp):
                return resp
            click.echo(f"Could not find key '{resp}' on item", err=True)

    def _deny_cli_remember(
        self,
        items: Iterator[T],
        mem: Dict[str, T],
    ) -> Iterator[str]:
        """
        Yield the string form of each item, recording string -> item in mem
        so that a picked string can be mapped back to its object.
        """
        keyf = self._deny_cli_key_func or _default_key_func
        # i.e., convert each item to a string, and map str -> item
        for item in items:
            key = keyf(item)
            mem[key] = item
            yield key

    def deny_cli(self, itr: Iterator[T]) -> None:
        """
        Interactively pick items with fzf and add them to the denylist.

        Each round: pick one item, choose which of its attributes to deny on,
        write the updated denylist to the file, then ask whether to continue.
        """
        from pyfzf import FzfPrompt

        # wrap in seekable so we can use it multiple times
        # progressively caches the items as we iterate over them
        sit = seekable(itr)

        prompt_continue = True

        while prompt_continue:
            # reset the iterator
            sit.seek(0)
            # so we can map the selected string from fzf back to the original objects
            memory_map: Dict[str, T] = {}
            picker = FzfPrompt(
                executable_path=self.fzf_path, default_options="--no-multi"
            )
            picked_l = picker.prompt(
                # iterate the seekable wrapper, not the raw iterator: otherwise the
                # cache is never filled and a one-shot iterator is empty on later rounds
                self._deny_cli_remember(sit, memory_map),
                "--read0",
                *self._fzf_options,
                delimiter="\0",
            )
            assert isinstance(picked_l, list)
            if picked_l:
                picked: T = memory_map[picked_l[0]]
                key = self._prompt_keys(picked)
                self.deny(key, getattr(picked, key), write=True)
                click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True)
            else:
                click.echo("No item selected", err=True)

            prompt_continue = click.confirm("Continue?")
|
Loading…
Add table
Add a link
Reference in a new issue