my.core.query handle unsortable types, add tests

This commit is contained in:
Sean Breckenridge 2021-03-27 15:45:56 -07:00
parent 33b7ca0aac
commit d6f32ce787

View file

@ -11,7 +11,7 @@ import importlib
import inspect
import itertools
from datetime import datetime, date, timedelta
from typing import TypeVar, Tuple, Optional, Union, Callable, Iterable, Iterator, Dict, Any
from typing import TypeVar, Tuple, Optional, Union, Callable, Iterable, Iterator, Dict, Any, NamedTuple, List
import more_itertools
@ -38,6 +38,12 @@ Where = Callable[[ET], bool]
DateLike = Union[datetime, date]
# the generated OrderFunc couldn't handle sorting this
class Unsortable(NamedTuple):
obj: Any
class QueryException(KeyError):
"""Used to differentiate query-related errors, so the CLI interface is more expressive"""
pass
@ -60,23 +66,6 @@ def locate_function(module_name: str, function_name: str) -> Callable[[], Iterab
raise QueryException(f"Could not find function {function_name} in {module_name}")
timedelta_regex = re.compile(r"^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$")
# https://stackoverflow.com/a/51916936
def parse_timedelta_string(timedelta_str: str) -> timedelta:
"""
This uses a syntax similar to the 'GNU sleep' command
e.g.: 10d5h10m50s means '10 days, 5 hours, 10 minutes, 50 seconds'
"""
parts = timedelta_regex.match(timedelta_str)
if parts is None:
raise ValueError(f"Could not parse time duration from {timedelta_str}.\nValid examples: '8h', '2d8h5m20s', '2m4s'")
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return timedelta(**time_params) # type: ignore[arg-type]
def _generate_order_by_func(
obj_res: Res[T],
key: Optional[str] = None,
@ -90,7 +79,7 @@ def _generate_order_by_func(
Most of the time, you'd want to provide at least a 'key', a 'where_function' or a 'default'.
You can provide both a 'where_function' and a default, or a 'key' and a default,
incase the 'where_function' doesn't work for a particular type/you hit an error
in case the 'where_function' doesn't work for a particular type/you hit an error
If a 'default' is provided, it is used for Exceptions and if an
OrderFunc function could not be determined for this type
@ -107,9 +96,11 @@ def _generate_order_by_func(
if default is not None:
return lambda _o: default
else:
low(f"""While creating order_by function, encountered exception {obj_res}
Value to order_by unknown, provide a 'default', filter exceptons with a 'where' predicate or
pass 'drop_errors' to ignore this""")
# perhaps this should be removed? as errors are now silently wrapped into Unsortable
# then again, its not strange that a src returning an error should warn, just don't cause a fatal error
low(f"""While creating order_by function, encountered exception '{type(obj_res)}: {obj_res}'
Value to order_by unknown, provide a 'default', filter exceptions with a 'where' predicate or
pass 'drop_exceptions' to ignore exceptions""")
return lambda _o: None
# shouldn't raise an error, as we return above if its an exception
@ -155,7 +146,7 @@ pass 'drop_errors' to ignore this""")
for field_name in getattr(obj, '_fields'):
if where_function(getattr(obj, field_name)):
return lambda o: getattr(o, field_name, default)
# try using inpsect.getmembers (like 'dir()') even if the dataclass/NT checks failed,
# try using inspect.getmembers (like 'dir()') even if the dataclass/NT checks failed,
# since the attribute one is searching for might be a @property
for k, v in inspect.getmembers(obj):
if where_function(v):
@ -169,14 +160,15 @@ pass 'drop_errors' to ignore this""")
return None # couldn't compute a OrderFunc for this class/instance
def _drop_errors(itr: Iterator[ET]) -> Iterator[T]:
def _drop_exceptions(itr: Iterator[ET]) -> Iterator[T]:
"""Return non-errors from the iterable"""
for o in itr:
if isinstance(o, Exception):
continue
yield o
def _raise_errors(itr: Iterable[ET]) -> Iterator[T]:
def _raise_exceptions(itr: Iterable[ET]) -> Iterator[T]:
"""Raise errors from the iterable, stops the select function"""
for o in itr:
if isinstance(o, Exception):
@ -187,7 +179,7 @@ def _raise_errors(itr: Iterable[ET]) -> Iterator[T]:
# currently using the 'key set' as a proxy for 'this is the same type of thing'
def _determine_order_by_value_key(obj_res: ET) -> Any:
"""
Returns either the class, or the a tuple of the dictionary keys
Returns either the class, or a tuple of the dictionary keys
"""
key = obj_res.__class__
if key == dict:
@ -196,6 +188,52 @@ def _determine_order_by_value_key(obj_res: ET) -> Any:
return key
def _drop_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Iterator[ET]:
for o in itr:
if isinstance(o, Unsortable):
continue
ordval = orderfunc(o)
if ordval is None:
continue
yield o
# hmm... is there really not a stdlib/more_itertools function to split an iterable by a predicate?
# similar to 'my.core.error.sort_res_by'?
def _wrap_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Tuple[Iterator[Unsortable], Iterator[ET]]:
unsortable: List[Unsortable] = []
sortable: List[ET] = []
for o in itr:
# if input to select was another select
if isinstance(o, Unsortable):
unsortable.append(o)
continue
ordval = orderfunc(o)
if ordval is None:
unsortable.append(Unsortable(o))
else:
sortable.append(o)
return iter(unsortable), iter(sortable)
# return two iterators, the first being the wrapped unsortable items,
# the second being items for which orderfunc returned a non-none value
def _handle_unsorted(
itr: Iterator[ET],
orderfunc: OrderFunc,
drop_unsorted: bool,
wrap_unsorted: bool
) -> Tuple[Iterator[Unsortable], Iterator[ET]]:
# prefer drop_unsorted to wrap_unsorted, if both were present
if drop_unsorted:
return iter([]), _drop_unsorted(itr, orderfunc)
elif wrap_unsorted:
return _wrap_unsorted(itr, orderfunc)
else:
# neither flag was present
return iter([]), itr
def select(
src: Union[Locator, Iterable[ET], Callable[[], Iterable[ET]]],
*,
@ -206,8 +244,10 @@ def select(
default: Optional[U] = None,
reverse: bool = False,
limit: Optional[int] = None,
drop_errors: bool = False,
raise_errors: bool = False,
drop_unsorted: bool = False,
wrap_unsorted: bool = True,
drop_exceptions: bool = False,
raise_exceptions: bool = False,
) -> Iterator[ET]:
"""
A function to query, order, sort and filter items from one or more sources
@ -238,7 +278,7 @@ def select(
but the attribute to access the datetime is different on each type, you can
provide `order_value=lambda v: isinstance(v, datetime)`, and this will
try to find that value for each type in the iterator, to sort it by
the value which is recieved when the predicate is true
the value which is received when the predicate is true
'order_value' is often used in the 'hpi query' interface, because of its brevity.
Just given the input function, this can typically sort it by timestamp with
@ -249,7 +289,7 @@ def select(
to copy the iterator in memory (using itertools.tee) to determine how to order it
in memory
The 'drop_errors' and 'raise_errors' let you ignore or raise when the src contain errors
The 'drop_exceptions' and 'raise_exceptions' let you ignore or raise when the src contains exceptions
src: a locator to import a function from, an iterable of mixed types,
or a function to be called, as the input to this function
@ -258,7 +298,7 @@ def select(
order_by: a function which when given an item in the src,
returns the value to sort by. Similar to the 'key' value
tpically passed directly to 'sorted'
typically passed directly to 'sorted'
order_key: a string which represents a dict key or attribute name
to use as they key to sort by
@ -274,9 +314,15 @@ def select(
limit: limit the results to this many items
drop_errors: ignore any errors from the src
drop_unsorted: before ordering, drop any items from the iterable for which a
order could not be determined. False by default
raise_errors: raise errors when recieved from the input src
wrap_unsorted: before ordering, wrap any items into an 'Unsortable' object. Place
them at the front of the list. True by default
drop_exceptions: ignore any exceptions from the src
raise_exceptions: raise exceptions when received from the input src
"""
it: Iterable[ET] = [] # default
@ -304,13 +350,13 @@ Will attempt to call iter() on the value""")
except TypeError as t:
raise QueryException("Could not convert input src to an Iterator: " + str(t))
# if both drop_errors and raise_errors are provided for some reason,
# should raise errors before dropping them
if raise_errors:
itr = _raise_errors(itr)
# if both drop_exceptions and drop_exceptions are provided for some reason,
# should raise exceptions before dropping them
if raise_exceptions:
itr = _raise_exceptions(itr)
if drop_errors:
itr = _drop_errors(itr)
if drop_exceptions:
itr = _drop_exceptions(itr)
if where is not None:
itr = filter(where, itr)
@ -332,6 +378,7 @@ Your 'src' may have been empty of the 'where' clause filtered the iterable to no
# order_key doesn't use local state - it just tries to find the passed
# attribute, or default to the 'default' value. As mentioned above,
# best used for items with a similar structure
# note: this could fail if the first item doesn't have a matching attr/key?
if order_key is not None:
order_by_chosen = _generate_order_by_func(first_item, key=order_key, default=default)
if order_by_chosen is None:
@ -361,8 +408,20 @@ Your 'src' may have been empty of the 'where' clause filtered the iterable to no
# accepts o, and returns the value which sorted can use to order this by
order_by_chosen = lambda o: order_by_lookup[_determine_order_by_value_key(o)](o)
assert order_by_chosen is not None
# note: can't just attach sort unsortable values in the same iterable as the
# other items because they don't have any lookups for order_key or functions
# to handle items in the order_by_lookup dictionary
unsortable, itr = _handle_unsorted(itr, order_by_chosen, drop_unsorted, wrap_unsorted)
# run the sort, with the computed order by function
itr = iter(sorted(itr, key=order_by_chosen, reverse=reverse)) # type: ignore[arg-type, type-var]
# re-attach unsortable values to the front/back of the list
if reverse:
itr = itertools.chain(itr, unsortable)
else:
itr = itertools.chain(unsortable, itr)
else:
# if not already done in the order_by block, reverse if specified
if reverse:
@ -375,6 +434,21 @@ Your 'src' may have been empty of the 'where' clause filtered the iterable to no
return itr
timedelta_regex = re.compile(r"^((?P<weeks>[\.\d]+?)w)?((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$")
# https://stackoverflow.com/a/51916936
def parse_timedelta_string(timedelta_str: str) -> timedelta:
"""
This uses a syntax similar to the 'GNU sleep' command
e.g.: 1w5d5h10m50s means '1 week, 5 days, 5 hours, 10 minutes, 50 seconds'
"""
parts = timedelta_regex.match(timedelta_str)
if parts is None:
raise ValueError(f"Could not parse time duration from {timedelta_str}.\nValid examples: '8h', '1w2d8h5m20s', '2m4s'")
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return timedelta(**time_params) # type: ignore[arg-type]
def test_parse_timedelta_string():
@ -386,5 +460,249 @@ def test_parse_timedelta_string():
assert v is not None
assert str(v.value).startswith("Could not parse time duration from")
res = parse_timedelta_string("10d5h10m50s")
assert res == timedelta(days=10.0, hours=5.0, minutes=10.0, seconds=50.0)
res = parse_timedelta_string("1w5d5h10m50s")
assert res == timedelta(days=7.0 + 5.0, hours=5.0, minutes=10.0, seconds=50.0)
# classes to use in tests, need to be defined at the top level
# because of a mypy bug
class _Int(NamedTuple):
x: int
# to test order_key with compatible orderable (int, float) types
class _Float(NamedTuple):
x: float
def test_basic_orders() -> None:
import random
import pytest
def basic_iter() -> Iterator[_Int]:
for v in range(1, 6):
yield _Int(v)
def filter_two(obj: Any) -> bool:
return obj.x != 2
res = list(select(basic_iter(), where=filter_two, reverse=True))
assert len(res) == 4
for (actual, expected) in zip(res, (5, 4, 3, 1)):
assert actual == _Int(expected)
input_items = list(basic_iter())
random.shuffle(input_items)
res = list(select(input_items, order_key="x"))
assert len(res) == 5
for (actual, expected) in zip(res, (1, 2, 3, 4, 5)):
assert actual.x == expected # type: ignore
# default int ordering
def custom_order_by(obj: Any) -> Any:
return getattr(obj, "x")
# sort random ordered list, only return first two items
res = list(select(input_items, where=filter_two, order_by=custom_order_by, limit=2))
assert len(res) == 2
for (actual, expected) in zip(res, (1, 3)):
assert actual == _Int(expected)
# filter produces empty iterator
with pytest.warns(UserWarning, match=r"encountered empty iterable"):
res = list(select(input_items, where=lambda o: o is None, order_key="x"))
assert len(res) == 0
def test_order_key_multi_type() -> None:
def basic_iter() -> Iterator[_Int]:
for v in range(1, 6):
yield _Int(v)
def floaty_iter() -> Iterator[_Float]:
for v in range(1, 6):
yield _Float(float(v + 0.5))
res = list(select(itertools.chain(basic_iter(), floaty_iter()), order_key="x"))
assert len(res) == 10
for (actual, expected) in zip(res, (1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5)):
assert actual.x == expected # type: ignore
def test_couldnt_determine_order() -> None:
import pytest
with pytest.raises(QueryException, match=r"could not determine how to order"):
res = list(select(iter([object()]), order_value=lambda o: isinstance(o, datetime)))
# same value type, different keys, with clashing keys
class _A(NamedTuple):
x: datetime
y: int
z: int
class _B(NamedTuple):
y: datetime
# move these to tests/? They are re-used so much in the tests below,
# not sure where the best place for these is
def _mixed_iter() -> Iterator[Union[_A, _B]]:
yield _A(x=datetime(year=2009, month=5, day=10, hour=4, minute=10, second=1), y=5, z=10)
yield _B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1))
yield _A(x=datetime(year=2005, month=5, day=10, hour=4, minute=10, second=1), y=10, z=2)
yield _A(x=datetime(year=2009, month=3, day=10, hour=4, minute=10, second=1), y=12, z=1)
yield _B(y=datetime(year=1995, month=5, day=10, hour=4, minute=10, second=1))
yield _A(x=datetime(year=2005, month=4, day=10, hour=4, minute=10, second=1), y=2, z=-5)
def _mixed_iter_errors() -> Iterator[Res[Union[_A, _B]]]:
m = _mixed_iter()
for _ in range(3):
yield next(m)
yield RuntimeError("Unhandled error!")
yield from m
def test_order_value() -> None:
default_order = list(_mixed_iter())
# if the value for some attribute on this item is a datetime
sorted_by_datetime = list(select(_mixed_iter(), order_value=lambda o: isinstance(o, datetime)))
assert len(sorted_by_datetime) == 6
mixed_iter_sorted_indexes = (4,5,2,3,0,1)
for val, expected_index in zip(sorted_by_datetime, mixed_iter_sorted_indexes):
assert val == default_order[expected_index]
def test_key_clash() -> None:
import pytest
# clashing keys causes errors if you use order_key
with pytest.raises(TypeError, match=r"not supported between instances of 'datetime.datetime' and 'int'") as te:
list(select(_mixed_iter(), order_key="y"))
def test_wrap_unsortable() -> None:
from collections import Counter
# by default, wrap unsortable
res = list(select(_mixed_iter(), order_key="z"))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "Unsortable": 2})
def test_disabled_wrap_unsorted() -> None:
import pytest
# if disabled manually, should raise error
with pytest.raises(TypeError, match=r"not supported between instances of 'NoneType' and 'int'") as te2:
list(select(_mixed_iter(), order_key="z", wrap_unsorted=False))
def test_drop_unsorted() -> None:
from collections import Counter
# test drop unsortable, should remove them before the 'sorted' call
res = list(select(_mixed_iter(), order_key="z", wrap_unsorted=False, drop_unsorted=True))
assert len(res) == 4
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4})
def test_drop_exceptions() -> None:
assert more_itertools.ilen(_mixed_iter_errors()) == 7
# drop exceptions
res = list(select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime), drop_exceptions=True))
assert len(res) == 6
def test_raise_exceptions() -> None:
import pytest
# raise exceptions
with pytest.raises(RuntimeError) as r:
select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime), raise_exceptions=True)
assert str(r.value) == "Unhandled error!"
def test_wrap_unsortable_with_error_and_warning() -> None:
import pytest
from collections import Counter
# by default should wrap unsortable (error)
with pytest.warns(UserWarning, match=r"encountered exception") as w:
res = list(select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime)))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "_B": 2, "Unsortable": 1})
# compare the returned error wrapped in the Unsortable
returned_error = next(filter(lambda o: isinstance(o, Unsortable), res)).obj # type: ignore[union-attr]
assert "Unhandled error!" == str(returned_error)
def test_order_key_unsortable() -> None:
import pytest
from collections import Counter
# both unsortable and items which dont match the order_by (order_key) in this case should be classified unsorted
res = list(select(_mixed_iter_errors(), order_key="z"))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "Unsortable": 3})
def test_order_default_param() -> None:
# test default, shift items without a datetime to the end using reverse
epoch_time = datetime.fromtimestamp(0)
res = list(select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime), default=epoch_time, reverse=True))
assert len(res) == 7
# should be at the end, because we specified reverse=True
assert str(res[-1]) == "Unhandled error!"
def test_no_recursive_unsortables() -> None:
from collections import Counter
# select to select as input, wrapping unsortables the first time, second should drop them
# reverse=True to send errors to the end, so the below order_key works
res = list(select(_mixed_iter_errors(), order_key="z", reverse=True))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "Unsortable": 3})
# drop_unsorted
dropped = list(select(res, order_key="z", drop_unsorted=True))
for o in dropped:
assert isinstance(o, _A)
assert len(dropped) == 4
# wrap_unsorted -- shouldn't recursively wrap Unsortable
# wrap_unsorted is True by default
wrapped = list(select(res, order_key="z"))
assert len(wrapped) == 7
# make sure other types (exceptions/_B) aren't wrapped twice
for x in wrapped:
if isinstance(x, Unsortable):
assert not isinstance(x.obj, Unsortable)