# HPI/my/location/takeout.py

import json
import logging
import re
from collections import deque
from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Any, Collection, Deque, Iterable, Iterator, List, NamedTuple, Optional, Sequence
from zipfile import ZipFile
import pytz
# pip3 install geopy
import geopy # type: ignore
import geopy.distance # type: ignore
try:
# pip3 install ijson cffi
# cffi backend is almost 2x faster than default
import ijson.backends.yajl2_cffi as ijson # type: ignore
except ImportError:
    # fall back to the default (slower) backend. TODO warn?
    import ijson  # type: ignore
from kython import kompress # TODO
from ..common import get_files


def get_logger():
    return logging.getLogger("location")


def cache_path(*args, **kwargs):
from mycfg import paths
return paths.location.cache_path


Tag = Optional[str]


class Location(NamedTuple):
dt: datetime
lat: float
lon: float
alt: Optional[float]
tag: Tag


# TODO use pool? not sure if that would really be faster...
def _iter_locations_fo(fo, start, stop) -> Iterator[Location]:
logger = get_logger()
total = 0
errors = 0
try:
from mycfg.locations import LOCATIONS as known_locations
except ModuleNotFoundError as e:
name = 'mycfg.locations'
if e.name != name:
raise e
        logger.warning("'%s' not found; setting known_locations to an empty list", name)
known_locations = []

    # TODO tagging should be takeout-agnostic
    def tagger(dt: datetime, point: geopy.Point) -> Tag:
'''
Tag points with known locations (e.g. work/home/etc)
'''
for lat, lon, dist, tag in known_locations:
# TODO use something more efficient?
if geopy.distance.distance((lat, lon), point).m < dist:
return tag
        return None
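
    # Illustrative only: mycfg.locations.LOCATIONS is expected to be an iterable of
    # (lat, lon, radius_in_meters, tag) tuples, e.g.
    #   LOCATIONS = [
    #       (51.5074, -0.1278, 500, 'home'),
    #       (51.5033, -0.1195, 300, 'work'),
    #   ]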
for j in islice(ijson.items(fo, 'locations.item'), start, stop):
dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000)
if total % 10000 == 0:
logger.info('processing item %d %s', total, dt)
total += 1
dt = pytz.utc.localize(dt)
try:
            # coordinates are stored as fixed-point integers: degrees * 1e7 ('E7')
            lat = float(j["latitudeE7"] / 10000000)
            lon = float(j["longitudeE7"] / 10000000)
point = geopy.Point(lat, lon) # kinda sanity check that coordinates are ok
except Exception as e:
logger.exception(e)
errors += 1
            if errors / total > 0.01:
raise RuntimeError('too many errors! aborting')
else:
continue
alt = j.get("altitude", None)
tag = tagger(dt, point) # TODO take accuracy into account??
yield Location(
dt=dt,
lat=lat,
lon=lon,
alt=alt,
tag=tag
)
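

# A rough sketch of a single location record as consumed above (values are made up;
# real records carry more fields, which vary between takeout versions):
#   {
#       "timestampMs": "1578500000000",
#       "latitudeE7": 515074000,
#       "longitudeE7": -1278000,
#       "altitude": 30,
#       "accuracy": 20
#   }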


# TODO hope they are sorted... (could assert for it)
# TODO actually, path includes timestamp already... so mtime_hash isn't _really_ necessary
# TODO CACHEW_OFF env variable?
# TODO use mcachew
from cachew import cachew, mtime_hash
@cachew(cache_path, hashf=mtime_hash, cls=Location, chunk_by=10000, logger=get_logger())
def _iter_locations(path: Path, start=0, stop=None) -> Iterator[Location]:
if path.suffix == '.json':
ctx = path.open('r')
else: # must be a takeout archive
ctx = kompress.open(path, 'Takeout/Location History/Location History.json')
with ctx as fo:
yield from _iter_locations_fo(fo, start=start, stop=stop)
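

# Usage sketch (illustrative; the filename is made up): stream the first 100 points
# of a takeout archive. A second run over an unchanged file should be served from
# the cachew cache, since the hash above is derived from the file's mtime.
#   for loc in _iter_locations(Path('takeout-20200101.zip'), stop=100):
#       print(loc.dt, loc.lat, loc.lon, loc.tag)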


# TODO wonder if old takeouts could contribute as well??
def iter_locations(**kwargs) -> Iterator[Location]:
from mycfg import paths
# TODO need to include older data
last_takeout = max(get_files(paths.google.takeout_path, glob='takeout*.zip'))
return _iter_locations(path=last_takeout, **kwargs)


def get_locations(*args, **kwargs) -> Sequence[Location]:
return list(iter_locations(*args, **kwargs))


class LocInterval(NamedTuple):
from_: Location
to: Location


# TODO use this advanced iterators library?
# TODO kython? nicer interface?
class Window:
def __init__(self, it):
self.it = it
self.storage: Deque[Any] = deque()
self.start = 0
self.end = 0
# TODO need check for existence?
def load_to(self, to):
while to >= self.end:
try:
ii = next(self.it)
self.storage.append(ii)
self.end += 1
except StopIteration:
break
def exists(self, i):
self.load_to(i)
return i < self.end
def consume_to(self, i):
self.load_to(i)
consumed = i - self.start
self.start = i
for _ in range(consumed):
self.storage.popleft()
def __getitem__(self, i):
self.load_to(i)
ii = i - self.start
assert ii >= 0
return self.storage[ii]
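

# A minimal sketch of Window semantics (illustrative, not part of the module):
#   w = Window(iter(range(100)))
#   assert w.exists(5)  # lazily pulls items 0..5 into the deque
#   assert w[5] == 5
#   w.consume_to(5)     # drops items 0..4; the window now starts at index 5
#   assert w[5] == 5    # still accessible as the new start of the window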


# TODO cachew as well?
# TODO maybe if tag is none, we just don't care?
def get_groups(*args, **kwargs) -> List[LocInterval]:
logger = get_logger()
all_locations = iter(iter_locations(*args, **kwargs))
locsi = Window(all_locations)
i = 0
groups: List[LocInterval] = []
curg: List[Location] = []
def add_to_group(x):
nonlocal curg
if len(curg) < 2:
curg.append(x)
else:
curg[-1] = x
def dump_group():
nonlocal curg
if len(curg) > 0:
# print("new group")
groups.append(LocInterval(from_=curg[0], to=curg[-1]))
curg = []
while locsi.exists(i):
if i % 10000 == 0:
logger.debug('grouping item %d', i)
locsi.consume_to(i)
last = None if len(curg) == 0 else curg[-1]
cur = locsi[i]
j = i
match = False
while not match and locsi.exists(j) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour?
cur = locsi[j]
if last is None or cur.tag == last.tag:
# ok
add_to_group(cur)
i = j + 1
match = True
else:
j += 1
        # if we made it here without advancing, flush the current group
if not match:
dump_group()
i += 1
dump_group()
return groups
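

# Illustrative only: each LocInterval roughly spans a run of consecutive points
# sharing a tag, so a quick report could look like:
#   for g in get_groups():
#       print(g.from_.dt, '..', g.to.dt, g.from_.tag)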


def update_cache():
# TODO perhaps set hash to null instead, that's a bit less intrusive
cp = cache_path()
if cp.exists():
cp.unlink()
for _ in iter_locations():
pass
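

if __name__ == '__main__':
    # ad-hoc smoke test, not part of the original module: rebuild the cache and
    # print how many groups were extracted
    logging.basicConfig(level=logging.INFO)
    update_cache()
    print(len(get_groups()))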