diff --git a/my/core/query.py b/my/core/query.py index 84998c7..4ab7aec 100644 --- a/my/core/query.py +++ b/my/core/query.py @@ -39,7 +39,7 @@ class Unsortable(NamedTuple): obj: Any -class QueryException(KeyError): +class QueryException(ValueError): """Used to differentiate query-related errors, so the CLI interface is more expressive""" pass @@ -61,6 +61,50 @@ def locate_function(module_name: str, function_name: str) -> Callable[[], Iterab raise QueryException(f"Could not find function {function_name} in {module_name}") +def attribute_func(obj: T, where: Where, default: Optional[U] = None) -> Optional[OrderFunc]: + """ + Attempts to find an attribute which matches the 'where_function' on the object, + using some getattr/dict checks. Returns a function which when called with + this object returns the value which the 'where' matched against + + As an example: + + from typing import NamedTuple + from datetime import datetime + from my.core.query import attribute_func + + class A(NamedTuple): + x: int + y: datetime + + val = A(x=4, y=datetime.now()) + val.y + > datetime.datetime(2021, 4, 5, 10, 52, 14, 395195) + orderfunc = attribute_func(val, where=lambda o: isinstance(o, datetime)) + orderfunc(val) + > datetime.datetime(2021, 4, 5, 10, 52, 14, 395195) + """ + if isinstance(obj, dict): + for k, v in obj.items(): + if where(v): + return lambda o: o.get(k, default) # type: ignore[union-attr] + elif dataclasses.is_dataclass(obj): + for (field_name, _annotation) in obj.__annotations__.items(): + if where(getattr(obj, field_name)): + return lambda o: getattr(o, field_name, default) + elif is_namedtuple(obj): + assert hasattr(obj, '_fields'), "Could not find '_fields' on attribute which is assumed to be a NamedTuple" + for field_name in getattr(obj, '_fields'): + if where(getattr(obj, field_name)): + return lambda o: getattr(o, field_name, default) + # try using inspect.getmembers (like 'dir()') even if the dataclass/NT checks failed, + # since the attribute one is searching for might be a @property + for k, v in inspect.getmembers(obj): + if where(v): + return lambda o: getattr(o, k, default) + return None + + def _generate_order_by_func( obj_res: Res[T], key: Optional[str] = None, @@ -115,7 +159,6 @@ pass 'drop_exceptions' to ignore exceptions""") # that you manually write an OrderFunc which # handles the edge cases, or provide a default # See tests for an example - # TODO: write test if isinstance(obj, dict): if key in obj: # acts as predicate instead of where_function return lambda o: o.get(key, default) # type: ignore[union-attr] @@ -126,31 +169,16 @@ pass 'drop_exceptions' to ignore exceptions""") # Note: if the attribute you're ordering by is an Optional type, # and on some objects it'll return None, the getattr(o, field_name, default) won't # use the default, since it finds the attribute (it just happens to be set to None) - # should this do something like: 'lambda o: getattr(o, k, default) or default' + # perhaps this should do something like: 'lambda o: getattr(o, k, default) or default' # that would fix the case, but is additional work. Perhaps the user should instead # write a 'where' function, to check for that 'isinstance' on an Optional field, - # and not include those objects in the src iterable + # and not include those objects in the src iterable... becomes a bit messy with multiple sources # user must provide either a key or a where predicate if where_function is not None: - if isinstance(obj, dict): - for k, v in obj.items(): - if where_function(v): - return lambda o: o.get(k, default) # type: ignore[union-attr] - elif dataclasses.is_dataclass(obj): - for (field_name, _annotation) in obj.__annotations__.items(): - if where_function(getattr(obj, field_name)): - return lambda o: getattr(o, field_name, default) - elif is_namedtuple(obj): - assert hasattr(obj, '_fields'), "Could not find '_fields' on attribute which is assumed to be a NamedTuple" - for field_name in getattr(obj, '_fields'): - if where_function(getattr(obj, field_name)): - return lambda o: getattr(o, field_name, default) - # try using inspect.getmembers (like 'dir()') even if the dataclass/NT checks failed, - # since the attribute one is searching for might be a @property - for k, v in inspect.getmembers(obj): - if where_function(v): - return lambda o: getattr(o, k, default) + func: Optional[OrderFunc] = attribute_func(obj, where_function, default) + if func is not None: + return func if default is not None: # warn here? it seems like you typically wouldn't want to just set the order by to @@ -202,6 +230,18 @@ def _drop_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Iterator[ET]: yield o +# try getting the first value from the iterator +# similar to my.core.common.warn_if_empty? this doesnt go through the whole iterator though +def _peek_iter(itr: Iterator[ET]) -> Tuple[Optional[ET], Iterator[ET]]: + itr = more_itertools.peekable(itr) + try: + first_item = itr.peek() + except StopIteration: + return None, itr + else: + return first_item, itr + + # similar to 'my.core.error.sort_res_by'? def _wrap_unsorted(itr: Iterator[ET], orderfunc: OrderFunc) -> Tuple[Iterator[Unsortable], Iterator[ET]]: unsortable: List[Unsortable] = [] @@ -237,6 +277,69 @@ def _handle_unsorted( return iter([]), itr +# handles creating an order_value functon, using a lookup for +# different types. ***This consumes the iterator***, so +# you should definitely itertoolts.tee it beforehand +# as to not exhaust the values +def _generate_order_value_func(itr: Iterator[ET], order_value: Where, default: Optional[U] = None) -> OrderFunc: + # TODO: add a kwarg to force lookup for every item? would sort of be like core.common.guess_datetime then + order_by_lookup: Dict[Any, OrderFunc] = {} + + # need to go through a copy of the whole iterator here to + # pre-generate functions to support sorting mixed types + for obj_res in itr: + key: Any = _determine_order_by_value_key(obj_res) + if key not in order_by_lookup: + keyfunc: Optional[OrderFunc] = _generate_order_by_func( + obj_res, + where_function=order_value, + default=default, + force_unsortable=True) + # should never be none, as we have force_unsortable=True + assert keyfunc is not None + order_by_lookup[key] = keyfunc + + # todo: cache results from above _determine_order_by_value_key call and use here somehow? + # would require additional state + # order_by_lookup[_determine_order_by_value_key(o)] returns a function which + # accepts o, and returns the value which sorted can use to order this by + return lambda o: order_by_lookup[_determine_order_by_value_key(o)](o) + + +# handles the arguments from the user, creating a order_value function +# at least one of order_by, order_key or order_value must have a value +def _handle_generate_order_by( + itr, + *, + order_by: Optional[OrderFunc] = None, + order_key: Optional[str] = None, + order_value: Optional[Where] = None, + default: Optional[U] = None, +) -> Tuple[Optional[OrderFunc], Iterator[ET]]: + order_by_chosen: Optional[OrderFunc] = order_by # if the user just supplied a function themselves + if order_by is not None: + return order_by, itr + if order_key is not None: + first_item, itr = _peek_iter(itr) + if first_item is None: + # signify the iterator was empty, return immediately from parent + return None, itr + # try to use a key, if it was supplied + # order_key doesn't use local state - it just tries to find the passed + # attribute, or default to the 'default' value. As mentioned above, + # best used for items with a similar structure + # note: this could fail if the first item doesn't have a matching attr/key? + order_by_chosen = _generate_order_by_func(first_item, key=order_key, default=default) + if order_by_chosen is None: + raise QueryException(f"Error while ordering: could not find {order_key} on {first_item}") + return order_by_chosen, itr + if order_value is not None: + itr, itr2 = itertools.tee(itr, 2) + order_by_chosen = _generate_order_value_func(itr2, order_value, default) + return order_by_chosen, itr + raise QueryException("Could not determine a way to order src iterable - at least one of the order args must be set") + + def select( src: Union[Locator, Iterable[ET], Callable[[], Iterable[ET]]], *, @@ -365,51 +468,21 @@ Will attempt to call iter() on the value""") itr = filter(where, itr) if order_by is not None or order_key is not None or order_value is not None: - # we have some sort of input that specifies we should reorder the iterator + order_by_chosen, itr = _handle_generate_order_by(itr, order_by=order_by, + order_key=order_key, + order_value=order_value, + default=default) - order_by_chosen: Optional[OrderFunc] = order_by # if the user just supplied a function themselves - if order_by is None: - itr = more_itertools.peekable(itr) - try: - first_item = itr.peek() - except StopIteration: - low("""While determining order_key, encountered empty iterable. -Your 'src' may have been empty of the 'where' clause filtered the iterable to nothing""") - # 'itr' is an empty iterable - return itr - # try to use a key, if it was supplied - # order_key doesn't use local state - it just tries to find the passed - # attribute, or default to the 'default' value. As mentioned above, - # best used for items with a similar structure - # note: this could fail if the first item doesn't have a matching attr/key? - if order_key is not None: - order_by_chosen = _generate_order_by_func(first_item, key=order_key, default=default) - if order_by_chosen is None: - raise QueryException(f"Error while ordering: could not find {order_key} on {first_item}") - elif order_value is not None: - itr1, itr2 = itertools.tee(itr, 2) - # TODO: add a kwarg to force lookup for every item? would sort of be like core.common.guess_datetime then - order_by_lookup: Dict[Any, OrderFunc] = {} - - # need to go through a copy of the whole iterator here to - # pre-generate functions to support sorting mixed types - for obj_res in itr1: - key: Any = _determine_order_by_value_key(obj_res) - if key not in order_by_lookup: - keyfunc: Optional[OrderFunc] = _generate_order_by_func(obj_res, where_function=order_value, default=default, force_unsortable=True) - # should never be none, as we have force_unsortable=True - assert keyfunc is not None - order_by_lookup[key] = keyfunc - - # set the 'itr' (iterator in higher scope) - # to the copy (itertools.tee) of the iterator we haven't used yet - itr = itr2 - - # todo: cache results from above _determine_order_by_value_key call and use here somehow? - # would require additional state - # order_by_lookup[_determine_order_by_value_key(o)] returns a function which - # accepts o, and returns the value which sorted can use to order this by - order_by_chosen = lambda o: order_by_lookup[_determine_order_by_value_key(o)](o) + # signifies itr was filtered down to no data + if order_by_chosen is None: + # previously would send an warning message here, + # but sending the warning discourages this use-case + # e.g. take this iterable and see if I've had an event in + # the last week, else notify me to do something + # + # low("""While determining order_key, encountered empty iterable. + # Your 'src' may have been empty of the 'where' clause filtered the iterable to nothing""") + return itr assert order_by_chosen is not None # note: can't just attach sort unsortable values in the same iterable as the @@ -479,9 +552,8 @@ def test_basic_orders() -> None: res = list(select(input_items, where=filter_two, order_by=custom_order_by, limit=2)) assert res == [_Int(1), _Int(3)] - # filter produces empty iterator - with pytest.warns(UserWarning, match=r"encountered empty iterable"): - res = list(select(input_items, where=lambda o: o is None, order_key="x")) + # filter produces empty iterator (previously this used to warn, doesn't anymore) + res = list(select(input_items, where=lambda o: o is None, order_key="x")) assert len(res) == 0 diff --git a/my/core/query_cli.py b/my/core/query_cli.py deleted file mode 100644 index 6ff34f2..0000000 --- a/my/core/query_cli.py +++ /dev/null @@ -1,33 +0,0 @@ -import re -from datetime import date, datetime, timedelta -from typing import Callable, Iterator, Union - -from .query import QueryException, select - -DateLike = Union[datetime, date] - -timedelta_regex = re.compile(r"^((?P[\.\d]+?)w)?((?P[\.\d]+?)d)?((?P[\.\d]+?)h)?((?P[\.\d]+?)m)?((?P[\.\d]+?)s)?$") - - -# https://stackoverflow.com/a/51916936 -def parse_timedelta_string(timedelta_str: str) -> timedelta: - """ - This uses a syntax similar to the 'GNU sleep' command - e.g.: 1w5d5h10m50s means '1 week, 5 days, 5 hours, 10 minutes, 50 seconds' - """ - parts = timedelta_regex.match(timedelta_str) - if parts is None: - raise ValueError(f"Could not parse time duration from {timedelta_str}.\nValid examples: '8h', '1w2d8h5m20s', '2m4s'") - time_params = {name: float(param) for name, param in parts.groupdict().items() if param} - return timedelta(**time_params) # type: ignore[arg-type] - - -def test_parse_timedelta_string(): - - import pytest - - with pytest.raises(ValueError, match=r"Could not parse time duration from"): - parse_timedelta_string("5xxx") - - res = parse_timedelta_string("1w5d5h10m50s") - assert res == timedelta(days=7.0 + 5.0, hours=5.0, minutes=10.0, seconds=50.0) diff --git a/my/core/query_range.py b/my/core/query_range.py new file mode 100644 index 0000000..939682b --- /dev/null +++ b/my/core/query_range.py @@ -0,0 +1,517 @@ +""" +An extension of the my.core.query.select function, allowing you to specify +a type or key to filter the range by -- this creates a filter function +given those values, coercing values on the iterable, returning you a +filtered iterator + +See the select_range function below +""" + +import re +import time +from functools import lru_cache +from datetime import datetime, timedelta, date +from typing import Callable, Iterator, NamedTuple, Optional, Any, Type + +import more_itertools + +from .query import ( + QueryException, + select, + OrderFunc, + Where, + _handle_generate_order_by, + ET, +) + +fromisoformat: Callable[[str], datetime] +import sys +if sys.version_info[:2] >= (3, 7): + # prevent mypy on py3.6 from complaining... + fromisoformat_real = datetime.fromisoformat + fromisoformat = fromisoformat_real +else: + from .py37 import fromisoformat + + +timedelta_regex = re.compile(r"^((?P[\.\d]+?)w)?((?P[\.\d]+?)d)?((?P[\.\d]+?)h)?((?P[\.\d]+?)m)?((?P[\.\d]+?)s)?$") + + +# https://stackoverflow.com/a/51916936 +def parse_timedelta_string(timedelta_str: str) -> timedelta: + """ + This uses a syntax similar to the 'GNU sleep' command + e.g.: 1w5d5h10m50s means '1 week, 5 days, 5 hours, 10 minutes, 50 seconds' + """ + parts = timedelta_regex.match(timedelta_str) + if parts is None: + raise ValueError(f"Could not parse time duration from {timedelta_str}.\nValid examples: '8h', '1w2d8h5m20s', '2m4s'") + time_params = {name: float(param) for name, param in parts.groupdict().items() if param} + return timedelta(**time_params) # type: ignore[arg-type] + + +def parse_timedelta_float(timedelta_str: str) -> float: + return parse_timedelta_string(timedelta_str).total_seconds() + + +def parse_datetime_float(date_str: str) -> float: + """ + parses multiple possible representations of a datetime + into a float, else raises a QueryException + + the query_cli interface compares floats instead of timestamps + when comparing datetimes since handling it is unknown + whether the sources the user is selecting from is tz-aware + or not (or perhaps a mix of both?) + """ + ds = date_str.strip() + # special case + if ds == "now": + return time.time() + # epoch timestamp + try: + # also handles epoch timestamps as integers + ds_float = float(ds) + # convert to make sure its a valid datetime + datetime.fromtimestamp(ds_float) + except ValueError: + pass + else: + return ds_float + try: + # isoformat - default format when you call str() on datetime + return fromisoformat(ds).timestamp() + except ValueError: + raise QueryException(f"Was not able to parse {ds} into a datetime") + # todo: add isoparse from my.core.common? not sure how it handles + # timezones and this purposefully avoids that by converting all + # datelike items to floats instead + + +# probably DateLike input? but a user could specify an order_key +# which is an epoch timestamp or a float value which they +# expect to be converted to a datetime to compare +@lru_cache(maxsize=None) +def _datelike_to_float(dl: Any) -> float: + if isinstance(dl, datetime): + return dl.timestamp() + elif isinstance(dl, date): + # hmm... sets the hours/minutes/seconds to 0 -- make this configurable? + return (datetime.combine(dl, datetime.min.time())).timestamp() + else: + try: + return parse_datetime_float(dl) + except QueryException as q: + raise QueryException(f"While attempting to extract datetime from {dl}, to order by datetime:\n\n" + str(q)) + + +class RangeTuple(NamedTuple): + """Can specify 0, 1 or 2 non-none items in a range -- but not all 3 + + As an example, using datetimes/timedelta (some date, and possibly a duration) + + where 1 arg is not None + - after is not None: filters it to any items 'after' the datetime + - before is not None: filters to any items 'before' the datetime + - within: filters to any items 'within' the timedelta, assuming you meant within the current + timeframe, so before = time.time() + + when 2 args are not None: + - after and within, filters anything after the initial 'after' time + but 'within' the timeframe (parsed timedelta, e.g. 5d) + - before and within, anything 'within' the timeframe, starting at the end + of the timeframe -- 'before' + - before and after - anything after 'after' and before 'before', acts as a time range + """ + after: Optional[Any] + before: Optional[Any] + within: Optional[Any] + + +Converter = Callable[[Any], Any] + + +def _parse_range( + *, + unparsed_range: RangeTuple, + end_parser: Converter, + within_parser: Converter, + parsed_range: Optional[RangeTuple] = None, + error_message: Optional[str] = None) -> Optional[RangeTuple]: + + if parsed_range is not None: + return parsed_range + + err_msg = error_message or RangeTuple.__doc__ + after, before, within = None, None, None + + none_count = more_itertools.ilen(filter(lambda o: o is None, list(unparsed_range))) + if none_count == 3: + return None + if none_count == 0: + raise QueryException("Cannot specify 'after', 'before' and 'within' at the same time!\n\n" + err_msg) + + [after_str, before_str, within_str] = tuple(unparsed_range) + after = end_parser(after_str) if after_str is not None else None + before = end_parser(before_str) if before_str is not None else None + within = within_parser(within_str) if within_str is not None else None + + return RangeTuple(after=after, before=before, within=within) + + +def _create_range_filter( + *, + unparsed_range: RangeTuple, + end_parser: Converter, + within_parser: Converter, + attr_func: Where, + parsed_range: Optional[RangeTuple] = None, + default_before: Optional[Any] = None, + value_coercion_func: Optional[Converter] = None, + error_message: Optional[str] = None, +) -> Optional[Where]: + """ + Handles: + - parsing the user input into values that are comparable to items the iterable returns + - unparsed_range: tuple of raw values from user + - end_parser: parses 'before' and 'after' (e.g. start/end dates) + - within_parser: parser for the 'range' (e.g. timedelta) + - error_message: allow overriding the default error message while parsing + - converting items from the iterable to some coerced value, so that its comparable to + the before, after and within parts of the range + - if value_coercion_func is present, tries to use that + to convert the value returned by the attr_func + + unparsed_range is a tuple of the input data from the user + + parsed_range can be passed if you've already parsed unparsed_range + + 'default_before' specifies what to set if no before or after was specified in + RangeTuple and we need an endpoint to end the range at. For example, if you wanted + data from an iterable from the last week, you could specify default_before to be now (time.time()), + and unparsed_range.within to be 7 days + + Creates a predicate that checks if some item from the iterator is + within some range. this is typically used for datelike input, but the user could + specify an integer or float item to order the values by/in some timeframe + + It requires the value you're comparing by to support comparable/addition operators (=, <, >, +, -) + + attr_func is a function which accepts the object from the iterator and returns + the value to compare the range boundaries to. typically generated by _generate_order_by_func + + To force the values you're sorting by to be in some specified type, + this allows a 'conversion_func, which optionally converts the value + returned by attr_func to some shared type (see _datelike_to_float for an example) + """ + + rn = _parse_range(unparsed_range=unparsed_range, + end_parser=end_parser, + within_parser=within_parser, + parsed_range=parsed_range, + error_message=error_message) + + # user specified all 'None' items in the range, don't need to filter + if rn is None: + return None + + # destructure namedtuple + (after, before, within) = rn + + # hmm... not sure how to correctly manage + # inclusivity here? Is [after, before) currently, + # items are included on the lower bound but not the + # upper bound + # typically used for datetimes so doesnt have to + # be exact in that case + def generated_predicate(obj: Any) -> bool: + ov: Any = attr_func(obj) + if value_coercion_func is not None: + ov = value_coercion_func(ov) + if after is not None: + if before is not None: + # squeeze between before/after + return ov >= after and ov < before + elif within is not None: + # after some start point + some range + allow_before = after + within + return ov >= after and ov < allow_before + else: + return ov >= after + elif before is not None: + if within is not None: + allow_after = before - within + # before a startpoint + some range + return ov >= allow_after and ov < before + else: + # just before the startpoint + return ov < before + else: + # only specified within, default before to now + if default_before is None: + raise QueryException("Only recieved a range length, with no start or end point to compare against") + allow_after = default_before - within + return ov >= allow_after and ov < default_before + + return generated_predicate + + +# main interface to this file from my.core.__main__.py +def select_range( + itr: Iterator[ET], + *, + where: Optional[Where] = None, + order_key: Optional[str] = None, + order_value: Optional[Where] = None, + order_by_value_type: Optional[Type] = None, + unparsed_range: Optional[RangeTuple] = None, + reverse: bool = False, + limit: Optional[int] = None, + drop_unsorted: bool = True, + wrap_unsorted: bool = False, + drop_exceptions: bool = False, + raise_exceptions: bool = False, +) -> Iterator[ET]: + """ + A specialized select function which offers generating functions + to filter/query ranges from an iterable + + order_key and order_value are used in the same way they are in select + + If you specify order_by_value_type, it tries to search for an attribute + on each object/type which has that type, ordering the iterable by that value + + unparsed_range is a tuple of length 3, specifying 'after', 'before', 'duration', + i.e. some start point to allow the computed value we're ordering by, some + end point and a duration (can use the RangeTuple NamedTuple to construct one) + + (this is typically parsed/created in my.core.__main__, from CLI flags + + If you specify a range, drop_unsorted is forced to be True + """ + + # some operations to do before ordering/filtering + if drop_exceptions or raise_exceptions or where is not None: + # doesnt wrap unsortable items, because we pass no order related kwargs + itr = select(itr, where=where, drop_exceptions=drop_exceptions, raise_exceptions=raise_exceptions) + + order_by_chosen: Optional[OrderFunc] = None + + # if the user didn't specify an attribute to order value, but specified a type + # we should search for on each value in the iterator + if order_value is None and order_by_value_type is not None: + # search for that type on the iterator object + order_value = lambda o: isinstance(o, order_by_value_type) # type: ignore + + # if the user supplied a order_key, and/or we've generated an order_value, create + # the function that accesses that type on each value in the iterator + if order_key is not None or order_value is not None: + order_by_chosen, itr = _handle_generate_order_by(itr, order_key=order_key, order_value=order_value) + # signifies that itr is empty -- can early return here + if order_by_chosen is None: + return itr + + # test if the user is trying to specify a range to filter the items by + if unparsed_range is not None: + + if order_by_chosen is None: + raise QueryException("""Can't order by range if we have no way to order_by! +Specify a type or a key to order the value by""") + else: + # force drop_unsorted=True so we can use _create_range_filter + # sort the iterable by the generated order_by_chosen function + itr = select(itr, order_by=order_by_chosen, drop_unsorted=True) + filter_func: Optional[Where] + if order_by_value_type == datetime: + filter_func = _create_range_filter( + unparsed_range=unparsed_range, + end_parser=parse_datetime_float, + within_parser=parse_timedelta_float, + attr_func=order_by_chosen, # type: ignore[arg-type] + default_before=time.time(), + value_coercion_func=_datelike_to_float) + elif order_by_value_type in [str, int, float]: + # allow primitives to be converted using the default int(), float() callables + filter_func = _create_range_filter( + unparsed_range=unparsed_range, + end_parser=order_by_value_type, + within_parser=order_by_value_type, + attr_func=order_by_chosen, # type: ignore[arg-type] + default_before=None, + value_coercion_func=order_by_value_type) + else: + # TODO: add additional kwargs to let the user sort by other values, by specifying the parsers? + # would need to allow passing the end_parser, within parser, default before and value_coercion_func... + # (seems like a lot?) + raise QueryException("Sorting by custom types is currently unsupported") + + # use the created filter function in select (could be None if the user specified a RangeTuple + # like (None, None, None) somehow, but select handles that case + # apply leftover arguments from the user (limit, reverse) + # we've already applied drop_exceptions and kwargs related to unsortable values above + itr = select(itr, where=filter_func, limit=limit, reverse=reverse) + else: + # wrap_unsorted may be used here if the user is just specifying an order_key + # and nothing else to order or filter range by + # i.e. none of the range-related code ran, this is just a select with a key + itr = select(itr, + order_by=order_by_chosen, + wrap_unsorted=wrap_unsorted, + drop_unsorted=drop_unsorted, + limit=limit, + reverse=reverse) + return itr + + +# re-use items from query for testing +from .query import _A, _B, _Float, _mixed_iter_errors + + +def test_filter_in_timeframe() -> None: + + from itertools import chain + + jan_1_2005 = datetime(year=2005, month=1, day=1, hour=1, minute=1, second=1) + jan_1_2016 = datetime(year=2016, month=1, day=1, hour=1, minute=1, second=1) + + rng = RangeTuple(after=str(jan_1_2005), before=str(jan_1_2016), within=None) + + # items between 2005 and 2016 + res = list(select_range(_mixed_iter_errors(), order_by_value_type=datetime, unparsed_range=rng, drop_exceptions=True)) + + assert res == [_A(x=datetime(2005, 4, 10, 4, 10, 1), y=2, z=-5), + _A(x=datetime(2005, 5, 10, 4, 10, 1), y=10, z=2), + _A(x=datetime(2009, 3, 10, 4, 10, 1), y=12, z=1), + _A(x=datetime(2009, 5, 10, 4, 10, 1), y=5, z=10), + _B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1))] + + + rng = RangeTuple(before=str(jan_1_2016), within="52w", after=None) + + # from 2016, going back 52 weeks (about a year?) + res = list(select_range(_mixed_iter_errors(), order_by_value_type=datetime, unparsed_range=rng, drop_exceptions=True)) + + assert res == [_B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1))] + + # test passing just a within while using a datetime. should default to using current time + recent_time = datetime.now() - timedelta(days=5) + obj = _A(x=recent_time, y=2, z=-5) + + rng = RangeTuple(before=None, after=None, within="1w") + res = list(select_range(chain(_mixed_iter_errors(), iter([obj])), + order_by_value_type=datetime, + unparsed_range=rng, drop_exceptions=True)) + + assert res == [obj] + + # dont pass any range related stuff, use where/drop_exceptions and the limit flag + # to make sure this falls through properly to using select kwargs + + using_range = list(select_range(_mixed_iter_errors(), drop_exceptions=True, limit=5)) + normal = list(select(_mixed_iter_errors(), limit=5, where=lambda o: not isinstance(o, Exception))) + + assert using_range == normal + + +def test_query_range_float_value_type() -> None: + + def floaty_iter() -> Iterator[_Float]: + for v in range(1, 6): + yield _Float(float(v + 0.5)) + + rng = RangeTuple(after=2, before=6.1, within=None) + res = list(select_range(floaty_iter(), order_by_value_type=float, unparsed_range=rng, drop_exceptions=True)) + assert res == [_Float(2.5), _Float(3.5), _Float(4.5), _Float(5.5)] + + +def test_range_predicate() -> None: + + from functools import partial + + def src() -> Iterator[str]: + yield from map(str, range(15)) + + identity = lambda o: o + + # convert any float values to ints + coerce_int_parser = lambda o: int(float(o)) + int_filter_func = partial(_create_range_filter, attr_func=identity, end_parser=coerce_int_parser, + within_parser=coerce_int_parser, value_coercion_func=coerce_int_parser) + + # filter from 0 to 5 + rn: Optional[RangeTuple] = RangeTuple("0", "5", None) + zero_to_five_filter: Optional[Where] = int_filter_func(unparsed_range=rn) + assert zero_to_five_filter is not None + # this is just a Where function, given some input it return True/False if the value is allowed + assert zero_to_five_filter(3) is True + assert zero_to_five_filter(10) is False + + # this is expected, range_predicate is not inclusive on the far end + assert list(filter(zero_to_five_filter, src())) == ["0", "1", "2", "3", "4"] + + # items less than 3, going 3.5 (converted to 3 by the coerce_int_parser) down + rn = RangeTuple(None, 3, "3.5") + assert list(filter(int_filter_func(unparsed_range=rn, attr_func=identity), src())) == ["0", "1", "2"] + +def test_parse_range() -> None: + + from functools import partial + + import pytest + + rn = RangeTuple("0", "5", None) + res = _parse_range(unparsed_range=rn, end_parser=int, within_parser=int) + + assert res == RangeTuple(after=0, before=5, within=None) + + dt_parse_range = partial(_parse_range, end_parser=parse_datetime_float, within_parser=parse_timedelta_float) + + start_date = datetime.now() + end_date = start_date + timedelta(seconds=60) + + # convert start items to strings, which need to be parsed back + rn = RangeTuple(str(start_date), str(end_date.timestamp()), None) + res2 = dt_parse_range(unparsed_range=rn) + + assert res2 == RangeTuple(after=start_date.timestamp(), before=end_date.timestamp(), within=None) + + # cant specify all three + with pytest.raises(QueryException, match=r"Cannot specify 'after', 'before' and 'within'"): + dt_parse_range(unparsed_range=RangeTuple(str(start_date), str(end_date.timestamp()), "7d")) + + # if you specify noting, should return None + res3 = dt_parse_range(unparsed_range=RangeTuple(None, None, None)) + assert res3 is None + + +def test_parse_timedelta_string() -> None: + + import pytest + + with pytest.raises(ValueError, match=r"Could not parse time duration from"): + parse_timedelta_string("5xxx") + + res = parse_timedelta_string("1w5d5h10m50s") + assert res == timedelta(days=7.0 + 5.0, hours=5.0, minutes=10.0, seconds=50.0) + + +def test_parse_datetime_float() -> None: + + pnow = parse_datetime_float("now") + sec_diff = abs((pnow - datetime.now().timestamp())) + # should probably never fail? could mock time.time + # but there seems to be issues with doing that use C-libraries (as time.time) does + # https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking + assert sec_diff < 60 + + dt = datetime.now() + dt_float_s = str(dt.timestamp()) + dt_int_s = str(int(dt.timestamp())) + + # float/int representations as strings + assert dt.timestamp() == parse_datetime_float(dt_float_s) + assert int(dt.timestamp()) == int(parse_datetime_float(dt_int_s)) + + # test parsing isoformat + assert dt.timestamp() == parse_datetime_float(str(dt)) +