from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence, Deque, Any, Optional
from collections import deque
from itertools import islice
from datetime import datetime
from zipfile import ZipFile
import logging
import csv
import re
import json
from pathlib import Path

import pytz

from kython import kompress

# pip3 install geopy
import geopy # type: ignore
import geopy.distance # type: ignore
# pip3 install ijson
import ijson # type: ignore


def get_logger():
    return logging.getLogger("location")


TAKEOUTS_PATH = Path("/path/to/takeout")
CACHE_PATH = Path("/L/data/.cache/location.picklel")


Tag = str


class Location(NamedTuple):
    dt: datetime
    lat: float
    lon: float
    alt: Optional[float]
    tag: Tag


def tagger(dt: datetime, point: geopy.Point) -> Tag:
    # each entry: (coordinate, max distance in meters, tag to assign)
    TAGS = [
        # removed
    ]
    for coord, dist, tag in TAGS:
        if geopy.distance.distance(coord, point).m < dist:
            return tag
    return "other"


# TODO hope they are sorted...
# TODO that could also serve as basis for tz provider
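# For reference, a single record in Location History.json looks roughly like
# this (hypothetical values; only the fields used below are shown):
#   {"timestampMs": "1546300800000", "latitudeE7": 525200000, "longitudeE7": 134050000, "altitude": 36}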
def load_locations() -> Iterator[Location]:
    logger = get_logger() # TODO count errors?

    last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip'))

    # TODO wonder if old takeouts could contribute as well??
    total = 0
    errors = 0
    with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo:
        for j in ijson.items(fo, 'locations.item'):
            dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000)
            if total % 10000 == 0:
                logger.info('processing item %d %s', total, dt)
            total += 1

            dt = pytz.utc.localize(dt)
            try:
                # the E7 fields are integers: degrees multiplied by 1e7
                lat = float(j["latitudeE7"] / 10000000)
                lon = float(j["longitudeE7"] / 10000000)
                point = geopy.Point(lat, lon) # kinda sanity check that coordinates are ok
            except Exception as e:
                logger.exception(e)
                errors += 1
                if float(errors) / total > 0.01:
                    raise RuntimeError('too many errors! aborting')
                else:
                    continue

            alt = j.get("altitude", None)
            tag = tagger(dt, point) # TODO take accuracy into account??
            yield Location(
                dt=dt,
                lat=lat,
                lon=lon,
                alt=alt,
                tag=tag,
            )


def iter_locations(cached: bool=False) -> Iterator[Location]:
    import sys
    sys.path.append('/L/Dropbox/data/location_provider') # jeez.. otherwise it refuses to unpickle :(

    import pickle as dill # type: ignore
    if cached:
        with CACHE_PATH.open('rb') as fo:
            # the cache is a concatenated stream of pickled Location objects;
            # keep loading until the file runs out
            while True:
                try:
                    # TODO shit really?? it can't load now, do I need to adjust pythonpath or something?...
                    pre = dill.load(fo)
                    yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods...
                except EOFError:
                    break
    else:
        yield from load_locations()


def get_locations(cached: bool=False) -> Sequence[Location]:
    return list(iter_locations(cached=cached))


class LocInterval(NamedTuple):
    from_: Location
    to: Location


# TODO kython? nicer interface?
class Window:
    def __init__(self, it):
        self.it = it
        self.storage: Deque[Any] = deque()
        self.start = 0
        self.end = 0

    # TODO need check for existence?
    def load_to(self, to):
        while to >= self.end:
            try:
                ii = next(self.it)
                self.storage.append(ii)
                self.end += 1
            except StopIteration:
                break

    def exists(self, i):
        self.load_to(i)
        return i < self.end

    def consume_to(self, i):
        self.load_to(i)
        consumed = i - self.start
        self.start = i
        for _ in range(consumed):
            self.storage.popleft()

    def __getitem__(self, i):
        self.load_to(i)
        ii = i - self.start
        assert ii >= 0
        return self.storage[ii]
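

def _window_smoke_test():
    # Minimal sketch of how Window behaves (illustrative only, not used by the
    # pipeline): items are pulled lazily and addressed by absolute index.
    w = Window(iter(range(5)))
    assert w.exists(2)       # lazily pulls items 0..2 from the iterator
    assert w[2] == 2
    w.consume_to(2)          # drops everything before index 2 to bound memory
    assert w[2] == 2         # absolute indexing still works after consuming
    assert not w.exists(5)   # underlying iterator is exhausted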


# TODO could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here
# TODO maybe if tag is none, we just don't care?
def get_groups(cached: bool=False) -> List[LocInterval]:
    print("cached", cached)
    all_locations = iter_locations(cached=cached)
    locsi = Window(all_locations)
    i = 0
    groups: List[LocInterval] = []
    curg: List[Location] = []

    def add_to_group(x):
        nonlocal curg
        if len(curg) < 2:
            curg.append(x)
        else:
            curg[-1] = x

    def dump_group():
        nonlocal curg
        if len(curg) > 0:
            # print("new group")
            groups.append(LocInterval(from_=curg[0], to=curg[-1]))
            curg = []

    while locsi.exists(i):
        # if i % 1000 == 0:
        #     print("processing " + str(i))
        locsi.consume_to(i)

        last = None if len(curg) == 0 else curg[-1]
        cur = locsi[i]
        j = i
        match = False
        # look ahead a few items for one with a matching tag, skipping outliers
        while not match and locsi.exists(j) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour?
            cur = locsi[j]
            if last is None or cur.tag == last.tag:
                # ok
                add_to_group(cur)
                i = j + 1
                match = True
            else:
                j += 1
        # if we made it here without advancing, the current group has ended
        if not match:
            dump_group()
            i += 1
    dump_group()
    return groups


def update_cache():
    import pickle as dill # type: ignore
    CACHE_PATH_TMP = CACHE_PATH.with_suffix('.tmp')
    # TODO maybe, also keep on /tmp first?

    # write to a temporary file and rename at the end, so a crash mid-write
    # doesn't leave a half-written cache behind
    with CACHE_PATH_TMP.open('wb', 2 ** 20) as fo:
        for loc in iter_locations(cached=False):
            dill.dump(loc, fo)
    CACHE_PATH_TMP.rename(CACHE_PATH)
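

if __name__ == '__main__':
    # Ad-hoc smoke run, a sketch only: assumes TAKEOUTS_PATH actually contains
    # takeout*.zip archives and that kython/geopy/ijson are installed.
    logging.basicConfig(level=logging.INFO)
    update_cache()
    for interval in get_groups(cached=True):
        print(interval.from_.dt, '->', interval.to.dt, interval.from_.tag)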