add via_ip.estimate_location using binary search
This commit is contained in:
parent
7aed18042d
commit
0a48393589
5 changed files with 84 additions and 24 deletions
|
@ -66,17 +66,18 @@ class google:
|
||||||
|
|
||||||
|
|
||||||
from typing import Sequence, Union, Tuple
|
from typing import Sequence, Union, Tuple
|
||||||
from datetime import datetime, date
|
from datetime import datetime, date, timedelta
|
||||||
DateIsh = Union[datetime, date, str]
|
DateIsh = Union[datetime, date, str]
|
||||||
LatLon = Tuple[float, float]
|
LatLon = Tuple[float, float]
|
||||||
class location:
|
class location:
|
||||||
# todo ugh, need to think about it... mypy wants the type here to be general, otherwise it can't deduce
|
# todo ugh, need to think about it... mypy wants the type here to be general, otherwise it can't deduce
|
||||||
# and we can't import the types from the module itself, otherwise would be circular. common module?
|
# and we can't import the types from the module itself, otherwise would be circular. common module?
|
||||||
home: Union[LatLon, Sequence[Tuple[DateIsh, LatLon]]] = (1.0, -1.0)
|
home: Union[LatLon, Sequence[Tuple[DateIsh, LatLon]]] = (1.0, -1.0)
|
||||||
home_accuracy = 1000.0
|
home_accuracy = 30_000.0
|
||||||
|
|
||||||
class via_ip:
|
class via_ip:
|
||||||
accuracy: float
|
accuracy: float
|
||||||
|
for_duration: timedelta
|
||||||
|
|
||||||
class gpslogger:
|
class gpslogger:
|
||||||
export_path: Paths = ''
|
export_path: Paths = ''
|
||||||
|
|
|
@ -4,7 +4,12 @@
|
||||||
from typing import Iterator
|
from typing import Iterator
|
||||||
|
|
||||||
from my.core.source import import_source
|
from my.core.source import import_source
|
||||||
from my.location.fallback.common import estimate_from, FallbackLocation, DateExact
|
from my.location.fallback.common import (
|
||||||
|
estimate_from,
|
||||||
|
FallbackLocation,
|
||||||
|
DateExact,
|
||||||
|
LocationEstimator,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# can comment/uncomment sources here to enable/disable them
|
# can comment/uncomment sources here to enable/disable them
|
||||||
|
@ -12,8 +17,15 @@ def fallback_locations() -> Iterator[FallbackLocation]:
|
||||||
yield from _ip_fallback_locations()
|
yield from _ip_fallback_locations()
|
||||||
|
|
||||||
|
|
||||||
|
def fallback_estimators() -> Iterator[LocationEstimator]:
|
||||||
|
# can comment/uncomment estimators here to enable/disable them
|
||||||
|
# the order of the estimators determines priority if location accuries are equal/unavailable
|
||||||
|
yield _ip_estimate
|
||||||
|
yield _home_estimate
|
||||||
|
|
||||||
|
|
||||||
def estimate_location(dt: DateExact) -> FallbackLocation:
|
def estimate_location(dt: DateExact) -> FallbackLocation:
|
||||||
loc = estimate_from(dt, estimators=(_home_estimate,))
|
loc = estimate_from(dt, estimators=list(fallback_estimators()))
|
||||||
if loc is None:
|
if loc is None:
|
||||||
raise ValueError("Could not estimate location")
|
raise ValueError("Could not estimate location")
|
||||||
return loc
|
return loc
|
||||||
|
@ -21,9 +33,16 @@ def estimate_location(dt: DateExact) -> FallbackLocation:
|
||||||
|
|
||||||
@import_source(module_name="my.location.fallback.via_home")
|
@import_source(module_name="my.location.fallback.via_home")
|
||||||
def _home_estimate(dt: DateExact) -> Iterator[FallbackLocation]:
|
def _home_estimate(dt: DateExact) -> Iterator[FallbackLocation]:
|
||||||
from my.location.fallback.via_home import estimate_location as via_home
|
from my.location.fallback.via_home import estimate_location as via_home_estimate
|
||||||
|
|
||||||
yield from via_home(dt)
|
yield from via_home_estimate(dt)
|
||||||
|
|
||||||
|
|
||||||
|
@import_source(module_name="my.location.fallback.via_ip")
|
||||||
|
def _ip_estimate(dt: DateExact) -> Iterator[FallbackLocation]:
|
||||||
|
from my.location.fallback.via_ip import estimate_location as via_ip_estimate
|
||||||
|
|
||||||
|
yield from via_ip_estimate(dt)
|
||||||
|
|
||||||
|
|
||||||
@import_source(module_name="my.location.fallback.via_ip")
|
@import_source(module_name="my.location.fallback.via_ip")
|
||||||
|
|
|
@ -79,10 +79,7 @@ def _iter_estimate_from(
|
||||||
estimators: LocationEstimators,
|
estimators: LocationEstimators,
|
||||||
) -> Iterator[FallbackLocation]:
|
) -> Iterator[FallbackLocation]:
|
||||||
for est in estimators:
|
for est in estimators:
|
||||||
loc = list(est(dt))
|
yield from est(dt)
|
||||||
if not loc:
|
|
||||||
continue
|
|
||||||
yield from loc
|
|
||||||
|
|
||||||
|
|
||||||
def estimate_from(
|
def estimate_from(
|
||||||
|
|
|
@ -22,10 +22,10 @@ class Config(user_config):
|
||||||
]]
|
]]
|
||||||
]
|
]
|
||||||
|
|
||||||
# default ~1km accuracy
|
# default ~30km accuracy
|
||||||
# this is called 'home_accuracy' since it lives on the base location.config object,
|
# this is called 'home_accuracy' since it lives on the base location.config object,
|
||||||
# to differentiate it from accuracy for other providers
|
# to differentiate it from accuracy for other providers
|
||||||
home_accuracy: float = 1000
|
home_accuracy: float = 30_000
|
||||||
|
|
||||||
# TODO could make current Optional and somehow determine from system settings?
|
# TODO could make current Optional and somehow determine from system settings?
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -4,28 +4,39 @@ Converts IP addresses provided by my.location.ip to estimated locations
|
||||||
|
|
||||||
REQUIRES = ["git+https://github.com/seanbreckenridge/ipgeocache"]
|
REQUIRES = ["git+https://github.com/seanbreckenridge/ipgeocache"]
|
||||||
|
|
||||||
from my.core import dataclass, Stats
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from my.core import dataclass, Stats, make_config
|
||||||
from my.config import location
|
from my.config import location
|
||||||
from my.core.warnings import medium
|
from my.core.warnings import medium
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class config(location.via_ip):
|
class ip_config(location.via_ip):
|
||||||
# no real science to this, just a guess of ~15km accuracy for IP addresses
|
# no real science to this, just a guess of ~15km accuracy for IP addresses
|
||||||
accuracy: float = 15_000.0
|
accuracy: float = 15_000.0
|
||||||
# default to being accurate for ~10 minutes
|
# default to being accurate for a day
|
||||||
for_duration: float = 60 * 10
|
for_duration: timedelta = timedelta(hours=24)
|
||||||
|
|
||||||
|
|
||||||
from typing import Iterator
|
# TODO: move config to location.fallback.via_location instead and add migration
|
||||||
|
config = make_config(ip_config)
|
||||||
|
|
||||||
from ..common import Location
|
|
||||||
from .common import FallbackLocation
|
import bisect
|
||||||
|
from functools import lru_cache
|
||||||
|
from typing import Iterator, List
|
||||||
|
|
||||||
|
from my.core.common import LazyLogger
|
||||||
from my.ip.all import ips
|
from my.ip.all import ips
|
||||||
|
from my.location.common import Location
|
||||||
|
from my.location.fallback.common import FallbackLocation, DateExact, _datetime_timestamp
|
||||||
|
|
||||||
|
logger = LazyLogger(__name__, level="warning")
|
||||||
|
|
||||||
|
|
||||||
def fallback_locations() -> Iterator[FallbackLocation]:
|
def fallback_locations() -> Iterator[FallbackLocation]:
|
||||||
|
dur = config.for_duration.total_seconds()
|
||||||
for ip in ips():
|
for ip in ips():
|
||||||
lat, lon = ip.latlon
|
lat, lon = ip.latlon
|
||||||
yield FallbackLocation(
|
yield FallbackLocation(
|
||||||
|
@ -33,9 +44,9 @@ def fallback_locations() -> Iterator[FallbackLocation]:
|
||||||
lon=lon,
|
lon=lon,
|
||||||
dt=ip.dt,
|
dt=ip.dt,
|
||||||
accuracy=config.accuracy,
|
accuracy=config.accuracy,
|
||||||
duration=config.for_duration,
|
duration=dur,
|
||||||
elevation=None,
|
elevation=None,
|
||||||
datasource="ip",
|
datasource="via_ip",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -45,8 +56,40 @@ def locations() -> Iterator[Location]:
|
||||||
yield from map(FallbackLocation.to_location, fallback_locations())
|
yield from map(FallbackLocation.to_location, fallback_locations())
|
||||||
|
|
||||||
|
|
||||||
def estimate_location(dt: datetime) -> Location:
|
@lru_cache(1)
|
||||||
raise NotImplementedError("not implemented yet")
|
def _sorted_fallback_locations() -> List[FallbackLocation]:
|
||||||
|
fl = list(filter(lambda l: l.duration is not None, fallback_locations()))
|
||||||
|
logger.debug(f"Fallback locations: {len(fl)}, sorting...:")
|
||||||
|
fl.sort(key=lambda l: l.dt.timestamp())
|
||||||
|
return fl
|
||||||
|
|
||||||
|
|
||||||
|
def estimate_location(dt: DateExact) -> Iterator[FallbackLocation]:
|
||||||
|
# logger.debug(f"Estimating location for: {dt}")
|
||||||
|
fl = _sorted_fallback_locations()
|
||||||
|
dt_ts = _datetime_timestamp(dt)
|
||||||
|
|
||||||
|
# search to find the first possible location which contains dt (something that started up to
|
||||||
|
# config.for_duration ago, and ends after dt)
|
||||||
|
idx = bisect.bisect_left(fl, dt_ts - config.for_duration.total_seconds(), key=lambda l: l.dt.timestamp()) # type: ignore[operator]
|
||||||
|
|
||||||
|
# all items are before the given dt
|
||||||
|
if idx == len(fl):
|
||||||
|
return
|
||||||
|
|
||||||
|
# iterate through in sorted order, until we find a location that is after the given dt
|
||||||
|
while idx < len(fl):
|
||||||
|
loc = fl[idx]
|
||||||
|
start_time = loc.dt.timestamp()
|
||||||
|
# loc.duration is filtered for in _sorted_fallback_locations
|
||||||
|
end_time = start_time + loc.duration # type: ignore[operator]
|
||||||
|
if start_time <= dt_ts <= end_time:
|
||||||
|
# logger.debug(f"Found location for {dt}: {loc}")
|
||||||
|
yield loc
|
||||||
|
if end_time > dt_ts:
|
||||||
|
# logger.debug(f"Passed end time: {end_time} > {dt_ts} ({datetime.fromtimestamp(end_time)} > {datetime.fromtimestamp(dt_ts)})")
|
||||||
|
break
|
||||||
|
idx += 1
|
||||||
|
|
||||||
|
|
||||||
def stats() -> Stats:
|
def stats() -> Stats:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue