my.core.query: test styling fixes

This commit is contained in:
Sean Breckenridge 2021-03-27 23:22:38 -07:00
parent 9730951185
commit 36db0f43d8

View file

@ -15,7 +15,6 @@ from typing import TypeVar, Tuple, Optional, Union, Callable, Iterable, Iterator
import more_itertools import more_itertools
from .warnings import low
from .common import is_namedtuple from .common import is_namedtuple
from .error import Res, unwrap from .error import Res, unwrap
from .warnings import low from .warnings import low
@ -207,7 +206,6 @@ def _drop_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Iterator[ET]:
yield o yield o
# hmm... is there really not a stdlib/more_itertools function to split an iterable by a predicate?
# similar to 'my.core.error.sort_res_by'? # similar to 'my.core.error.sort_res_by'?
def _wrap_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Tuple[Iterator[Unsortable], Iterator[ET]]: def _wrap_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Tuple[Iterator[Unsortable], Iterator[ET]]:
unsortable: List[Unsortable] = [] unsortable: List[Unsortable] = []
@ -393,7 +391,7 @@ Your 'src' may have been empty of the 'where' clause filtered the iterable to no
if order_by_chosen is None: if order_by_chosen is None:
raise QueryException(f"Error while ordering: could not find {order_key} on {first_item}") raise QueryException(f"Error while ordering: could not find {order_key} on {first_item}")
elif order_value is not None: elif order_value is not None:
itr1, itr2 = itertools.tee(itr, 2) # expensive!!! itr1, itr2 = itertools.tee(itr, 2)
# TODO: add a kwarg to force lookup for every item? would sort of be like core.common.guess_datetime then # TODO: add a kwarg to force lookup for every item? would sort of be like core.common.guess_datetime then
order_by_lookup: Dict[Any, OrderFunc] = {} order_by_lookup: Dict[Any, OrderFunc] = {}
@ -463,12 +461,9 @@ def test_parse_timedelta_string():
import pytest import pytest
with pytest.raises(ValueError) as v: with pytest.raises(ValueError, match=r"Could not parse time duration from"):
parse_timedelta_string("5xxx") parse_timedelta_string("5xxx")
assert v is not None
assert str(v.value).startswith("Could not parse time duration from")
res = parse_timedelta_string("1w5d5h10m50s") res = parse_timedelta_string("1w5d5h10m50s")
assert res == timedelta(days=7.0 + 5.0, hours=5.0, minutes=10.0, seconds=50.0) assert res == timedelta(days=7.0 + 5.0, hours=5.0, minutes=10.0, seconds=50.0)
@ -498,20 +493,13 @@ def test_basic_orders() -> None:
return obj.x != 2 return obj.x != 2
res = list(select(basic_iter(), where=filter_two, reverse=True)) res = list(select(basic_iter(), where=filter_two, reverse=True))
assert len(res) == 4 assert res == [_Int(5), _Int(4), _Int(3), _Int(1)]
for (actual, expected) in zip(res, (5, 4, 3, 1)):
assert actual == _Int(expected)
input_items = list(basic_iter()) input_items = list(basic_iter())
random.shuffle(input_items) random.shuffle(input_items)
res = list(select(input_items, order_key="x")) res = list(select(input_items, order_key="x"))
assert len(res) == 5 assert res == [_Int(1),_Int(2),_Int(3),_Int(4),_Int(5)]
for (actual, expected) in zip(res, (1, 2, 3, 4, 5)):
assert actual.x == expected # type: ignore
# default int ordering # default int ordering
def custom_order_by(obj: Any) -> Any: def custom_order_by(obj: Any) -> Any:
@ -519,10 +507,7 @@ def test_basic_orders() -> None:
# sort random ordered list, only return first two items # sort random ordered list, only return first two items
res = list(select(input_items, where=filter_two, order_by=custom_order_by, limit=2)) res = list(select(input_items, where=filter_two, order_by=custom_order_by, limit=2))
assert len(res) == 2 assert res == [_Int(1), _Int(3)]
for (actual, expected) in zip(res, (1, 3)):
assert actual == _Int(expected)
# filter produces empty iterator # filter produces empty iterator
with pytest.warns(UserWarning, match=r"encountered empty iterable"): with pytest.warns(UserWarning, match=r"encountered empty iterable"):
@ -543,10 +528,13 @@ def test_order_key_multi_type() -> None:
res = list(select(itertools.chain(basic_iter(), floaty_iter()), order_key="x")) res = list(select(itertools.chain(basic_iter(), floaty_iter()), order_key="x"))
assert len(res) == 10 assert res == [
_Int(1), _Float(1.5),
for (actual, expected) in zip(res, (1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5)): _Int(2), _Float(2.5),
assert actual.x == expected # type: ignore _Int(3), _Float(3.5),
_Int(4), _Float(4.5),
_Int(5), _Float(5.5),
]
def test_couldnt_determine_order() -> None: def test_couldnt_determine_order() -> None:
@ -581,8 +569,7 @@ def _mixed_iter() -> Iterator[Union[_A, _B]]:
def _mixed_iter_errors() -> Iterator[Res[Union[_A, _B]]]: def _mixed_iter_errors() -> Iterator[Res[Union[_A, _B]]]:
m = _mixed_iter() m = _mixed_iter()
for _ in range(3): yield from itertools.islice(m, 0, 3)
yield next(m)
yield RuntimeError("Unhandled error!") yield RuntimeError("Unhandled error!")
yield from m yield from m
@ -593,12 +580,14 @@ def test_order_value() -> None:
# if the value for some attribute on this item is a datetime # if the value for some attribute on this item is a datetime
sorted_by_datetime = list(select(_mixed_iter(), order_value=lambda o: isinstance(o, datetime))) sorted_by_datetime = list(select(_mixed_iter(), order_value=lambda o: isinstance(o, datetime)))
assert len(sorted_by_datetime) == 6 assert sorted_by_datetime == [
_B(y=datetime(year=1995, month=5, day=10, hour=4, minute=10, second=1)),
mixed_iter_sorted_indexes = (4,5,2,3,0,1) _A(x=datetime(year=2005, month=4, day=10, hour=4, minute=10, second=1), y=2, z=-5),
_A(x=datetime(year=2005, month=5, day=10, hour=4, minute=10, second=1), y=10, z=2),
for val, expected_index in zip(sorted_by_datetime, mixed_iter_sorted_indexes): _A(x=datetime(year=2009, month=3, day=10, hour=4, minute=10, second=1), y=12, z=1),
assert val == default_order[expected_index] _A(x=datetime(year=2009, month=5, day=10, hour=4, minute=10, second=1), y=5, z=10),
_B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1)),
]
def test_key_clash() -> None: def test_key_clash() -> None:
@ -667,7 +656,7 @@ def test_wrap_unsortable_with_error_and_warning() -> None:
res = list(select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime))) res = list(select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime)))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "_B": 2, "Unsortable": 1}) assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "_B": 2, "Unsortable": 1})
# compare the returned error wrapped in the Unsortable # compare the returned error wrapped in the Unsortable
returned_error = next(filter(lambda o: isinstance(o, Unsortable), res)).obj # type: ignore[union-attr] returned_error = next((o for o in res if isinstance(o, Unsortable))).obj
assert "Unhandled error!" == str(returned_error) assert "Unhandled error!" == str(returned_error)