HPI/my/core/query_range.py
2022-01-27 19:25:42 +00:00

520 lines
20 KiB
Python

"""
An extension of the my.core.query.select function, allowing you to specify
a type or key to filter the range by -- this creates a filter function
given those values, coercing values on the iterable, returning you a
filtered iterator
See the select_range function below
"""
import re
import time
from functools import lru_cache
from datetime import datetime, timedelta, date
from typing import Callable, Iterator, NamedTuple, Optional, Any, Type
import more_itertools
from .query import (
QueryException,
select,
OrderFunc,
Where,
_handle_generate_order_by,
ET,
)
from .common import isoparse
timedelta_regex = re.compile(r"^((?P<weeks>[\.\d]+?)w)?((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$")
# https://stackoverflow.com/a/51916936
def parse_timedelta_string(timedelta_str: str) -> timedelta:
"""
This uses a syntax similar to the 'GNU sleep' command
e.g.: 1w5d5h10m50s means '1 week, 5 days, 5 hours, 10 minutes, 50 seconds'
"""
parts = timedelta_regex.match(timedelta_str)
if parts is None:
raise ValueError(f"Could not parse time duration from {timedelta_str}.\nValid examples: '8h', '1w2d8h5m20s', '2m4s'")
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return timedelta(**time_params) # type: ignore[arg-type]
def parse_timedelta_float(timedelta_str: str) -> float:
return parse_timedelta_string(timedelta_str).total_seconds()
def parse_datetime_float(date_str: str) -> float:
"""
parses multiple possible representations of a datetime
into a float, else raises a QueryException
the query_cli interface compares floats instead of timestamps
when comparing datetimes since handling it is unknown
whether the sources the user is selecting from is tz-aware
or not (or perhaps a mix of both?)
"""
ds = date_str.strip()
# special case
if ds == "now":
return time.time()
# epoch timestamp
try:
# also handles epoch timestamps as integers
ds_float = float(ds)
# convert to make sure its a valid datetime
datetime.fromtimestamp(ds_float)
except ValueError:
pass
else:
return ds_float
try:
# isoformat - default format when you call str() on datetime
return datetime.fromisoformat(ds).timestamp()
except ValueError:
pass
try:
return isoparse(ds).timestamp()
except (AssertionError, ValueError):
raise QueryException(f"Was not able to parse {ds} into a datetime")
# probably DateLike input? but a user could specify an order_key
# which is an epoch timestamp or a float value which they
# expect to be converted to a datetime to compare
@lru_cache(maxsize=None)
def _datelike_to_float(dl: Any) -> float:
if isinstance(dl, datetime):
return dl.timestamp()
elif isinstance(dl, date):
# hmm... sets the hours/minutes/seconds to 0 -- make this configurable?
return (datetime.combine(dl, datetime.min.time())).timestamp()
else:
try:
return parse_datetime_float(dl)
except QueryException as q:
raise QueryException(f"While attempting to extract datetime from {dl}, to order by datetime:\n\n" + str(q))
class RangeTuple(NamedTuple):
"""Can specify 0, 1 or 2 non-none items in a range -- but not all 3
As an example, using datetimes/timedelta (some date, and possibly a duration)
where 1 arg is not None
- after is not None: filters it to any items 'after' the datetime
- before is not None: filters to any items 'before' the datetime
- within: filters to any items 'within' the timedelta, assuming you meant within the current
timeframe, so before = time.time()
when 2 args are not None:
- after and within, filters anything after the initial 'after' time
but 'within' the timeframe (parsed timedelta, e.g. 5d)
- before and within, anything 'within' the timeframe, starting at the end
of the timeframe -- 'before'
- before and after - anything after 'after' and before 'before', acts as a time range
"""
# technically doesn't need to be Optional[Any],
# just to make it more clear these can be None
after: Optional[Any]
before: Optional[Any]
within: Optional[Any]
Converter = Callable[[Any], Any]
def _parse_range(
*,
unparsed_range: RangeTuple,
end_parser: Converter,
within_parser: Converter,
parsed_range: Optional[RangeTuple] = None,
error_message: Optional[str] = None) -> Optional[RangeTuple]:
if parsed_range is not None:
return parsed_range
err_msg = error_message or RangeTuple.__doc__
after, before, within = None, None, None
none_count = more_itertools.ilen(filter(lambda o: o is None, list(unparsed_range)))
if none_count == 3:
return None
if none_count == 0:
raise QueryException("Cannot specify 'after', 'before' and 'within' at the same time!\n\n" + err_msg)
[after_str, before_str, within_str] = tuple(unparsed_range)
after = end_parser(after_str) if after_str is not None else None
before = end_parser(before_str) if before_str is not None else None
within = within_parser(within_str) if within_str is not None else None
return RangeTuple(after=after, before=before, within=within)
def _create_range_filter(
*,
unparsed_range: RangeTuple,
end_parser: Converter,
within_parser: Converter,
attr_func: Where,
parsed_range: Optional[RangeTuple] = None,
default_before: Optional[Any] = None,
value_coercion_func: Optional[Converter] = None,
error_message: Optional[str] = None,
) -> Optional[Where]:
"""
Handles:
- parsing the user input into values that are comparable to items the iterable returns
- unparsed_range: tuple of raw values from user
- end_parser: parses 'before' and 'after' (e.g. start/end dates)
- within_parser: parser for the 'range' (e.g. timedelta)
- error_message: allow overriding the default error message while parsing
- converting items from the iterable to some coerced value, so that its comparable to
the before, after and within parts of the range
- if value_coercion_func is present, tries to use that
to convert the value returned by the attr_func
unparsed_range is a tuple of the input data from the user
parsed_range can be passed if you've already parsed unparsed_range
'default_before' specifies what to set if no before or after was specified in
RangeTuple and we need an endpoint to end the range at. For example, if you wanted
data from an iterable from the last week, you could specify default_before to be now (time.time()),
and unparsed_range.within to be 7 days
Creates a predicate that checks if some item from the iterator is
within some range. this is typically used for datelike input, but the user could
specify an integer or float item to order the values by/in some timeframe
It requires the value you're comparing by to support comparable/addition operators (=, <, >, +, -)
attr_func is a function which accepts the object from the iterator and returns
the value to compare the range boundaries to. typically generated by _generate_order_by_func
To force the values you're sorting by to be in some specified type,
this allows a 'value_coercion_func', which optionally converts the value
returned by attr_func to some shared type (see _datelike_to_float for an example)
"""
rn = _parse_range(unparsed_range=unparsed_range,
end_parser=end_parser,
within_parser=within_parser,
parsed_range=parsed_range,
error_message=error_message)
# user specified all 'None' items in the range, don't need to filter
if rn is None:
return None
after = rn.after
before = rn.before
within = rn.within
# hmm... not sure how to correctly manage
# inclusivity here? Is [after, before) currently,
# items are included on the lower bound but not the
# upper bound
# typically used for datetimes so doesnt have to
# be exact in that case
def generated_predicate(obj: Any) -> bool:
ov: Any = attr_func(obj)
if value_coercion_func is not None:
ov = value_coercion_func(ov)
if after is not None:
if before is not None:
# squeeze between before/after
return ov >= after and ov < before
elif within is not None:
# after some start point + some range
allow_before = after + within
return ov >= after and ov < allow_before
else:
return ov >= after
elif before is not None:
if within is not None:
allow_after = before - within
# before a startpoint + some range
return ov >= allow_after and ov < before
else:
# just before the startpoint
return ov < before
else:
# only specified within, default before to now
if default_before is None:
raise QueryException("Only received a range length, with no start or end point to compare against")
allow_after = default_before - within
return ov >= allow_after and ov < default_before
return generated_predicate
# main interface to this file from my.core.__main__.py
def select_range(
itr: Iterator[ET],
*,
where: Optional[Where] = None,
order_key: Optional[str] = None,
order_value: Optional[Where] = None,
order_by_value_type: Optional[Type] = None,
unparsed_range: Optional[RangeTuple] = None,
reverse: bool = False,
limit: Optional[int] = None,
drop_unsorted: bool = False,
wrap_unsorted: bool = False,
drop_exceptions: bool = False,
raise_exceptions: bool = False,
) -> Iterator[ET]:
"""
A specialized select function which offers generating functions
to filter/query ranges from an iterable
order_key and order_value are used in the same way they are in select
If you specify order_by_value_type, it tries to search for an attribute
on each object/type which has that type, ordering the iterable by that value
unparsed_range is a tuple of length 3, specifying 'after', 'before', 'duration',
i.e. some start point to allow the computed value we're ordering by, some
end point and a duration (can use the RangeTuple NamedTuple to construct one)
(this is typically parsed/created in my.core.__main__, from CLI flags
If you specify a range, drop_unsorted is forced to be True
"""
# if the user specified a range with no data, set the unparsed_range to None
if unparsed_range == RangeTuple(None, None, None):
unparsed_range = None
# some operations to do before ordering/filtering
if drop_exceptions or raise_exceptions or where is not None:
# doesnt wrap unsortable items, because we pass no order related kwargs
itr = select(itr, where=where, drop_exceptions=drop_exceptions, raise_exceptions=raise_exceptions)
order_by_chosen: Optional[OrderFunc] = None
# if the user didn't specify an attribute to order value, but specified a type
# we should search for on each value in the iterator
if order_value is None and order_by_value_type is not None:
# search for that type on the iterator object
order_value = lambda o: isinstance(o, order_by_value_type) # type: ignore
# if the user supplied a order_key, and/or we've generated an order_value, create
# the function that accesses that type on each value in the iterator
if order_key is not None or order_value is not None:
order_by_chosen, itr = _handle_generate_order_by(itr, order_key=order_key, order_value=order_value)
# signifies that itr is empty -- can early return here
if order_by_chosen is None:
return itr
# test if the user is trying to specify a range to filter the items by
if unparsed_range is not None:
if order_by_chosen is None:
raise QueryException("""Can't order by range if we have no way to order_by!
Specify a type or a key to order the value by""")
else:
# force drop_unsorted=True so we can use _create_range_filter
# sort the iterable by the generated order_by_chosen function
itr = select(itr, order_by=order_by_chosen, drop_unsorted=True)
filter_func: Optional[Where]
if order_by_value_type in [datetime, date]:
filter_func = _create_range_filter(
unparsed_range=unparsed_range,
end_parser=parse_datetime_float,
within_parser=parse_timedelta_float,
attr_func=order_by_chosen, # type: ignore[arg-type]
default_before=time.time(),
value_coercion_func=_datelike_to_float)
elif order_by_value_type in [int, float]:
# allow primitives to be converted using the default int(), float() callables
filter_func = _create_range_filter(
unparsed_range=unparsed_range,
end_parser=order_by_value_type,
within_parser=order_by_value_type,
attr_func=order_by_chosen, # type: ignore[arg-type]
default_before=None,
value_coercion_func=order_by_value_type)
else:
# TODO: add additional kwargs to let the user sort by other values, by specifying the parsers?
# would need to allow passing the end_parser, within parser, default before and value_coercion_func...
# (seems like a lot?)
raise QueryException("Sorting by custom types is currently unsupported")
# use the created filter function
# we've already applied drop_exceptions and kwargs related to unsortable values above
itr = select(itr, where=filter_func, limit=limit, reverse=reverse)
else:
# wrap_unsorted may be used here if the user specified an order_key,
# or manually passed a order_value function
#
# this select is also run if the user didn't specify anything to
# order by, and is just returning the data in the same order as
# as the srouce iterable
# i.e. none of the range-related filtering code ran, this is just a select
itr = select(itr,
order_by=order_by_chosen,
wrap_unsorted=wrap_unsorted,
drop_unsorted=drop_unsorted,
limit=limit,
reverse=reverse)
return itr
# re-use items from query for testing
from .query import _A, _B, _Float, _mixed_iter_errors
def test_filter_in_timeframe() -> None:
from itertools import chain
jan_1_2005 = datetime(year=2005, month=1, day=1, hour=1, minute=1, second=1)
jan_1_2016 = datetime(year=2016, month=1, day=1, hour=1, minute=1, second=1)
rng = RangeTuple(after=str(jan_1_2005), before=str(jan_1_2016), within=None)
# items between 2005 and 2016
res = list(select_range(_mixed_iter_errors(), order_by_value_type=datetime, unparsed_range=rng, drop_exceptions=True))
assert res == [_A(x=datetime(2005, 4, 10, 4, 10, 1), y=2, z=-5),
_A(x=datetime(2005, 5, 10, 4, 10, 1), y=10, z=2),
_A(x=datetime(2009, 3, 10, 4, 10, 1), y=12, z=1),
_A(x=datetime(2009, 5, 10, 4, 10, 1), y=5, z=10),
_B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1))]
rng = RangeTuple(before=str(jan_1_2016), within="52w", after=None)
# from 2016, going back 52 weeks (about a year?)
res = list(select_range(_mixed_iter_errors(), order_by_value_type=datetime, unparsed_range=rng, drop_exceptions=True))
assert res == [_B(y=datetime(year=2015, month=5, day=10, hour=4, minute=10, second=1))]
# test passing just a within while using a datetime. should default to using current time
recent_time = datetime.now() - timedelta(days=5)
obj = _A(x=recent_time, y=2, z=-5)
rng = RangeTuple(before=None, after=None, within="1w")
res = list(select_range(chain(_mixed_iter_errors(), iter([obj])),
order_by_value_type=datetime,
unparsed_range=rng, drop_exceptions=True))
assert res == [obj]
# dont pass any range related stuff, use where/drop_exceptions and the limit flag
# to make sure this falls through properly to using select kwargs
using_range = list(select_range(_mixed_iter_errors(), drop_exceptions=True, limit=5))
normal = list(select(_mixed_iter_errors(), limit=5, where=lambda o: not isinstance(o, Exception)))
assert using_range == normal
def test_query_range_float_value_type() -> None:
def floaty_iter() -> Iterator[_Float]:
for v in range(1, 6):
yield _Float(float(v + 0.5))
rng = RangeTuple(after=2, before=6.1, within=None)
res = list(select_range(floaty_iter(), order_by_value_type=float, unparsed_range=rng, drop_exceptions=True))
assert res == [_Float(2.5), _Float(3.5), _Float(4.5), _Float(5.5)]
def test_range_predicate() -> None:
from functools import partial
def src() -> Iterator[str]:
yield from map(str, range(15))
identity = lambda o: o
# convert any float values to ints
coerce_int_parser = lambda o: int(float(o))
int_filter_func = partial(_create_range_filter, attr_func=identity, end_parser=coerce_int_parser,
within_parser=coerce_int_parser, value_coercion_func=coerce_int_parser)
# filter from 0 to 5
rn: Optional[RangeTuple] = RangeTuple("0", "5", None)
zero_to_five_filter: Optional[Where] = int_filter_func(unparsed_range=rn)
assert zero_to_five_filter is not None
# this is just a Where function, given some input it return True/False if the value is allowed
assert zero_to_five_filter(3) is True
assert zero_to_five_filter(10) is False
# this is expected, range_predicate is not inclusive on the far end
assert list(filter(zero_to_five_filter, src())) == ["0", "1", "2", "3", "4"]
# items less than 3, going 3.5 (converted to 3 by the coerce_int_parser) down
rn = RangeTuple(None, 3, "3.5")
assert list(filter(int_filter_func(unparsed_range=rn, attr_func=identity), src())) == ["0", "1", "2"]
def test_parse_range() -> None:
from functools import partial
import pytest
rn = RangeTuple("0", "5", None)
res = _parse_range(unparsed_range=rn, end_parser=int, within_parser=int)
assert res == RangeTuple(after=0, before=5, within=None)
dt_parse_range = partial(_parse_range, end_parser=parse_datetime_float, within_parser=parse_timedelta_float)
start_date = datetime.now()
end_date = start_date + timedelta(seconds=60)
# convert start items to strings, which need to be parsed back
rn = RangeTuple(str(start_date), str(end_date.timestamp()), None)
res2 = dt_parse_range(unparsed_range=rn)
assert res2 == RangeTuple(after=start_date.timestamp(), before=end_date.timestamp(), within=None)
# cant specify all three
with pytest.raises(QueryException, match=r"Cannot specify 'after', 'before' and 'within'"):
dt_parse_range(unparsed_range=RangeTuple(str(start_date), str(end_date.timestamp()), "7d"))
# if you specify noting, should return None
res3 = dt_parse_range(unparsed_range=RangeTuple(None, None, None))
assert res3 is None
def test_parse_timedelta_string() -> None:
import pytest
with pytest.raises(ValueError, match=r"Could not parse time duration from"):
parse_timedelta_string("5xxx")
res = parse_timedelta_string("1w5d5h10m50s")
assert res == timedelta(days=7.0 + 5.0, hours=5.0, minutes=10.0, seconds=50.0)
def test_parse_datetime_float() -> None:
pnow = parse_datetime_float("now")
sec_diff = abs((pnow - datetime.now().timestamp()))
# should probably never fail? could mock time.time
# but there seems to be issues with doing that use C-libraries (as time.time) does
# https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking
assert sec_diff < 60
dt = datetime.now()
dt_float_s = str(dt.timestamp())
dt_int_s = str(int(dt.timestamp()))
# float/int representations as strings
assert dt.timestamp() == parse_datetime_float(dt_float_s)
assert int(dt.timestamp()) == int(parse_datetime_float(dt_int_s))
# test parsing isoformat
assert dt.timestamp() == parse_datetime_float(str(dt))