diff --git a/my/location/takeout.py b/my/location/takeout.py
index 6dbce72..28be64f 100644
--- a/my/location/takeout.py
+++ b/my/location/takeout.py
@@ -7,22 +7,17 @@ from collections import deque
 from datetime import datetime
 from itertools import islice
 from pathlib import Path
-from typing import Any, Collection, Deque, Iterable, Iterator, List, NamedTuple, Optional, Sequence, IO
+from typing import Any, Collection, Deque, Iterable, Iterator, List, NamedTuple, Optional, Sequence, IO, Tuple
+import re
+
 import pytz
 
 # pip3 install geopy
 import geopy # type: ignore
 import geopy.distance # type: ignore
 
-try:
-    # pip3 install ijson cffi
-    # cffi backend is almost 2x faster than default
-    import ijson.backends.yajl2_cffi as ijson # type: ignore
-except:
-    # fallback to default backend. warning?
-    import ijson # type: ignore
-
-from ..common import get_files, LazyLogger, mcachew
+from ..core.common import get_files, LazyLogger, mcachew
+from ..core.cachew import cache_dir
 from ..google.takeout.paths import get_last_takeout
 from ..kython import kompress
 
@@ -30,14 +25,9 @@ from ..kython import kompress
 logger = LazyLogger(__name__)
 
 
-# todo switch to use cachew.cache_dir
-def cache_path(*args, **kwargs):
-    from my.config import location as config
-    return config.cache_path
-
-
 Tag = Optional[str]
 
+# todo maybe don't tag by default?
 class Location(NamedTuple):
     dt: datetime
     lat: float
@@ -46,8 +36,47 @@ class Location(NamedTuple):
     tag: Tag
 
 
-# TODO use pool? not sure if that would really be faster...
-def _iter_locations_fo(fo, start, stop) -> Iterator[Location]:
+TsLatLon = Tuple[int, int, int]
+
+
+def _iter_via_ijson(fo) -> Iterator[TsLatLon]:
+    # ijson version takes 25 seconds for 1M items (without processing)
+    try:
+        # pip3 install ijson cffi
+        import ijson.backends.yajl2_cffi as ijson # type: ignore
+    except:
+        import warnings
+        warnings.warn("Falling back to the default ijson backend because the 'cffi' backend isn't available. The cffi backend is up to 2x faster, so you might want to install it")
+        import ijson # type: ignore
+
+    for d in ijson.items(fo, 'locations.item'):
+        yield (
+            int(d['timestampMs']),
+            d['latitudeE7' ],
+            d['longitudeE7'],
+        )
+
+
+def _iter_via_grep(fo) -> Iterator[TsLatLon]:
+    # grep version takes 5 seconds for 1M items (without processing)
+    x = [None, None, None]
+    for i, line in enumerate(fo):
+        if i > 0 and i % 3 == 0:
+            yield tuple(x)
+        n = re.search(b': "?(-?\\d+)"?,?$', line) # meh. somewhat fragile...
+        j = i % 3
+        x[j] = int(n.group(1).decode('ascii'))
+    # make sure we've read what we expected (complete triples only)
+    assert (i + 1) % 3 == 0
+    yield tuple(x)
+
+
+# todo could also use pool? not sure if that would really be faster...
+# each thread could process 100K at once?
+# would need to figure out a way to know when to stop? process in some sort of sqrt progression??
+
+
+def _iter_locations_fo(fit) -> Iterator[Location]:
     total = 0
     errors = 0
 
@@ -72,17 +101,16 @@ def _iter_locations_fo(fo, start, stop) -> Iterator[Location]:
         else:
             return None
 
-
-    for j in islice(ijson.items(fo, 'locations.item'), start, stop):
-        dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000)
+    for tsMs, latE7, lonE7 in fit:
+        dt = datetime.fromtimestamp(tsMs / 1000, tz=pytz.utc)
+        total += 1
         if total % 10000 == 0:
             logger.info('processing item %d %s', total, dt)
-        total += 1
 
-        dt = pytz.utc.localize(dt)
         try:
-            lat = float(j["latitudeE7"] / 10000000)
-            lon = float(j["longitudeE7"] / 10000000)
+            lat = float(latE7 / 1e7)
+            lon = float(lonE7 / 1e7)
+            # note: geopy is quite slow..
             point = geopy.Point(lat, lon) # kinda sanity check that coordinates are ok
         except Exception as e:
             logger.exception(e)
@@ -92,8 +120,12 @@ def _iter_locations_fo(fo, start, stop) -> Iterator[Location]:
             else:
                 continue
 
-        alt = j.get("altitude", None)
-        tag = tagger(dt, point) # TODO take accuracy into account??
+        # todo support later
+        # alt = j.get("altitude", None)
+        alt = None
+        # todo enable tags later
+        # tag = tagger(dt, point) # TODO take accuracy into account??
+        tag = None
         yield Location(
             dt=dt,
             lat=lat,
@@ -105,18 +137,33 @@ def _iter_locations_fo(fo, start, stop) -> Iterator[Location]:
 _LOCATION_JSON = 'Takeout/Location History/Location History.json'
 
 
+# todo if start != 0, disable the cache? again, this is where nicer caching would come in handy
 # TODO hope they are sorted... (could assert for it)
-@mcachew(cache_path, chunk_by=10000, logger=logger)
+@mcachew(cache_dir() / 'google_location.cache', logger=logger)
 def _iter_locations(path: Path, start=0, stop=None) -> Iterator[Location]:
     ctx: IO[str]
     if path.suffix == '.json':
+        # todo: to support this, should perhaps pass it as input= to Popen
+        raise RuntimeError("Temporarily not supported")
         ctx = path.open('r')
     else: # must be a takeout archive
+        # todo CPath? although not sure if it can be iterative?
         ctx = kompress.open(path, _LOCATION_JSON)
 
-    with ctx as fo:
-        yield from _iter_locations_fo(fo, start=start, stop=stop)
-    # TODO wonder if old takeouts could contribute as well??
+    # with ctx as fo:
+    #     fit = _iter_via_ijson(fo)
+    #     fit = islice(fit, start, stop)
+    #     yield from _iter_locations_fo(fit)
+
+    unzip = f'unzip -p "{path}" "{_LOCATION_JSON}"'
+    extract = "grep -E '^    .(timestampMs|latitudeE7|longitudeE7)'"
+    from subprocess import Popen, PIPE
+    with Popen(f'{unzip} | {extract}', shell=True, stdout=PIPE) as p:
+        out = p.stdout; assert out is not None
+        fit = _iter_via_grep(out)
+        fit = islice(fit, start, stop)
+        yield from _iter_locations_fo(fit)
+    # todo wonder if old takeouts could contribute as well??
 
 
 def iter_locations(**kwargs) -> Iterator[Location]:
@@ -135,7 +182,7 @@ class LocInterval(NamedTuple):
     to: Location
 
 
-# TODO use this advanced iterators library?
+# TODO use more_itertools
 # TODO kython? nicer interface?
 class Window:
     def __init__(self, it):
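
For context on the new fast path: it relies on the Takeout "Location History.json" being pretty-printed with one key per line, so unzip -p piped into grep can stream out just the timestampMs/latitudeE7/longitudeE7 lines, and _iter_via_grep reassembles every three matched lines into one point. Below is a minimal, self-contained sketch of that grouping step, assuming the pretty-printed format shown in the sample lines; the sample values and exact formatting are made up for illustration and are not part of the patch.

import re

# a few sample lines, roughly as they would come out of:
#   unzip -p takeout.zip 'Takeout/Location History/Location History.json' \
#     | grep -E '^    .(timestampMs|latitudeE7|longitudeE7)'
# (hypothetical values, just to exercise the parsing)
sample = [
    b'    "timestampMs" : "1593459759000",\n',
    b'    "latitudeE7" : 537030000,\n',
    b'    "longitudeE7" : -1270000,\n',
]

def iter_via_grep(fo):
    # same grouping idea as the patched _iter_via_grep:
    # every 3 matched lines (timestampMs, latitudeE7, longitudeE7) form one point
    x = [None, None, None]
    for i, line in enumerate(fo):
        if i > 0 and i % 3 == 0:
            yield tuple(x)
        n = re.search(b': "?(-?\\d+)"?,?$', line)
        x[i % 3] = int(n.group(1).decode('ascii'))
    assert (i + 1) % 3 == 0  # the last triple must be complete
    yield tuple(x)

print(list(iter_via_grep(sample)))
# [(1593459759000, 537030000, -1270000)]

The sketch only demonstrates the line-grouping and regex extraction; the real patch feeds the iterator from the Popen pipeline and then converts each triple into a Location in _iter_locations_fo.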