estimate_from helper, via_home estimator, all.py

This commit is contained in:
Sean Breckenridge 2023-02-15 21:25:25 -08:00
parent 52b6acbcbf
commit 9769748939
3 changed files with 98 additions and 5 deletions

View file

@ -0,0 +1,19 @@
# TODO: add config here which passes kwargs to estimate_from (under_accuracy)
# overwritable by passing the kwarg name here to the top-level estimate_location
from typing import Union
from datetime import datetime
from my.location.fallback.common import estimate_from, FallbackLocation
def estimate_location(dt: Union[datetime, float, int]) -> FallbackLocation:
from my.location.fallback.via_home import estimate_location as via_home
loc = estimate_from(
dt,
estimators=(via_home,)
)
if loc is None:
raise ValueError("Could not estimate location")
return loc

View file

@ -1,17 +1,17 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional from typing import Optional, Callable, Sequence, Iterator, List, Union
from datetime import datetime, timedelta from datetime import datetime, timedelta
from ..common import LocationProtocol, Location from ..common import LocationProtocol, Location
DateIshExact = Union[datetime, float, int]
@dataclass @dataclass
class FallbackLocation(LocationProtocol): class FallbackLocation(LocationProtocol):
lat: float lat: float
lon: float lon: float
dt: datetime dt: datetime
duration: float # time in seconds for how long this is valid duration: Optional[float] = None # time in seconds for how long this is valid
accuracy: Optional[float] = None accuracy: Optional[float] = None
elevation: Optional[float] = None elevation: Optional[float] = None
datasource: Optional[str] = None # which module provided this, useful for debugging datasource: Optional[str] = None # which module provided this, useful for debugging
@ -22,7 +22,7 @@ class FallbackLocation(LocationProtocol):
If end is True, the start date + duration is used If end is True, the start date + duration is used
''' '''
dt: datetime = self.dt dt: datetime = self.dt
if end: if end and self.duration is not None:
dt += timedelta(self.duration) dt += timedelta(self.duration)
return Location( return Location(
lat=self.lat, lat=self.lat,
@ -48,7 +48,7 @@ class FallbackLocation(LocationProtocol):
Create FallbackLocation from a start date and an end date Create FallbackLocation from a start date and an end date
''' '''
if end_dt < dt: if end_dt < dt:
raise ValueError('end_date must be after dt') raise ValueError("end_date must be after dt")
duration = (end_dt - dt).total_seconds() duration = (end_dt - dt).total_seconds()
return cls( return cls(
lat=lat, lat=lat,
@ -61,4 +61,54 @@ class FallbackLocation(LocationProtocol):
) )
LocationEstimator = Callable[[DateIshExact], Optional[FallbackLocation]]
LocationEstimators = Sequence[LocationEstimator]
# helper function, instead of dealing with datetimes while comparing, just use epoch timestamps
def _datetime_timestamp(dt: DateIshExact) -> float:
if isinstance(dt, datetime):
return dt.timestamp()
return float(dt)
# TODO: create estimate location which uses other fallback_locations to estimate a location # TODO: create estimate location which uses other fallback_locations to estimate a location
def _iter_estimate_from(
dt: DateIshExact,
estimators: LocationEstimators,
) -> Iterator[FallbackLocation]:
for est in estimators:
loc = est(dt)
if loc is None:
continue
yield loc
def estimate_from(
dt: DateIshExact,
estimators: LocationEstimators,
*,
first_match: bool = False,
under_accuracy: Optional[int] = None,
) -> Optional[FallbackLocation]:
'''
first_match: if True, return the first location found
under_accuracy: if set, only return locations with accuracy under this value
'''
found: List[FallbackLocation] = []
for loc in _iter_estimate_from(dt, estimators):
if under_accuracy is not None and loc.accuracy is not None and loc.accuracy > under_accuracy:
continue
if first_match:
return loc
found.append(loc)
if not found:
return None
# if all items have accuracy, return the one with the lowest accuracy
# otherwise, we should prefer the order that the estimators are passed in as
if all(loc.accuracy is not None for loc in found):
# return the location with the lowest accuracy
return min(has_accuracy, key=lambda loc: loc.accuracy) # type: ignore[union-attr]
else:
# return the first location
return found[0]

View file

@ -10,6 +10,7 @@ from typing import Sequence, Tuple, Union, cast
from my.config import location as user_config from my.config import location as user_config
from my.location.common import LatLon, DateIsh from my.location.common import LatLon, DateIsh
from my.location.fallback.common import FallbackLocation
@dataclass @dataclass
class Config(user_config): class Config(user_config):
@ -70,3 +71,26 @@ def get_location(dt: datetime) -> LatLon:
else: else:
# I guess the most reasonable is to fallback on the first location # I guess the most reasonable is to fallback on the first location
return hist[-1][1] return hist[-1][1]
def estimate_location(dt: Union[datetime, int, float]) -> FallbackLocation:
from my.location.fallback.common import _datetime_timestamp
d: float = _datetime_timestamp(dt)
# TODO: cache this?
hist = list(reversed(config._history))
for pdt, (lat, lon) in hist:
if d >= pdt.timestamp():
# TODO: add accuracy?
return FallbackLocation(
lat=lat,
lon=lon,
dt=datetime.fromtimestamp(d, timezone.utc),
datasource='via_home')
else:
# I guess the most reasonable is to fallback on the first location
lat, lon = hist[-1][1]
return FallbackLocation(
lat=lat,
lon=lon,
dt=datetime.fromtimestamp(d, timezone.utc),
datasource='via_home')