From 98b086f746fdc519cf0d51b86a777a265c359552 Mon Sep 17 00:00:00 2001
From: seanbreckenridge
Date: Mon, 27 Feb 2023 20:30:06 -0800
Subject: [PATCH] location fallback (#263)

see https://github.com/karlicoss/HPI/issues/262

* move home to fallback/via_home.py
* move via_ip to fallback
* add fallback model
* add stub via_ip file
* add fallback_locations for via_ip
* use protocol for locations
* estimate_from helper, via_home estimator, all.py
* via_home: add accuracy, cache history
* add datasources to gpslogger/google_takeout
* tz/via_location.py: update import to fallback
* denylist docs/installation instructions
* tz.via_location: let user customize cachew refresh time
* add via_ip.estimate_location using binary search
* use estimate_location in via_home.get_location
* tests: add gpslogger to location config stub
* tests: install tz related libs in test env
* tz: add regression test for broken windows dates
* vendorize bisect_left from python src (it doesn't have a 'key' parameter till python 3.10)
---
 doc/DENYLIST.md                  | 130 ++++++++++++++++++++++
 doc/MODULE_DESIGN.org            |   3 +-
 my/config.py                     |   4 +-
 my/core/compat.py                |  32 ++++++
 my/core/denylist.py              | 178 +++++++++++++++++++++++++++++++
 my/ip/all.py                     |   2 +-
 my/ip/common.py                  |   8 +-
 my/location/all.py               |   1 +
 my/location/common.py            |  23 +++-
 my/location/fallback/all.py      |  53 +++++++++
 my/location/fallback/common.py   | 120 +++++++++++++++++++++
 my/location/fallback/via_home.py | 104 ++++++++++++++++++
 my/location/fallback/via_ip.py   |  99 +++++++++++++++++
 my/location/google_takeout.py    |   7 +-
 my/location/gpslogger.py         |  10 +-
 my/location/home.py              |  74 +------------
 my/location/via_ip.py            |  38 +------
 my/time/tz/via_location.py       |  95 ++++++++++++-----
 setup.py                         |   1 +
 tests/core/test_denylist.py      | 106 ++++++++++++++++++
 tests/location.py                |  23 +---
 tests/location_fallback.py       | 125 ++++++++++++++++++++++
 tests/shared_config.py           |  65 +++++++++++
 tests/tz.py                      |  47 +++-----
 tox.ini                          |   8 +-
 25 files changed, 1166 insertions(+), 190 deletions(-)
 create mode 100644 doc/DENYLIST.md
 create mode 100644 my/core/denylist.py
 create mode 100644 my/location/fallback/all.py
 create mode 100644 my/location/fallback/common.py
 create mode 100644 my/location/fallback/via_home.py
 create mode 100644 my/location/fallback/via_ip.py
 create mode 100644 tests/core/test_denylist.py
 create mode 100644 tests/location_fallback.py
 create mode 100644 tests/shared_config.py

diff --git a/doc/DENYLIST.md b/doc/DENYLIST.md
new file mode 100644
index 0000000..d57b8b1
--- /dev/null
+++ b/doc/DENYLIST.md
@@ -0,0 +1,130 @@
+For code reference, see: [`my.core.denylist.py`](../my/core/denylist.py)
+
+A helper module for defining denylists for sources programmatically (in layman's terms, this lets you remove some particular output from a module you don't want).
+
+It lets you specify a class, an attribute to match on,
+and a JSON file containing a list of values to deny/filter out.
+
+As an example, this will use the `my.ip` module, as filtering incorrect IPs was the original use case for this module:
+
+```python
+class IP(NamedTuple):
+    addr: str
+    dt: datetime
+```
+
+A possible denylist file would contain:
+
+```json
+[
+    {
+        "addr": "192.168.1.1"
+    },
+    {
+        "dt": "2020-06-02T03:12:00+00:00"
+    }
+]
+```
+
+Note that if the value being compared to is not a single (non-array/object) JSON primitive
+(str, int, float, bool, None), it will be converted to a string before comparison.
+
+To use this in code:
+
+```python
+from my.ip.all import ips
+from my.core.denylist import DenyList
+
+filtered = DenyList("~/data/ip_denylist.json").filter(ips())
+```
+
+To add items to the denylist, in Python (in a one-off script):
+
+```python
+from my.ip.all import ips
+from my.core.denylist import DenyList
+
+d = DenyList("~/data/ip_denylist.json")
+
+for ip in ips():
+    # some custom code you define
+    if ip.addr == ...:
+        d.deny(key="addr", value=ip.addr)
+d.write()
+```
+
+... or interactively, which requires [`fzf`](https://github.com/junegunn/fzf) and [`pyfzf-iter`](https://pypi.org/project/pyfzf-iter/) (`python3 -m pip install pyfzf-iter`) to be installed:
+
+```python
+from my.ip.all import ips
+from my.core.denylist import DenyList
+
+d = DenyList("~/data/ip_denylist.json")
+d.deny_cli(ips())  # automatically writes after each selection
+```
+
+That will open up an interactive `fzf` prompt, where you can select an item to add to the denylist.
+
+This is meant for relatively simple filters, where you want to filter items out
+based on a single attribute of a namedtuple/dataclass. If you want to do something
+more complex, I would recommend overriding the `all.py` file for that source and
+writing your own filter function there (a sketch of such a filter appears below, after the notes on editing `all.py`).
+
+For more info on all.py:
+
+https://github.com/karlicoss/HPI/blob/master/doc/MODULE_DESIGN.org#allpy
+
+This would typically be used in an overridden `all.py` file, or in a one-off script
+in which you filter out some items from a source, progressively adding more
+items to the denylist as you go.
+
+A potential `my/ip/all.py` file might look like this (sidenote: the `discord` module is from [here](https://github.com/seanbreckenridge/HPI)):
+
+```python
+from typing import Iterator
+
+from my.ip.common import IP
+from my.core.denylist import DenyList
+
+deny = DenyList("~/data/ip_denylist.json")
+
+# all possible data from the source
+def _ips() -> Iterator[IP]:
+    from my.ip import discord
+    # could add other imports here
+
+    yield from discord.ips()
+
+
+# filtered data
+def ips() -> Iterator[IP]:
+    yield from deny.filter(_ips())
+```
+
+To add items to the denylist, you could create a `__main__.py` in your namespace package (in this case, `my/ip/__main__.py`), with contents like:
+
+```python
+from my.ip import all
+
+if __name__ == "__main__":
+    all.deny.deny_cli(all.ips())
+```
+
+That can then be called with: `python3 -m my.ip`
+
+Or, you could just run it from the command line:
+
+```
+python3 -c 'from my.ip import all; all.deny.deny_cli(all.ips())'
+```
+
+To edit the `all.py`, you could either:
+
+- install it as editable (`python3 -m pip install --user -e ./HPI`), and then edit the file directly
+- or, create a namespace package, which splits the package across multiple directories. For info on that see [`MODULE_DESIGN`](https://github.com/karlicoss/HPI/blob/master/doc/MODULE_DESIGN.org#namespace-packages), [`reorder_editable`](https://github.com/seanbreckenridge/reorder_editable), and possibly the [`HPI-template`](https://github.com/seanbreckenridge/HPI-template), which you can use to create your own HPI namespace package with its own `all.py` file.
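+
+As a rough sketch of the "more complex" case mentioned above: an overridden `all.py` can combine a `DenyList` with an arbitrary predicate. The `discord` source module, the denylist path, and the cutoff year below are just placeholders for illustration, not part of the real module:
+
+```python
+from typing import Iterator
+import ipaddress
+
+from my.ip.common import IP
+from my.core.denylist import DenyList
+
+deny = DenyList("~/data/ip_denylist.json")
+
+
+def _ips() -> Iterator[IP]:
+    from my.ip import discord  # placeholder source module
+
+    yield from discord.ips()
+
+
+def _keep(ip: IP) -> bool:
+    # logic a flat key/value denylist can't express:
+    # drop private/loopback addresses and anything before 2015
+    addr = ipaddress.ip_address(ip.addr)
+    if addr.is_private or addr.is_loopback:
+        return False
+    return ip.dt.year >= 2015
+
+
+def ips() -> Iterator[IP]:
+    # apply the denylist first, then the custom predicate
+    yield from filter(_keep, deny.filter(_ips()))
+```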
+ +TODO: link to seanbreckenridge/HPI-personal for an example of this once this is merged/settled + +Sidenote: the reason why we want to specifically override +the all.py and not just create a script that filters out the items you're +not interested in is because we want to be able to import from `my.ip.all` +or `my.location.all` from other modules and get the filtered results, without +having to mix data filtering logic with parsing/loading/caching (the stuff HPI does) diff --git a/doc/MODULE_DESIGN.org b/doc/MODULE_DESIGN.org index b17526d..691dd1c 100644 --- a/doc/MODULE_DESIGN.org +++ b/doc/MODULE_DESIGN.org @@ -226,8 +226,7 @@ The main goals are: - doesn't require you to maintain a fork of this repository, though you can maintain a separate HPI repository (so no patching/merge conflicts) - allows you to easily add/remove sources to the ~all.py~ module, either by: - overriding an ~all.py~ in your own repository - - just commenting out the source/adding 2 lines to import and ~yield - from~ your new source + - just commenting out the source/adding 2 lines to import and ~yield from~ your new source - doing nothing! (~import_source~ will catch the error and just warn you and continue to work without changing any code) diff --git a/my/config.py b/my/config.py index e9eafec..7075d1d 100644 --- a/my/config.py +++ b/my/config.py @@ -72,16 +72,18 @@ class google: from typing import Sequence, Union, Tuple -from datetime import datetime, date +from datetime import datetime, date, timedelta DateIsh = Union[datetime, date, str] LatLon = Tuple[float, float] class location: # todo ugh, need to think about it... mypy wants the type here to be general, otherwise it can't deduce # and we can't import the types from the module itself, otherwise would be circular. common module? home: Union[LatLon, Sequence[Tuple[DateIsh, LatLon]]] = (1.0, -1.0) + home_accuracy = 30_000.0 class via_ip: accuracy: float + for_duration: timedelta class gpslogger: export_path: Paths = '' diff --git a/my/core/compat.py b/my/core/compat.py index 3c825f2..dcf97cc 100644 --- a/my/core/compat.py +++ b/my/core/compat.py @@ -125,3 +125,35 @@ else: else: from typing import Dict TypedDict = Dict + + +# bisect_left doesnt have a 'key' parameter (which we use) +# till python3.10 +if sys.version_info[:2] <= (3, 9): + from typing import List, TypeVar, Any, Optional, Callable + X = TypeVar('X') + # copied from python src + def bisect_left(a: List[Any], x: Any, lo: int=0, hi: Optional[int]=None, *, key: Optional[Callable[..., Any]]=None) -> int: + if lo < 0: + raise ValueError('lo must be non-negative') + if hi is None: + hi = len(a) + # Note, the comparison uses "<" to match the + # __lt__() logic in list.sort() and in heapq. 
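+        # (the two loops below are identical except that the 'key' version applies key() to a[mid] before comparing)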
+ if key is None: + while lo < hi: + mid = (lo + hi) // 2 + if a[mid] < x: + lo = mid + 1 + else: + hi = mid + else: + while lo < hi: + mid = (lo + hi) // 2 + if key(a[mid]) < x: + lo = mid + 1 + else: + hi = mid + return lo +else: + from bisect import bisect_left # type: ignore[misc] diff --git a/my/core/denylist.py b/my/core/denylist.py new file mode 100644 index 0000000..fcf3e2b --- /dev/null +++ b/my/core/denylist.py @@ -0,0 +1,178 @@ +""" +A helper module for defining denylists for sources programatically +(in lamens terms, this lets you remove some output from a module you don't want) + +For docs, see doc/DENYLIST.md +""" + +import sys +import json +import functools +from collections import defaultdict +from typing import TypeVar, Set, Any, Mapping, Iterator, Dict, List +from pathlib import Path + +import click +from more_itertools import seekable +from my.core.serialize import dumps +from my.core.common import PathIsh +from my.core.warnings import medium + + +T = TypeVar("T") + +DenyMap = Mapping[str, Set[Any]] + + +def _default_key_func(obj: T) -> str: + return str(obj) + + +class DenyList: + def __init__(self, denylist_file: PathIsh): + self.file = Path(denylist_file).expanduser().absolute() + self._deny_raw_list: List[Dict[str, Any]] = [] + self._deny_map: DenyMap = defaultdict(set) + + # deny cli, user can override these + self.fzf_path = None + self._fzf_options = () + self._deny_cli_key_func = None + + def _load(self) -> None: + if not self.file.exists(): + medium(f"denylist file {self.file} does not exist") + return + + deny_map: DenyMap = defaultdict(set) + data: List[Dict[str, Any]]= json.loads(self.file.read_text()) + self._deny_raw_list = data + + for ignore in data: + for k, v in ignore.items(): + deny_map[k].add(v) + + self._deny_map = deny_map + + def load(self) -> DenyMap: + self._load() + return self._deny_map + + def write(self) -> None: + if not self._deny_raw_list: + medium("no denylist data to write") + return + self.file.write_text(json.dumps(self._deny_raw_list)) + + @classmethod + def _is_json_primitive(cls, val: Any) -> bool: + return isinstance(val, (str, int, float, bool, type(None))) + + @classmethod + def _stringify_value(cls, val: Any) -> Any: + # if it's a primitive, just return it + if cls._is_json_primitive(val): + return val + # otherwise, stringify-and-back so we can compare to + # json data loaded from the denylist file + return json.loads(dumps(val)) + + @classmethod + def _allow(cls, obj: T, deny_map: DenyMap) -> bool: + for deny_key, deny_set in deny_map.items(): + # this should be done separately and not as part of the getattr + # because 'null'/None could actually be a value in the denylist, + # and the user may define behavior to filter that out + if not hasattr(obj, deny_key): + return False + val = cls._stringify_value(getattr(obj, deny_key)) + # this object doesn't have have the attribute in the denylist + if val in deny_set: + return False + # if we tried all the denylist keys and didn't return False, + # then this object is allowed + return True + + def filter( + self, + itr: Iterator[T], + invert: bool = False, + ) -> Iterator[T]: + denyf = functools.partial(self._allow, deny_map=self.load()) + if invert: + return filter(lambda x: not denyf(x), itr) + return filter(denyf, itr) + + def deny(self, key: str, value: Any, write: bool = False) -> None: + ''' + add a key/value pair to the denylist + ''' + if not self._deny_raw_list: + self._load() + self._deny_raw({key: self._stringify_value(value)}, write=write) + + def _deny_raw(self, data: 
Dict[str, Any], write: bool = False) -> None: + self._deny_raw_list.append(data) + if write: + self.write() + + def _prompt_keys(self, item: T) -> str: + import pprint + + click.echo(pprint.pformat(item)) + # TODO: extract keys from item by checking if its dataclass/NT etc.? + resp = click.prompt("Key to deny on").strip() + if not hasattr(item, resp): + click.echo(f"Could not find key '{resp}' on item", err=True) + return self._prompt_keys(item) + return resp + + def _deny_cli_remember( + self, + items: Iterator[T], + mem: Dict[str, T], + ) -> Iterator[str]: + keyf = self._deny_cli_key_func or _default_key_func + # i.e., convert each item to a string, and map str -> item + for item in items: + key = keyf(item) + mem[key] = item + yield key + + def deny_cli(self, itr: Iterator[T]) -> None: + try: + from pyfzf import FzfPrompt + except ImportError: + click.echo("pyfzf is required to use the denylist cli, run 'python3 -m pip install pyfzf_iter'", err=True) + sys.exit(1) + + # wrap in seekable so we can use it multiple times + # progressively caches the items as we iterate over them + sit = seekable(itr) + + prompt_continue = True + + while prompt_continue: + # reset the iterator + sit.seek(0) + # so we can map the selected string from fzf back to the original objects + memory_map: Dict[str, T] = {} + picker = FzfPrompt( + executable_path=self.fzf_path, default_options="--no-multi" + ) + picked_l = picker.prompt( + self._deny_cli_remember(itr, memory_map), + "--read0", + *self._fzf_options, + delimiter="\0", + ) + assert isinstance(picked_l, list) + if picked_l: + picked: T = memory_map[picked_l[0]] + key = self._prompt_keys(picked) + self.deny(key, getattr(picked, key), write=True) + click.echo(f"Added {self._deny_raw_list[-1]} to denylist", err=True) + else: + click.echo("No item selected", err=True) + + prompt_continue = click.confirm("Continue?") diff --git a/my/ip/all.py b/my/ip/all.py index b21b543..f4cdb37 100644 --- a/my/ip/all.py +++ b/my/ip/all.py @@ -13,7 +13,7 @@ from typing import Iterator from my.core.common import Stats, warn_if_empty -from .common import IP +from my.ip.common import IP @warn_if_empty diff --git a/my/ip/common.py b/my/ip/common.py index 82008e2..b4bfc8e 100644 --- a/my/ip/common.py +++ b/my/ip/common.py @@ -7,7 +7,7 @@ REQUIRES = ["git+https://github.com/seanbreckenridge/ipgeocache"] from my.core import __NOT_HPI_MODULE__ import ipaddress -from typing import NamedTuple, Iterator +from typing import NamedTuple, Iterator, Tuple from datetime import datetime import ipgeocache @@ -23,6 +23,12 @@ class IP(NamedTuple): def ipgeocache(self) -> Json: return ipgeocache.get(self.addr) + @property + def latlon(self) -> Tuple[float, float]: + loc: str = self.ipgeocache()["loc"] + lat, _, lon = loc.partition(",") + return float(lat), float(lon) + @property def tzname(self) -> str: tz: str = self.ipgeocache()["timezone"] diff --git a/my/location/all.py b/my/location/all.py index eec4bcc..8d51a82 100644 --- a/my/location/all.py +++ b/my/location/all.py @@ -32,6 +32,7 @@ def _gpslogger_locations() -> Iterator[Location]: yield from gpslogger.locations() +# TODO: remove, user should use fallback.estimate_location or fallback.fallback_locations instead @import_source(module_name="my.location.via_ip") def _ip_locations() -> Iterator[Location]: from . 
import via_ip diff --git a/my/location/common.py b/my/location/common.py index b0676ec..fa8bdad 100644 --- a/my/location/common.py +++ b/my/location/common.py @@ -1,17 +1,34 @@ from datetime import date, datetime -from typing import Union, Tuple, NamedTuple, Optional +from typing import Union, Tuple, Optional +from dataclasses import dataclass from my.core import __NOT_HPI_MODULE__ +from my.core.compat import Protocol DateIsh = Union[datetime, date, str] LatLon = Tuple[float, float] -# TODO: add timezone to this? can use timezonefinder in tz provider instead though -class Location(NamedTuple): +class LocationProtocol(Protocol): lat: float lon: float dt: datetime accuracy: Optional[float] elevation: Optional[float] + datasource: Optional[str] = None # which module provided this, useful for debugging + + +# TODO: add timezone to this? can use timezonefinder in tz provider instead though + + +# converted from namedtuple to a dataclass so datasource field can be added optionally +# if we want, can eventually be converted back to a namedtuple when all datasources are compliant +@dataclass(frozen=True, eq=True) +class Location(LocationProtocol): + lat: float + lon: float + dt: datetime + accuracy: Optional[float] + elevation: Optional[float] + datasource: Optional[str] = None # which module provided this, useful for debugging diff --git a/my/location/fallback/all.py b/my/location/fallback/all.py new file mode 100644 index 0000000..0c7b8cd --- /dev/null +++ b/my/location/fallback/all.py @@ -0,0 +1,53 @@ +# TODO: add config here which passes kwargs to estimate_from (under_accuracy) +# overwritable by passing the kwarg name here to the top-level estimate_location + +from typing import Iterator, Optional + +from my.core.source import import_source +from my.location.fallback.common import ( + estimate_from, + FallbackLocation, + DateExact, + LocationEstimator, +) + + +def fallback_locations() -> Iterator[FallbackLocation]: + # can comment/uncomment sources here to enable/disable them + yield from _ip_fallback_locations() + + +def fallback_estimators() -> Iterator[LocationEstimator]: + # can comment/uncomment estimators here to enable/disable them + # the order of the estimators determines priority if location accuries are equal/unavailable + yield _ip_estimate + yield _home_estimate + + +def estimate_location(dt: DateExact, first_match: bool=False, under_accuracy: Optional[int] = None) -> FallbackLocation: + loc = estimate_from(dt, estimators=list(fallback_estimators()), first_match=first_match, under_accuracy=under_accuracy) + # should never happen if the user has home configured + if loc is None: + raise ValueError("Could not estimate location") + return loc + + +@import_source(module_name="my.location.fallback.via_home") +def _home_estimate(dt: DateExact) -> Iterator[FallbackLocation]: + from my.location.fallback.via_home import estimate_location as via_home_estimate + + yield from via_home_estimate(dt) + + +@import_source(module_name="my.location.fallback.via_ip") +def _ip_estimate(dt: DateExact) -> Iterator[FallbackLocation]: + from my.location.fallback.via_ip import estimate_location as via_ip_estimate + + yield from via_ip_estimate(dt) + + +@import_source(module_name="my.location.fallback.via_ip") +def _ip_fallback_locations() -> Iterator[FallbackLocation]: + from my.location.fallback.via_ip import fallback_locations as via_ip_fallback + + yield from via_ip_fallback() diff --git a/my/location/fallback/common.py b/my/location/fallback/common.py new file mode 100644 index 0000000..fa1d4c5 --- 
/dev/null
+++ b/my/location/fallback/common.py
@@ -0,0 +1,120 @@
+from __future__ import annotations
+from dataclasses import dataclass
+from typing import Optional, Callable, Sequence, Iterator, List, Union
+from datetime import datetime, timedelta, timezone
+
+from ..common import LocationProtocol, Location
+DateExact = Union[datetime, float, int]  # float/int as epoch timestamps
+
+Second = float
+
+@dataclass
+class FallbackLocation(LocationProtocol):
+    lat: float
+    lon: float
+    dt: datetime
+    duration: Optional[Second] = None
+    accuracy: Optional[float] = None
+    elevation: Optional[float] = None
+    datasource: Optional[str] = None  # which module provided this, useful for debugging
+
+    def to_location(self, end: bool = False) -> Location:
+        '''
+        By default the start date is used for the location.
+        If end is True, the start date + duration is used.
+        '''
+        dt: datetime = self.dt
+        if end and self.duration is not None:
+            dt += timedelta(seconds=self.duration)  # duration is stored in seconds
+        return Location(
+            lat=self.lat,
+            lon=self.lon,
+            dt=dt,
+            accuracy=self.accuracy,
+            elevation=self.elevation,
+            datasource=self.datasource,
+        )
+
+    @classmethod
+    def from_end_date(
+        cls,
+        *,
+        lat: float,
+        lon: float,
+        dt: datetime,
+        end_dt: datetime,
+        accuracy: Optional[float] = None,
+        elevation: Optional[float] = None,
+        datasource: Optional[str] = None,
+    ) -> FallbackLocation:
+        '''
+        Create FallbackLocation from a start date and an end date
+        '''
+        if end_dt < dt:
+            raise ValueError("end_dt must be after dt")
+        duration = (end_dt - dt).total_seconds()
+        return cls(
+            lat=lat,
+            lon=lon,
+            dt=dt,
+            duration=duration,
+            accuracy=accuracy,
+            elevation=elevation,
+            datasource=datasource,
+        )
+
+
+# a location estimator can return multiple fallbacks, in case there are
+# differing accuracies, or to allow possible matches to be computed
+# iteratively
+LocationEstimator = Callable[[DateExact], Iterator[FallbackLocation]]
+LocationEstimators = Sequence[LocationEstimator]
+
+# helper function; instead of dealing with datetimes while comparing, just use epoch timestamps
+def _datetime_timestamp(dt: DateExact) -> float:
+    if isinstance(dt, datetime):
+        try:
+            return dt.timestamp()
+        except ValueError:
+            # https://github.com/python/cpython/issues/75395
+            return dt.replace(tzinfo=timezone.utc).timestamp()
+    return float(dt)
+
+def _iter_estimate_from(
+    dt: DateExact,
+    estimators: LocationEstimators,
+) -> Iterator[FallbackLocation]:
+    for est in estimators:
+        yield from est(dt)
+
+
+def estimate_from(
+    dt: DateExact,
+    estimators: LocationEstimators,
+    *,
+    first_match: bool = False,
+    under_accuracy: Optional[int] = None,
+) -> Optional[FallbackLocation]:
+    '''
+    first_match: if True, return the first location found
+    under_accuracy: if set, only return locations with accuracy under this value
+    '''
+    found: List[FallbackLocation] = []
+    for loc in _iter_estimate_from(dt, estimators):
+        if under_accuracy is not None and loc.accuracy is not None and loc.accuracy > under_accuracy:
+            continue
+        if first_match:
+            return loc
+        found.append(loc)
+
+    if not found:
+        return None
+
+    # if all items have accuracy, return the most accurate one;
+    # otherwise, prefer the order in which the estimators were passed in
+    if all(loc.accuracy is not None for loc in found):
+        # return the location with the smallest accuracy value (i.e. the most accurate)
+        return min(found, key=lambda loc: loc.accuracy)  # type: ignore[return-value, arg-type]
+    else:
+        # return the first location
+        return found[0]
diff --git a/my/location/fallback/via_home.py
b/my/location/fallback/via_home.py new file mode 100644 index 0000000..240da84 --- /dev/null +++ b/my/location/fallback/via_home.py @@ -0,0 +1,104 @@ +''' +Simple location provider, serving as a fallback when more detailed data isn't available +''' + +from dataclasses import dataclass +from datetime import datetime, time, timezone +from functools import lru_cache +from typing import Sequence, Tuple, Union, cast, List, Iterator + +from my.config import location as user_config + +from my.location.common import LatLon, DateIsh +from my.location.fallback.common import FallbackLocation, DateExact + +@dataclass +class Config(user_config): + home: Union[ + LatLon, # either single, 'current' location + Sequence[Tuple[ # or, a sequence of location history + DateIsh, # date when you moved to + LatLon, # the location + ]] + ] + + # default ~30km accuracy + # this is called 'home_accuracy' since it lives on the base location.config object, + # to differentiate it from accuracy for other providers + home_accuracy: float = 30_000 + + # TODO could make current Optional and somehow determine from system settings? + @property + def _history(self) -> Sequence[Tuple[datetime, LatLon]]: + home1 = self.home + # todo ugh, can't test for isnstance LatLon, it's a tuple itself + home2: Sequence[Tuple[DateIsh, LatLon]] + if isinstance(home1[0], tuple): + # already a sequence + home2 = cast(Sequence[Tuple[DateIsh, LatLon]], home1) + else: + # must be a pair of coordinates. also doesn't really matter which date to pick? + loc = cast(LatLon, home1) + home2 = [(datetime.min, loc)] + + # todo cache? + res = [] + for x, loc in home2: + dt: datetime + if isinstance(x, str): + dt = datetime.fromisoformat(x) + elif isinstance(x, datetime): + dt = x + else: + dt = datetime.combine(x, time.min) + # todo not sure about doing it here, but makes it easier to compare.. + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + res.append((dt, loc)) + res = list(sorted(res, key=lambda p: p[0])) + return res + + +from ...core.cfg import make_config +config = make_config(Config) + + +@lru_cache(maxsize=None) +def get_location(dt: datetime) -> LatLon: + ''' + Interpolates the location at dt + ''' + loc = list(estimate_location(dt)) + assert len(loc) == 1 + return loc[0].lat, loc[0].lon + + +# TODO: in python3.9, use functools.cached_property instead? 
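+# caches the parsed home history from the config, so estimate_location doesn't re-parse/re-sort it on every call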
+@lru_cache(maxsize=None) +def homes_cached() -> List[Tuple[datetime, LatLon]]: + return list(config._history) + + +def estimate_location(dt: DateExact) -> Iterator[FallbackLocation]: + from my.location.fallback.common import _datetime_timestamp + d: float = _datetime_timestamp(dt) + hist = list(reversed(homes_cached())) + for pdt, (lat, lon) in hist: + if d >= pdt.timestamp(): + yield FallbackLocation( + lat=lat, + lon=lon, + accuracy=config.home_accuracy, + dt=datetime.fromtimestamp(d, timezone.utc), + datasource='via_home') + return + else: + # I guess the most reasonable is to fallback on the first location + lat, lon = hist[-1][1] + yield FallbackLocation( + lat=lat, + lon=lon, + accuracy=config.home_accuracy, + dt=datetime.fromtimestamp(d, timezone.utc), + datasource='via_home') + return diff --git a/my/location/fallback/via_ip.py b/my/location/fallback/via_ip.py new file mode 100644 index 0000000..1da2315 --- /dev/null +++ b/my/location/fallback/via_ip.py @@ -0,0 +1,99 @@ +""" +Converts IP addresses provided by my.location.ip to estimated locations +""" + +REQUIRES = ["git+https://github.com/seanbreckenridge/ipgeocache"] + +from datetime import timedelta + +from my.core import dataclass, Stats, make_config +from my.config import location +from my.core.warnings import medium + + +@dataclass +class ip_config(location.via_ip): + # no real science to this, just a guess of ~15km accuracy for IP addresses + accuracy: float = 15_000.0 + # default to being accurate for a day + for_duration: timedelta = timedelta(hours=24) + + +# TODO: move config to location.fallback.via_location instead and add migration +config = make_config(ip_config) + + +from functools import lru_cache +from typing import Iterator, List + +from my.core.common import LazyLogger +from my.core.compat import bisect_left +from my.ip.all import ips +from my.location.common import Location +from my.location.fallback.common import FallbackLocation, DateExact, _datetime_timestamp + +logger = LazyLogger(__name__, level="warning") + + +def fallback_locations() -> Iterator[FallbackLocation]: + dur = config.for_duration.total_seconds() + for ip in ips(): + lat, lon = ip.latlon + yield FallbackLocation( + lat=lat, + lon=lon, + dt=ip.dt, + accuracy=config.accuracy, + duration=dur, + elevation=None, + datasource="via_ip", + ) + + +# for compatibility with my.location.via_ip, this shouldnt be used by other modules +def locations() -> Iterator[Location]: + medium("locations is deprecated, should use fallback_locations or estimate_location") + yield from map(FallbackLocation.to_location, fallback_locations()) + + +@lru_cache(1) +def _sorted_fallback_locations() -> List[FallbackLocation]: + fl = list(filter(lambda l: l.duration is not None, fallback_locations())) + logger.debug(f"Fallback locations: {len(fl)}, sorting...:") + fl.sort(key=lambda l: l.dt.timestamp()) + return fl + + +def estimate_location(dt: DateExact) -> Iterator[FallbackLocation]: + # logger.debug(f"Estimating location for: {dt}") + fl = _sorted_fallback_locations() + dt_ts = _datetime_timestamp(dt) + + # search to find the first possible location which contains dt (something that started up to + # config.for_duration ago, and ends after dt) + idx = bisect_left(fl, dt_ts - config.for_duration.total_seconds(), key=lambda l: l.dt.timestamp()) # type: ignore[operator,call-arg,type-var] + + # all items are before the given dt + if idx == len(fl): + return + + # iterate through in sorted order, until we find a location that is after the given dt + while idx < len(fl): + loc = 
fl[idx] + start_time = loc.dt.timestamp() + # loc.duration is filtered for in _sorted_fallback_locations + end_time = start_time + loc.duration # type: ignore[operator] + if start_time <= dt_ts <= end_time: + # logger.debug(f"Found location for {dt}: {loc}") + yield loc + # no more locations could possibly contain dt + if start_time > dt_ts: + # logger.debug(f"Passed start time: {end_time} > {dt_ts} ({datetime.fromtimestamp(end_time)} > {datetime.fromtimestamp(dt_ts)})") + break + idx += 1 + + +def stats() -> Stats: + from my.core import stat + + return {**stat(locations)} diff --git a/my/location/google_takeout.py b/my/location/google_takeout.py index 80b31cb..a1c1403 100644 --- a/my/location/google_takeout.py +++ b/my/location/google_takeout.py @@ -23,7 +23,12 @@ def locations() -> Iterator[Location]: for g in events(): if isinstance(g, GoogleLocation): yield Location( - lon=g.lng, lat=g.lat, dt=g.dt, accuracy=g.accuracy, elevation=None + lon=g.lng, + lat=g.lat, + dt=g.dt, + accuracy=g.accuracy, + elevation=None, + datasource="google_takeout", ) diff --git a/my/location/gpslogger.py b/my/location/gpslogger.py index 95f4474..46fc381 100644 --- a/my/location/gpslogger.py +++ b/my/location/gpslogger.py @@ -32,9 +32,16 @@ from .common import Location logger = LazyLogger(__name__, level="warning") +def _input_sort_key(path: Path) -> str: + if "_" in path.name: + return path.name.split("_", maxsplit=1)[1] + return path.name + def inputs() -> Sequence[Path]: - return get_files(config.export_path, glob="*.gpx") + # gpslogger files can optionally be prefixed by a device id, + # like b5760c66102a5269_20211214142156.gpx + return sorted(get_files(config.export_path, glob="*.gpx", sort=False), key=_input_sort_key) def _cachew_depends_on() -> List[float]: @@ -65,6 +72,7 @@ def _extract_locations(path: Path) -> Iterator[Location]: accuracy=config.accuracy, elevation=point.elevation, dt=datetime.replace(point.time, tzinfo=timezone.utc), + datasource="gpslogger", ) diff --git a/my/location/home.py b/my/location/home.py index ac0fcb8..f6e6978 100644 --- a/my/location/home.py +++ b/my/location/home.py @@ -1,71 +1,7 @@ -''' -Simple location provider, serving as a fallback when more detailed data isn't available -''' -from dataclasses import dataclass -from datetime import datetime, time, timezone -from functools import lru_cache -from typing import Sequence, Tuple, Union, cast +from .fallback.via_home import * -from my.config import location as user_config +from my.core.warnings import high -from my.location.common import LatLon, DateIsh - -@dataclass -class Config(user_config): - home: Union[ - LatLon, # either single, 'current' location - Sequence[Tuple[ # or, a sequence of location history - DateIsh, # date when you moved to - LatLon, # the location - ]] - ] - # TODO could make current Optional and somehow determine from system settings? - @property - def _history(self) -> Sequence[Tuple[datetime, LatLon]]: - home1 = self.home - # todo ugh, can't test for isnstance LatLon, it's a tuple itself - home2: Sequence[Tuple[DateIsh, LatLon]] - if isinstance(home1[0], tuple): - # already a sequence - home2 = cast(Sequence[Tuple[DateIsh, LatLon]], home1) - else: - # must be a pair of coordinates. also doesn't really matter which date to pick? - loc = cast(LatLon, home1) - home2 = [(datetime.min, loc)] - - # todo cache? 
- res = [] - for x, loc in home2: - dt: datetime - if isinstance(x, str): - dt = datetime.fromisoformat(x) - elif isinstance(x, datetime): - dt = x - else: - dt = datetime.combine(x, time.min) - # todo not sure about doing it here, but makes it easier to compare.. - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - res.append((dt, loc)) - res = list(sorted(res, key=lambda p: p[0])) - return res - - -from ..core.cfg import make_config -config = make_config(Config) - - -@lru_cache(maxsize=None) -def get_location(dt: datetime) -> LatLon: - ''' - Interpolates the location at dt - ''' - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - hist = list(reversed(config._history)) - for pdt, loc in hist: - if dt >= pdt: - return loc - else: - # I guess the most reasonable is to fallback on the first location - return hist[-1][1] +high( + "my.location.home is deprecated, use my.location.fallback.via_home instead, or estimate locations using the higher-level my.location.fallback.all.estimate_location" +) diff --git a/my/location/via_ip.py b/my/location/via_ip.py index e882cdb..df48f8b 100644 --- a/my/location/via_ip.py +++ b/my/location/via_ip.py @@ -1,39 +1,7 @@ -""" -Converts IP addresses provided by my.location.ip to estimated locations -""" - REQUIRES = ["git+https://github.com/seanbreckenridge/ipgeocache"] -from my.core import dataclass, Stats -from my.config import location +from .fallback.via_ip import * +from my.core.warnings import high -@dataclass -class config(location.via_ip): - # no real science to this, just a guess of ~15km accuracy for IP addresses - accuracy: float = 15_000.0 - - -from typing import Iterator - -from .common import Location -from my.ip.all import ips - - -def locations() -> Iterator[Location]: - for ip in ips(): - loc: str = ip.ipgeocache()["loc"] - lat, _, lon = loc.partition(",") - yield Location( - lat=float(lat), - lon=float(lon), - dt=ip.dt, - accuracy=config.accuracy, - elevation=None, - ) - - -def stats() -> Stats: - from my.core import stat - - return {**stat(locations)} +high("my.location.via_ip is deprecated, use my.location.fallback.via_ip instead") diff --git a/my/time/tz/via_location.py b/my/time/tz/via_location.py index 6b8e835..e111a4a 100644 --- a/my/time/tz/via_location.py +++ b/my/time/tz/via_location.py @@ -41,17 +41,23 @@ class config(user_config): # if the accuracy for the location is more than 5km, don't use require_accuracy: float = 5_000 + # how often (hours) to refresh the cachew timezone cache + # this may be removed in the future if we opt for dict-based caching + _iter_tz_refresh_time: int = 6 + from collections import Counter from datetime import date, datetime from functools import lru_cache from itertools import groupby -from typing import Iterator, NamedTuple, Optional, Tuple, Any, List, Iterable +from typing import Iterator, NamedTuple, Optional, Tuple, Any, List, Iterable, Set -from more_itertools import seekable +import heapq import pytz +from more_itertools import seekable from my.core.common import LazyLogger, mcachew, tzdatetime +from my.core.source import import_source logger = LazyLogger(__name__, level='warning') @@ -102,23 +108,13 @@ def _sorted_locations() -> List[Tuple[LatLon, datetime]]: return list(sorted(_locations(), key=lambda x: x[1])) -# Note: this takes a while, as the upstream since _locations isn't sorted, so this -# has to do an iterative sort of the entire my.locations.all list -def _iter_local_dates() -> Iterator[DayWithZone]: - finder = _timezone_finder(fast=config.fast) # rely on the 
default - #pdt = None - # TODO: warnings doesnt actually warn? - warnings = [] - - locs: Iterable[Tuple[LatLon, datetime]] - locs = _sorted_locations() if config.sort_locations else _locations() - - # todo allow to skip if not noo many errors in row? +def _find_tz_for_locs(finder: Any, locs: Iterable[Tuple[LatLon, datetime]]) -> Iterator[DayWithZone]: for (lat, lon), dt in locs: # TODO right. its _very_ slow... zone = finder.timezone_at(lat=lat, lng=lon) + # todo allow to skip if not noo many errors in row? if zone is None: - warnings.append(f"Couldn't figure out tz for {lat}, {lon}") + # warnings.append(f"Couldn't figure out tz for {lat}, {lon}") continue tz = pytz.timezone(zone) # TODO this is probably a bit expensive... test & benchmark @@ -133,6 +129,33 @@ def _iter_local_dates() -> Iterator[DayWithZone]: z = tz.zone; assert z is not None yield DayWithZone(day=ndate, zone=z) +# Note: this takes a while, as the upstream since _locations isn't sorted, so this +# has to do an iterative sort of the entire my.locations.all list +def _iter_local_dates() -> Iterator[DayWithZone]: + finder = _timezone_finder(fast=config.fast) # rely on the default + #pdt = None + # TODO: warnings doesnt actually warn? + # warnings = [] + + locs: Iterable[Tuple[LatLon, datetime]] + locs = _sorted_locations() if config.sort_locations else _locations() + + yield from _find_tz_for_locs(finder, locs) + + +# my.location.fallback.estimate_location could be used here +# but iterating through all the locations is faster since this +# is saved behind cachew +@import_source(module_name="my.location.fallback.all") +def _iter_local_dates_fallback() -> Iterator[DayWithZone]: + from my.location.fallback.all import fallback_locations as flocs + + def _fallback_locations() -> Iterator[Tuple[LatLon, datetime]]: + for loc in sorted(flocs(), key=lambda x: x.dt): + yield ((loc.lat, loc.lon), loc.dt) + + yield from _find_tz_for_locs(_timezone_finder(fast=config.fast), _fallback_locations()) + def most_common(lst: List[DayWithZone]) -> DayWithZone: res, _ = Counter(lst).most_common(1)[0] # type: ignore[var-annotated] @@ -142,27 +165,43 @@ def most_common(lst: List[DayWithZone]) -> DayWithZone: def _iter_tz_depends_on() -> str: """ Since you might get new data which specifies a new timezone sometime - in the day, this causes _iter_tzs to refresh every 6 hours, like: + in the day, this causes _iter_tzs to refresh every _iter_tz_refresh_time hours + (default 6), like: 2022-04-26_00 2022-04-26_06 2022-04-26_12 2022-04-26_18 """ + mod = config._iter_tz_refresh_time + assert mod >= 1 day = str(date.today()) hr = datetime.now().hour - hr_truncated = hr // 6 * 6 + hr_truncated = hr // mod * mod return "{}_{}".format(day, hr_truncated) -# refresh _iter_tzs every 6 hours -- don't think a better depends_on is possible dynamically +# refresh _iter_tzs every few hours -- don't think a better depends_on is possible dynamically @mcachew(logger=logger, depends_on=_iter_tz_depends_on) def _iter_tzs() -> Iterator[DayWithZone]: # since we have no control over what order the locations are returned, # we need to sort them first before we can do a groupby local_dates: List[DayWithZone] = list(_iter_local_dates()) local_dates.sort(key=lambda p: p.day) - for d, gr in groupby(local_dates, key=lambda p: p.day): - logger.info('processed %s', d) + logger.debug(f"no. 
of items using exact locations: {len(local_dates)}") + + local_dates_fallback: List[DayWithZone] = list(_iter_local_dates_fallback()) + local_dates_fallback.sort(key=lambda p: p.day) + + # find days that are in fallback but not in local_dates (i.e., missing days) + local_dates_set: Set[date] = set(d.day for d in local_dates) + use_fallback_days: List[DayWithZone] = [d for d in local_dates_fallback if d.day not in local_dates_set] + logger.debug(f"no. of items being used from fallback locations: {len(use_fallback_days)}") + + # combine local_dates and missing days from fallback into a sorted list + all_dates = heapq.merge(local_dates, use_fallback_days, key=lambda p: p.day) + + for d, gr in groupby(all_dates, key=lambda p: p.day): + logger.info(f"processed {d}{', using fallback' if d in local_dates_set else ''}") zone = most_common(list(gr)).zone yield DayWithZone(day=d, zone=zone) @@ -192,7 +231,7 @@ def _get_day_tz(d: date) -> Optional[pytz.BaseTzInfo]: # ok to cache, there are only a few home locations? @lru_cache(maxsize=None) -def _get_home_tz(loc) -> Optional[pytz.BaseTzInfo]: +def _get_home_tz(loc: LatLon) -> Optional[pytz.BaseTzInfo]: (lat, lng) = loc finder = _timezone_finder(fast=False) # ok to use slow here for better precision zone = finder.timezone_at(lat=lat, lng=lng) @@ -211,9 +250,17 @@ def _get_tz(dt: datetime) -> Optional[pytz.BaseTzInfo]: if res is not None: return res # fallback to home tz - from ...location import home - loc = home.get_location(dt) - return _get_home_tz(loc=loc) + # note: the fallback to fallback.via_home.estimate_location is still needed, since + # _iter_local_dates_fallback only returns days which we actually have a datetime for + # (e.g. there was an IP address within a day of that datetime) + # + # given a datetime, fallback.via_home.estimate_location will find which home location + # that datetime is between, else fallback on your first home location, so it acts + # as a last resort + from my.location.fallback import via_home as home + loc = list(home.estimate_location(dt)) + assert len(loc) == 1, f"should only have one home location, received {loc}" + return _get_home_tz(loc=(loc[0].lat, loc[0].lon)) # expose as 'public' function get_tz = _get_tz diff --git a/setup.py b/setup.py index 31fc393..b0f4ab6 100644 --- a/setup.py +++ b/setup.py @@ -57,6 +57,7 @@ def main() -> None: # todo document these? 
'logzero', 'orjson', # for my.core.serialize + 'pyfzf_iter', # for my.core.denylist 'cachew>=0.8.0', 'mypy', # used for config checks ], diff --git a/tests/core/test_denylist.py b/tests/core/test_denylist.py new file mode 100644 index 0000000..d6f4c49 --- /dev/null +++ b/tests/core/test_denylist.py @@ -0,0 +1,106 @@ +import warnings + +import json +from pathlib import Path +from datetime import datetime +from typing import NamedTuple, Iterator + +from my.core.denylist import DenyList + + +class IP(NamedTuple): + addr: str + dt: datetime + + +def data() -> Iterator[IP]: + # random IP addresses + yield IP(addr="67.98.113.0", dt=datetime(2020, 1, 1)) + yield IP(addr="59.40.113.87", dt=datetime(2020, 2, 1)) + yield IP(addr="161.235.192.228", dt=datetime(2020, 3, 1)) + yield IP(addr="165.243.139.87", dt=datetime(2020, 4, 1)) + yield IP(addr="69.69.141.154", dt=datetime(2020, 5, 1)) + yield IP(addr="50.72.224.80", dt=datetime(2020, 6, 1)) + yield IP(addr="221.67.89.168", dt=datetime(2020, 7, 1)) + yield IP(addr="177.113.119.251", dt=datetime(2020, 8, 1)) + yield IP(addr="93.200.246.215", dt=datetime(2020, 9, 1)) + yield IP(addr="127.105.171.61", dt=datetime(2020, 10, 1)) + + +def test_denylist(tmp_path: Path) -> None: + tf = (tmp_path / "denylist.json").absolute() + with warnings.catch_warnings(record=True): + + # create empty denylist (though file does not have to exist for denylist to work) + tf.write_text("[]") + + d = DenyList(tf) + + d.load() + assert dict(d._deny_map) == {} + assert d._deny_raw_list == [] + + assert list(d.filter(data())) == list(data()) + # no data in denylist yet + assert len(d._deny_map) == 0 + assert len(d._deny_raw_list) == 0 + + # add some data + d.deny(key="addr", value="67.98.113.0") + # write and reload to update _deny_map, _deny_raw_list + d.write() + d.load() + + assert len(d._deny_map) == 1 + assert len(d._deny_raw_list) == 1 + + assert d._deny_raw_list == [{"addr": "67.98.113.0"}] + + filtered = list(d.filter(data())) + assert len(filtered) == 9 + assert "67.98.113.0" not in [i.addr for i in filtered] + + assert dict(d._deny_map) == {"addr": {"67.98.113.0"}} + + denied = list(d.filter(data(), invert=True)) + assert len(denied) == 1 + + assert denied[0] == IP(addr="67.98.113.0", dt=datetime(2020, 1, 1)) + + # add some non-JSON primitive data + + d.deny(key="dt", value=datetime(2020, 2, 1)) + + # test internal behavior, _deny_raw_list should have been updated, + # but _deny_map doesnt get updated by a call to .deny + # + # if we change this just update the test, is just here to ensure + # this is the behaviour + + assert len(d._deny_map) == 1 + + # write and load to update _deny_map + d.write() + d.load() + + assert len(d._deny_map) == 2 + assert len(d._deny_raw_list) == 2 + + assert d._deny_raw_list[-1] == {"dt": "2020-02-01T00:00:00"} + + filtered = list(d.filter(data())) + assert len(filtered) == 8 + + assert "59.40.113.87" not in [i.addr for i in filtered] + + with open(tf, "r") as f: + data_json = json.loads(f.read()) + + assert data_json == [ + { + "addr": "67.98.113.0", + }, + { + "dt": "2020-02-01T00:00:00", + }, + ] diff --git a/tests/location.py b/tests/location.py index 298b7ba..c47849e 100644 --- a/tests/location.py +++ b/tests/location.py @@ -1,7 +1,5 @@ from pathlib import Path -from more_itertools import one - import pytest # type: ignore @@ -20,26 +18,11 @@ def test() -> None: @pytest.fixture(autouse=True) def prepare(tmp_path: Path): - from .common import reset_modules - reset_modules() - - user_config = _prepare_google_config(tmp_path) + from 
.shared_config import temp_config + user_config = temp_config(tmp_path) import my.core.cfg as C with C.tmp_config() as config: - config.google = user_config # type: ignore + config.google = user_config.google yield - -def _prepare_google_config(tmp_path: Path): - from .common import testdata - track = one(testdata().rglob('italy-slovenia-2017-07-29.json')) - - # todo ugh. unnecessary zipping, but at the moment takeout provider doesn't support plain dirs - import zipfile - with zipfile.ZipFile(tmp_path / 'takeout.zip', 'w') as zf: - zf.writestr('Takeout/Location History/Location History.json', track.read_bytes()) - - class google_config: - takeout_path = tmp_path - return google_config diff --git a/tests/location_fallback.py b/tests/location_fallback.py new file mode 100644 index 0000000..aad33ee --- /dev/null +++ b/tests/location_fallback.py @@ -0,0 +1,125 @@ +""" +To test my.location.fallback_location.all +""" + +from typing import Iterator +from datetime import datetime, timezone, timedelta + +from more_itertools import ilen + +from my.ip.common import IP + +def data() -> Iterator[IP]: + # random IP addresses + yield IP(addr="67.98.113.0", dt=datetime(2020, 1, 1, 12, 0, 0, tzinfo=timezone.utc)) + yield IP(addr="67.98.112.0", dt=datetime(2020, 1, 15, 12, 0, 0, tzinfo=timezone.utc)) + yield IP(addr="59.40.113.87", dt=datetime(2020, 2, 1, 12, 0, 0, tzinfo=timezone.utc)) + yield IP(addr="59.40.139.87", dt=datetime(2020, 2, 1, 16, 0, 0, tzinfo=timezone.utc)) + yield IP(addr="161.235.192.228", dt=datetime(2020, 3, 1, 12, 0, 0, tzinfo=timezone.utc)) + +# redefine the my.ip.all function using data for testing +import my.ip.all as ip_module +ip_module.ips = data + +from my.location.fallback import via_ip + +# these are all tests for the bisect algorithm defined in via_ip.py +# to make sure we can correctly find IPs that are within the 'for_duration' of a given datetime + +def test_ip_fallback() -> None: + # make sure that the data override works + assert ilen(ip_module.ips()) == ilen(data()) + assert ilen(ip_module.ips()) == ilen(via_ip.fallback_locations()) + assert ilen(via_ip.fallback_locations()) == 5 + assert ilen(via_ip._sorted_fallback_locations()) == 5 + + # confirm duration from via_ip since that is used for bisect + assert via_ip.config.for_duration == timedelta(hours=24) + + # basic tests + + # try estimating slightly before the first IP + est = list(via_ip.estimate_location(datetime(2020, 1, 1, 11, 59, 59, tzinfo=timezone.utc))) + assert len(est) == 0 + + # during the duration for the first IP + est = list(via_ip.estimate_location(datetime(2020, 1, 1, 12, 30, 0, tzinfo=timezone.utc))) + assert len(est) == 1 + + # right after the 'for_duration' for an IP + est = list(via_ip.estimate_location(datetime(2020, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + via_ip.config.for_duration + timedelta(seconds=1))) + assert len(est) == 0 + + # on 2/1/2020, threes one IP if before 16:30 + est = list(via_ip.estimate_location(datetime(2020, 2, 1, 12, 30, 0, tzinfo=timezone.utc))) + assert len(est) == 1 + + # and two if after 16:30 + est = list(via_ip.estimate_location(datetime(2020, 2, 1, 17, 00, 0, tzinfo=timezone.utc))) + assert len(est) == 2 + + # the 12:30 IP should 'expire' before the 16:30 IP, use 3:30PM on the next day + est = list(via_ip.estimate_location(datetime(2020, 2, 2, 15, 30, 0, tzinfo=timezone.utc))) + assert len(est) == 1 + + use_dt = datetime(2020, 3, 1, 12, 15, 0, tzinfo=timezone.utc) + + # test last IP + est = list(via_ip.estimate_location(use_dt)) + assert len(est) == 1 + + # datetime 
should be the IPs, not the passed IP (if via_home, it uses the passed dt) + assert est[0].dt != use_dt + + # test interop with other fallback estimators/all.py + # + # redefine fallback_estimators to prevent possible namespace packages the user + # may have installed from having side effects testing this + from my.location.fallback import all + from my.location.fallback import via_home + def _fe() -> Iterator[all.LocationEstimator]: + yield via_ip.estimate_location + yield via_home.estimate_location + + all.fallback_estimators = _fe + assert ilen(all.fallback_estimators()) == 2 + + # test that all.estimate_location has access to both IPs + # + # just passing via_ip should give one IP + from my.location.fallback.common import _iter_estimate_from + raw_est = list(_iter_estimate_from(use_dt, (via_ip.estimate_location,))) + assert len(raw_est) == 1 + assert raw_est[0].datasource == "via_ip" + assert raw_est[0].accuracy == 15_000 + + # passing home should give one + home_est = list(_iter_estimate_from(use_dt, (via_home.estimate_location,))) + assert len(home_est) == 1 + assert home_est[0].accuracy == 30_000 + + # make sure ip accuracy is more accurate + assert raw_est[0].accuracy < home_est[0].accuracy + + # passing both should give two + raw_est = list(_iter_estimate_from(use_dt, (via_ip.estimate_location, via_home.estimate_location))) + assert len(raw_est) == 2 + + # shouldn't raise value error + all_est = all.estimate_location(use_dt) + # should have used the IP from via_ip since it was more accurate + assert all_est.datasource == "via_ip" + + # test that a home defined in shared_config.py is used if no IP is found + loc = all.estimate_location(datetime(2021, 1, 1, 12, 30, 0, tzinfo=timezone.utc)) + assert loc.datasource == "via_home" + + # test a different home using location.fallback.all + bulgaria = all.estimate_location(datetime(2006, 1, 1, 12, 30, 0, tzinfo=timezone.utc)) + assert bulgaria.datasource == "via_home" + assert (bulgaria.lat, bulgaria.lon) == (42.697842, 23.325973) + assert (loc.lat, loc.lon) != (bulgaria.lat, bulgaria.lon) + + +# re-use prepare fixture for overriding config from shared_config.py +from .tz import prepare diff --git a/tests/shared_config.py b/tests/shared_config.py new file mode 100644 index 0000000..6b83a5a --- /dev/null +++ b/tests/shared_config.py @@ -0,0 +1,65 @@ +# Defines some shared config for tests + +from datetime import datetime, date, timezone +from pathlib import Path + +from typing import Any, NamedTuple +import my.time.tz.via_location as LTZ +from more_itertools import one + + +class SharedConfig(NamedTuple): + google: Any + location: Any + time: Any + + +def _prepare_google_config(tmp_path: Path): + from .common import testdata + try: + track = one(testdata().rglob('italy-slovenia-2017-07-29.json')) + except ValueError: + raise RuntimeError('testdata not found, setup git submodules?') + + + # todo ugh. 
unnecessary zipping, but at the moment takeout provider doesn't support plain dirs + import zipfile + with zipfile.ZipFile(tmp_path / 'takeout.zip', 'w') as zf: + zf.writestr('Takeout/Location History/Location History.json', track.read_bytes()) + + class google_config: + takeout_path = tmp_path + return google_config + + +# pass tmp_path from pytest to this helper function +# see tests/tz.py as an example +def temp_config(temp_path: Path) -> Any: + from .common import reset_modules + reset_modules() + + LTZ.config.fast = True + + class location: + home_accuracy = 30_000 + home = ( + # supports ISO strings + ('2005-12-04' , (42.697842, 23.325973)), # Bulgaria, Sofia + # supports date/datetime objects + (date(year=1980, month=2, day=15) , (40.7128 , -74.0060 )), # NY + # check tz handling.. + (datetime.fromtimestamp(1600000000, tz=timezone.utc), (55.7558 , 37.6173 )), # Moscow, Russia + ) + # note: order doesn't matter, will be sorted in the data provider + class via_ip: + accuracy = 15_000 + class gpslogger: + pass + + class time: + class tz: + class via_location: + pass # just rely on the defaults... + + + return SharedConfig(google=_prepare_google_config(temp_path), location=location, time=time) diff --git a/tests/tz.py b/tests/tz.py index 0ea2b40..8f80800 100644 --- a/tests/tz.py +++ b/tests/tz.py @@ -1,4 +1,5 @@ -from datetime import datetime, timedelta, date, timezone +import sys +from datetime import datetime, timedelta from pathlib import Path import pytest # type: ignore @@ -46,8 +47,15 @@ def test_tz() -> None: tz = LTZ._get_tz(D('20201001 14:15:16')) assert tz is not None - tz = LTZ._get_tz(datetime.min) - assert tz is not None + on_windows = sys.platform == 'win32' + if not on_windows: + tz = LTZ._get_tz(datetime.min) + assert tz is not None + else: + # seems this fails because windows doesnt support same date ranges + # https://stackoverflow.com/a/41400321/ + with pytest.raises(OSError): + LTZ._get_tz(datetime.min) def test_policies() -> None: @@ -73,36 +81,15 @@ def D(dstr: str) -> datetime: return datetime.strptime(dstr, '%Y%m%d %H:%M:%S') -# TODO copy pasted from location.py, need to extract some common provider + @pytest.fixture(autouse=True) def prepare(tmp_path: Path): - from .common import reset_modules - reset_modules() - - LTZ.config.fast = True - - from .location import _prepare_google_config - google = _prepare_google_config(tmp_path) - - class location: - home = ( - # supports ISO strings - ('2005-12-04' , (42.697842, 23.325973)), # Bulgaria, Sofia - # supports date/datetime objects - (date(year=1980, month=2, day=15) , (40.7128 , -74.0060 )), # NY - # check tz handling.. - (datetime.fromtimestamp(1600000000, tz=timezone.utc), (55.7558 , 37.6173 )), # Moscow, Russia - ) - # note: order doesn't matter, will be sorted in the data provider - - class time: - class tz: - class via_location: - pass # just rely on the defaults... 
+ from .shared_config import temp_config + conf = temp_config(tmp_path) import my.core.cfg as C with C.tmp_config() as config: - config.google = google - config.time = time - config.location = location + config.google = conf.google + config.time = conf.time + config.location = conf.location yield diff --git a/tox.ini b/tox.ini index 6e7ca23..efe6069 100644 --- a/tox.ini +++ b/tox.ini @@ -47,7 +47,12 @@ commands = hpi module install my.location.google pip install ijson # optional dependency + # tz/location hpi module install my.time.tz.via_location + hpi module install my.ip.all + hpi module install my.location.gpslogger + hpi module install my.location.fallback.via_ip + hpi module install my.google.takeout.parser hpi module install my.calendar.holidays @@ -125,8 +130,7 @@ commands = my.rescuetime \ my.runnerup \ my.stackexchange.stexport \ - my.smscalls \ - my.tinder.android + my.smscalls {envpython} -m mypy --install-types --non-interactive \