# Provenance: my/core/query.py as introduced by commit
# feb8d5ff82c358c6ee1355134784d54696ffbdd8 (Sean Breckenridge,
# Tue, 23 Mar 2021 11:36:43 -0700): "initial my.core.query implementation"
"""
This lets you query, order, sort and filter items from one or more sources

The main entrypoint to this library is the 'select' function below; try:
python3 -c "from my.core.query import select; help(select)"
"""

import re
import dataclasses
import importlib
import inspect
import itertools
from datetime import datetime, date, timedelta
from typing import TypeVar, Tuple, Optional, Union, Callable, Iterable, Iterator, Dict, Any

import more_itertools

from .warnings import low
from .common import is_namedtuple
from .error import Res, unwrap


T = TypeVar("T")
ET = Res[T]


# e.g. ("my.reddit", "comments")
Locator = Tuple[str, str]
U = TypeVar("U")
# In a perfect world, the return value from a OrderFunc would just be U,
# not Optional[U]. However, since this has to deal with so many edge
# cases, there's a possibility that the functions generated by
# _generate_order_by_func can't find an attribute
OrderFunc = Callable[[ET], Optional[U]]
Where = Callable[[ET], bool]

DateLike = Union[datetime, date]


class QueryException(KeyError):
    """Used to differentiate query-related errors, so the CLI interface is more expressive"""


def locate_function(module_name: str, function_name: str) -> Callable[[], Iterable[ET]]:
    """
    Given a module name and a function name, returns the corresponding function.

    Since we're in the query module, it is assumed that this returns an
    iterable of objects of some kind, which we want to query over, though
    that isn't required

    Raises a QueryException if the module can't be imported/inspected, or if
    no function called 'function_name' is found in it
    """
    try:
        mod = importlib.import_module(module_name)
        for (fname, func) in inspect.getmembers(mod, inspect.isfunction):
            if fname == function_name:
                return func
    except Exception as e:
        # importing arbitrary modules can fail in arbitrary ways; keep the
        # original exception attached as the cause for debugging
        raise QueryException(str(e)) from e
    raise QueryException(f"Could not find function {function_name} in {module_name}")


# the group names must match the keyword arguments accepted by datetime.timedelta,
# since parse_timedelta_string passes the matched groups straight through as kwargs
timedelta_regex = re.compile(
    r"^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$"
)


# https://stackoverflow.com/a/51916936
def parse_timedelta_string(timedelta_str: str) -> timedelta:
    """
    This uses a syntax similar to the 'GNU sleep' command
    e.g.: 10d5h10m50s means '10 days, 5 hours, 10 minutes, 50 seconds'

    Raises a ValueError if the string doesn't match that syntax
    """
    parts = timedelta_regex.match(timedelta_str)
    if parts is None:
        raise ValueError(f"Could not parse time duration from {timedelta_str}.\nValid examples: '8h', '2d8h5m20s', '2m4s'")
    # units which weren't present in the string match as None, and are skipped
    time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)  # type: ignore[arg-type]


def _generate_order_by_func(
    obj_res: Res[T],
    key: Optional[str] = None,
    where_function: Optional[Where] = None,
    default: Optional[U] = None
) -> Optional[OrderFunc]:
    """
    Accepts an object Res[T] (Instance of some class or Exception)

    If it's an error, the generated function returns None (or the 'default', if given)

    Most of the time, you'd want to provide at least a 'key', a 'where_function' or a 'default'.
    You can provide both a 'where_function' and a default, or a 'key' and a default,
    in case the 'where_function' doesn't work for a particular type/you hit an error

    If a 'default' is provided, it is used for Exceptions and if an
    OrderFunc function could not be determined for this type

    If a key is given (the user specified which attribute), the generated
    function tries to find that key (dict key or attribute) on the object

    Otherwise, attempts to find an attribute which matches the 'where_function' on the object,
    using some getattr/dict checks. Returns a function which when called with
    this object returns the value to order by

    Returns None if no OrderFunc could be determined for this object
    """
    if isinstance(obj_res, Exception):
        if default is not None:
            return lambda _o: default
        else:
            low(f"""While creating order_by function, encountered exception {obj_res}
Value to order_by unknown, provide a 'default', filter exceptons with a 'where' predicate or
pass 'drop_errors' to ignore this""")
            return lambda _o: None

    # shouldn't raise an error, as we return above if it's an exception
    obj: T = unwrap(obj_res)

    if key is not None:

        # in these cases, if your key existed on the initial Res[E] (instance that was passed to
        # _generate_order_by_func and generates the OrderFunc)
        # to run, but doesn't on others, it will return None in those cases
        # If the interface to your ADT is not standard or very sparse, it's better
        # that you manually write an OrderFunc which
        # handles the edge cases, or provide a default
        # See tests for an example
        # TODO: write test
        if isinstance(obj, dict):
            if key in obj:  # acts as predicate instead of where_function
                return lambda o: o.get(key, default)  # type: ignore[union-attr]
        else:
            if hasattr(obj, key):
                return lambda o: getattr(o, key, default)  # type: ignore[arg-type]

    # Note: if the attribute you're ordering by is an Optional type,
    # and on some objects it'll return None, the getattr(o, field_name, default) won't
    # use the default, since it finds the attribute (it just happens to be set to None)
    # should this do something like: 'lambda o: getattr(o, k, default) or default'
    # that would fix the case, but is additional work. Perhaps the user should instead
    # write a 'where' function, to check for that 'isinstance' on an Optional field,
    # and not include those objects in the src iterable

    # user must provide either a key or a where predicate
    if where_function is not None:
        # each lambda below is returned immediately, so the loop variable it
        # closes over is never rebound afterwards
        if isinstance(obj, dict):
            for k, v in obj.items():
                if where_function(v):
                    return lambda o: o.get(k, default)  # type: ignore[union-attr]
        elif dataclasses.is_dataclass(obj):
            for field_name in obj.__annotations__:
                if where_function(getattr(obj, field_name)):
                    return lambda o: getattr(o, field_name, default)
        elif is_namedtuple(obj):
            assert hasattr(obj, '_fields'), "Could not find '_fields' on attribute which is assumed to be a NamedTuple"
            for field_name in getattr(obj, '_fields'):
                if where_function(getattr(obj, field_name)):
                    return lambda o: getattr(o, field_name, default)
        # try using inspect.getmembers (like 'dir()') even if the dataclass/NT checks failed,
        # since the attribute one is searching for might be a @property
        for k, v in inspect.getmembers(obj):
            if where_function(v):
                return lambda o: getattr(o, k, default)

    if default is not None:
        # warn here? it seems like you typically wouldn't want to just set the order by to
        # the same value everywhere, but maybe you did this on purpose?
        return lambda _o: default

    return None  # couldn't compute a OrderFunc for this class/instance


def _drop_errors(itr: Iterator[ET]) -> Iterator[T]:
    """Return non-errors from the iterable"""
    for o in itr:
        if isinstance(o, Exception):
            continue
        yield o


def _raise_errors(itr: Iterable[ET]) -> Iterator[T]:
    """Raise errors from the iterable, stops the select function"""
    for o in itr:
        if isinstance(o, Exception):
            raise o
        yield o


# currently using the 'key set' as a proxy for 'this is the same type of thing'
def _determine_order_by_value_key(obj_res: ET) -> Any:
    """
    Returns either the class, or a tuple of the dictionary keys
    """
    key = obj_res.__class__
    if key == dict:
        # assuming same keys signify same way to determine ordering
        return tuple(obj_res.keys())  # type: ignore[union-attr]
    return key


def select(
    src: Union[Locator, Iterable[ET], Callable[[], Iterable[ET]]],
    *,
    where: Optional[Where] = None,
    order_by: Optional[OrderFunc] = None,
    order_key: Optional[str] = None,
    order_value: Optional[Where] = None,
    default: Optional[U] = None,
    reverse: bool = False,
    limit: Optional[int] = None,
    drop_errors: bool = False,
    raise_errors: bool = False,
) -> Iterator[ET]:
    """
    A function to query, order, sort and filter items from one or more sources
    This supports iterables and lists of mixed types (including handling errors),
    by allowing you to provide custom predicates (functions) which can sort
    by a function, an attribute, dict key, or by the attributes values.

    Since this supports mixed types, there's always a possibility
    of KeyErrors or AttributeErrors while trying to find some value to order by,
    so this provides multiple mechanisms to deal with that

    'where' lets you filter items before ordering, to remove possible errors
    or filter the iterator by some condition

    There are multiple ways to instruct select on how to order items. The most
    flexible is to provide an 'order_by' function, which takes an item in the
    iterator, does any custom checks you may want and then returns the value to sort by

    'order_key' is best used on items which have a similar structure, or have
    the same attribute name for every item in the iterator. If you have a
    iterator of objects whose datetime is accessed by the 'timestamp' attribute,
    supplying order_key='timestamp' would sort by that (dictionary or attribute) key

    'order_value' is the most confusing, but often the most useful. Instead of
    testing against the keys of an item, this allows you to write a predicate
    (function) to test against its values (dictionary, NamedTuple, dataclass, object).
    If you had an iterator of mixed types and wanted to sort by the datetime,
    but the attribute to access the datetime is different on each type, you can
    provide `order_value=lambda v: isinstance(v, datetime)`, and this will
    try to find that value for each type in the iterator, to sort it by
    the value which is received when the predicate is true

    'order_value' is often used in the 'hpi query' interface, because of its brevity.
    Just given the input function, this can typically sort it by timestamp with
    no human intervention. It can sort of be thought as an educated guess,
    but it can always be improved by providing a more complete guess function

    Note that 'order_value' is also the most computationally expensive, as it has
    to copy the iterator in memory (using itertools.tee) to determine how to order it
    in memory

    The 'drop_errors' and 'raise_errors' let you ignore or raise when the src contain errors

    src: a locator (a tuple of two strings: module name, function name) to import
        a function from, an iterable of mixed types,
        or a function to be called, as the input to this function

    where: a predicate which filters the results before sorting

    order_by: a function which when given an item in the src,
        returns the value to sort by. Similar to the 'key' value
        typically passed directly to 'sorted'

    order_key: a string which represents a dict key or attribute name
        to use as the key to sort by

    order_value: predicate which determines which attribute on an ADT-like item to sort by,
        when given its value. lambda o: isinstance(o, datetime) is commonly passed to sort
        by datetime, without knowing the attributes or interface for the items in the src

    default: while ordering, if the order for an object cannot be determined,
        use this as the default value

    reverse: reverse the order of the resulting iterable

    limit: limit the results to this many items

    drop_errors: ignore any errors from the src

    raise_errors: raise errors when received from the input src

    Raises a QueryException if the src can't be located/iterated over, or if
    no way to order the items could be determined
    """

    it: Iterable[ET] = []  # default
    # check if this is a locator: exactly two strings. A 2-tuple of anything
    # else is data to be queried, not a (module name, function name) pair
    if type(src) is tuple and len(src) == 2 and all(isinstance(part, str) for part in src):  # type: ignore[arg-type]
        it = locate_function(src[0], src[1])()  # type: ignore[index]
    elif callable(src):
        # hopefully this returns an iterable and not something that causes a bunch of lag when it's called?
        # should typically not be the common case, but giving the option to
        # provide a function as input anyways
        it = src()
    else:
        # assume it is already an iterable
        if not isinstance(src, Iterable):
            low(f"""Input was neither a locator for a function, or a function itself.
Expected 'src' to be an Iterable, but found {type(src).__name__}...
Will attempt to call iter() on the value""")
        it = src  # type: ignore[assignment]

    # try/catch an explicit iter() call to making this an Iterator,
    # to validate the input as something other helpers here can work with,
    # else raise a QueryException
    try:
        itr: Iterator[ET] = iter(it)
    except TypeError as t:
        raise QueryException("Could not convert input src to an Iterator: " + str(t)) from t

    # if both drop_errors and raise_errors are provided for some reason,
    # should raise errors before dropping them
    if raise_errors:
        itr = _raise_errors(itr)

    if drop_errors:
        itr = _drop_errors(itr)

    if where is not None:
        itr = filter(where, itr)

    if order_by is not None or order_key is not None or order_value is not None:
        # we have some sort of input that specifies we should reorder the iterator

        order_by_chosen: Optional[OrderFunc] = order_by  # if the user just supplied a function themselves
        if order_by_chosen is None:
            # try to use a key, if it was supplied
            # order_key doesn't use local state - it just tries to find the passed
            # attribute, or default to the 'default' value. As mentioned above,
            # best used for items with a similar structure
            if order_key is not None:
                # peek at the first item without consuming it; spy returns a
                # (possibly empty) list of the peeked items and a fresh iterator
                # https://more-itertools.readthedocs.io/en/stable/api.html#more_itertools.spy
                head, itr = more_itertools.spy(itr)
                if not head:
                    # nothing to order (empty src, or everything was filtered out)
                    return iter(())
                first_item = head[0]
                order_by_chosen = _generate_order_by_func(first_item, key=order_key, default=default)
                if order_by_chosen is None:
                    raise QueryException(f"Error while ordering: could not find {order_key} on {first_item}")
            elif order_value is not None:
                itr1, itr2 = itertools.tee(itr, 2)  # expensive!!!
                # TODO: add a kwarg to force lookup for every item? would sort of be like core.common.guess_datetime then
                order_by_lookup: Dict[Any, OrderFunc] = {}

                # need to go through a copy of the whole iterator here to
                # pre-generate functions to support sorting mixed types
                for obj_res in itr1:
                    key: Any = _determine_order_by_value_key(obj_res)
                    if key not in order_by_lookup:
                        keyfunc: Optional[OrderFunc] = _generate_order_by_func(obj_res, where_function=order_value, default=default)
                        if keyfunc is None:
                            raise QueryException(f"Error while ordering: could not determine how to order {obj_res}")
                        order_by_lookup[key] = keyfunc

                # set the 'itr' (iterator in higher scope)
                # to the copy (itertools.tee) of the iterator we haven't used yet
                itr = itr2

                # todo: cache results from above _determine_order_by_value_key call and use here somehow?
                # would require additional state
                # order_by_lookup[_determine_order_by_value_key(o)] returns a function which
                # accepts o, and returns the value which sorted can use to order this by
                order_by_chosen = lambda o: order_by_lookup[_determine_order_by_value_key(o)](o)

        # run the sort, with the computed order by function
        itr = iter(sorted(itr, key=order_by_chosen, reverse=reverse))  # type: ignore[arg-type]
    else:
        # if not already done in the order_by block, reverse if specified
        if reverse:
            itr = more_itertools.always_reversible(itr)

    # apply limit argument
    if limit is not None:
        return itertools.islice(itr, limit)

    return itr


def test_parse_timedelta_string():

    import pytest

    with pytest.raises(ValueError) as v:
        parse_timedelta_string("5xxx")

    assert v is not None
    assert str(v.value).startswith("Could not parse time duration from")

    res = parse_timedelta_string("10d5h10m50s")
    assert res == timedelta(days=10.0, hours=5.0, minutes=10.0, seconds=50.0)