From f0b748804e802e9ee54683daea6a58bc7d2b1a8b Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Thu, 30 Aug 2018 12:03:54 +0100 Subject: [PATCH 01/26] Initial --- .gitignore | 172 +++++++++++++++++++++++++++++++++++++++++++ ci.sh | 10 +++ location/__init__.py | 7 ++ location/__main__.py | 10 +++ run | 6 ++ update_cache | 7 ++ 6 files changed, 212 insertions(+) create mode 100644 .gitignore create mode 100755 ci.sh create mode 100644 location/__init__.py create mode 100644 location/__main__.py create mode 100755 run create mode 100755 update_cache diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b539013 --- /dev/null +++ b/.gitignore @@ -0,0 +1,172 @@ + +# Created by https://www.gitignore.io/api/python,emacs + +### Emacs ### +# -*- mode: gitignore; -*- +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc +auto-save-list +tramp +.\#* + +# Org-mode +.org-id-locations +*_archive + +# flymake-mode +*_flymake.* + +# eshell files +/eshell/history +/eshell/lastdir + +# elpa packages +/elpa/ + +# reftex files +*.rel + +# AUCTeX auto folder +/auto/ + +# cask packages +.cask/ +dist/ + +# Flycheck +flycheck_*.el + +# server auth directory +/server/ + +# projectiles files +.projectile + +# directory configuration +.dir-locals.el + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +### Python Patch ### +.venv/ + +### Python.VirtualEnv Stack ### +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +pip-selfcheck.json + + +# End of https://www.gitignore.io/api/python,emacs diff --git a/ci.sh b/ci.sh new file mode 100755 index 0000000..7a3b8ca --- /dev/null +++ b/ci.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +cd "$(this_dir)" || exit + +. ~/bash_ci + +ci_run mypy location +ci_run pylint -E location + +ci_report_errors diff --git a/location/__init__.py b/location/__init__.py new file mode 100644 index 0000000..d28ada2 --- /dev/null +++ b/location/__init__.py @@ -0,0 +1,7 @@ +import logging + +def get_logger(): + return logging.getLogger("location") + + +# TODO need to cache? 
diff --git a/location/__main__.py b/location/__main__.py new file mode 100644 index 0000000..e4da5b9 --- /dev/null +++ b/location/__main__.py @@ -0,0 +1,10 @@ +from location import get_logger + +logger = get_logger() + +from kython.logging import setup_logzero + +setup_logzero(logger) + + + diff --git a/run b/run new file mode 100755 index 0000000..a9f51cc --- /dev/null +++ b/run @@ -0,0 +1,6 @@ +#!/bin/bash +set -eu + +cd "$(dirname "$0")" + +python3 -m location diff --git a/update_cache b/update_cache new file mode 100755 index 0000000..6daf62c --- /dev/null +++ b/update_cache @@ -0,0 +1,7 @@ +#!/bin/bash +set -eu + +cd "$(dirname "$0")" + +python3 -m photos update_cache + From 2e0e8ef82a9641e9032f645cb887b451c20de015 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Thu, 30 Aug 2018 13:37:52 +0100 Subject: [PATCH 02/26] initial --- location/__init__.py | 59 ++++++++++++++++++++++++++++++++++++++++++++ location/__main__.py | 17 +++++++++++-- update_cache | 2 +- 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index d28ada2..f521011 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -1,7 +1,66 @@ +from typing import NamedTuple, Iterator, List, Iterable +from datetime import datetime import logging +import csv +import geopy.distance # type: ignore def get_logger(): return logging.getLogger("location") +PATH = "/L/data/location/location.csv" +CACHE_PATH = "/L/.cache/location.cache" # TODO need to cache? +# TODO tag?? + +Tag = str + +class Location(NamedTuple): + dt: datetime + lat: float + lon: float + tag: Tag + + +def tagger(dt: datetime, lat: float, lon: float) -> Tag: + TAGS = [ + # removed + ] + for coord, dist, tag in TAGS: + if geopy.distance.distance(coord, (lat, lon)).m < dist: + return tag + else: + return "other" + +def iter_locations() -> Iterator[Location]: + with open(PATH) as fo: + reader = csv.reader(fo) + next(reader) # skip header + for ll in reader: + [ts, lats, lons] = ll + # TODO hmm, is it local?? + dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") + lat = float(lats) + lon = float(lons) + tag = tagger(dt, lat, lon) + yield Location( + dt=dt, + lat=lat, + lon=lon, + tag=tag + ) + +def get_locations(cached: bool=False) -> Iterable[Location]: + import dill # type: ignore + if cached: + with open(CACHE_PATH, 'rb') as fo: + preph = dill.load(fo) + return [Location(**p._asdict()) for p in preph] # meh. but otherwise it's not serialising methods... + else: + return list(iter_locations()) + +def update_cache(): + import dill # type: ignore + datas = get_locations(cached=False) + with open(CACHE_PATH, 'wb') as fo: + dill.dump(datas, fo) diff --git a/location/__main__.py b/location/__main__.py index e4da5b9..10a27dc 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -1,4 +1,4 @@ -from location import get_logger +from location import get_logger, get_locations, iter_locations logger = get_logger() @@ -6,5 +6,18 @@ from kython.logging import setup_logzero setup_logzero(logger) +import sys - +if len(sys.argv) > 1: + cmd = sys.argv[1] + if cmd == "update_cache": + from location import update_cache, get_locations + update_cache() + get_locations(cached=True) + else: + raise RuntimeError(f"Unknown command {cmd}") +else: + for p in iter_locations(): + pass + # TODO need datetime! 
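The tagger() added in this patch matches each point against a list of known places by geodesic distance. The original TAGS list was removed from the commit, so the places below are invented placeholders; this is only a hedged, self-contained sketch of the same matching logic, not the author's data or exact code.

import geopy.distance  # type: ignore  # pip3 install geopy

Tag = str
# ((latitude, longitude), radius in metres, tag) -- example entries only
PLACES = [
    ((51.5074, -0.1278), 500.0, "example-home"),
    ((52.2053, 0.1218), 300.0, "example-work"),
]

def tag_point(lat: float, lon: float) -> Tag:
    for coord, radius, tag in PLACES:
        # geodesic distance in metres between the known place and the observed point
        if geopy.distance.distance(coord, (lat, lon)).m < radius:
            return tag
    return "other"

print(tag_point(51.5080, -0.1281))  # close to the first place -> "example-home"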
+ # print(p) diff --git a/update_cache b/update_cache index 6daf62c..3938289 100755 --- a/update_cache +++ b/update_cache @@ -3,5 +3,5 @@ set -eu cd "$(dirname "$0")" -python3 -m photos update_cache +python3 -m location update_cache From 64be0e9706af84a3d3fa2297c7396439a525c8e4 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Thu, 30 Aug 2018 17:31:11 +0300 Subject: [PATCH 03/26] grouping --- location/__init__.py | 48 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/location/__init__.py b/location/__init__.py index f521011..11bc401 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -32,6 +32,8 @@ def tagger(dt: datetime, lat: float, lon: float) -> Tag: else: return "other" +# TODO hope they are sorted... +# TODO that could also serve as basis for timezone provider. def iter_locations() -> Iterator[Location]: with open(PATH) as fo: reader = csv.reader(fo) @@ -59,6 +61,52 @@ def get_locations(cached: bool=False) -> Iterable[Location]: else: return list(iter_locations()) +class LocInterval(NamedTuple): + from_: Location + to: Location + +def get_groups(cached: bool=False) -> List[LocInterval]: + locs = get_locations(cached=cached) + i = 0 + groups: List[LocInterval] = [] + curg: List[Location] = [] + + def add_to_group(x): + nonlocal curg + if len(curg) < 2: + curg.append(x) + else: + curg[-1] = x + + def dump_group(): + nonlocal curg + if len(curg) > 0: + groups.append(LocInterval(from_=curg[0], to=curg[-1])) + curg = [] + + while i < len(locs): + last = None if len(curg) == 0 else curg[-1] + cur = locs[i] + j = i + match = False + while not match and j < len(locs) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour? + cur = locs[j] + if last is None or cur.tag == last.tag: + # ok + add_to_group(cur) + i = j + 1 + match = True + else: + j += 1 + # if we made here without advancing + if not match: + dump_group() + i += 1 + else: + pass + dump_group() + return groups + def update_cache(): import dill # type: ignore datas = get_locations(cached=False) From 70a09a80ba1dd948b81e28f88848baca14b12036 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Wed, 10 Oct 2018 19:35:55 +0100 Subject: [PATCH 04/26] Fix mypy --- location/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index 11bc401..4ba6cf8 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -1,4 +1,4 @@ -from typing import NamedTuple, Iterator, List, Iterable +from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence from datetime import datetime import logging import csv @@ -52,7 +52,7 @@ def iter_locations() -> Iterator[Location]: tag=tag ) -def get_locations(cached: bool=False) -> Iterable[Location]: +def get_locations(cached: bool=False) -> Sequence[Location]: import dill # type: ignore if cached: with open(CACHE_PATH, 'rb') as fo: From 96c8c324f38cd544964de91a28e683dc72a01e56 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Sat, 20 Oct 2018 09:40:43 +0100 Subject: [PATCH 05/26] Collect from zip; iteratively --- location/__init__.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index 4ba6cf8..af18223 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -1,13 +1,21 @@ from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence from datetime import datetime +from os import listdir +from os.path import join +from zipfile import 
ZipFile import logging import csv +import re +import json + import geopy.distance # type: ignore +# pip3 install ijson +import ijson # type: ignore def get_logger(): return logging.getLogger("location") -PATH = "/L/data/location/location.csv" +TAKEOUTS_PATH = "/path/to/takeout" CACHE_PATH = "/L/.cache/location.cache" # TODO need to cache? @@ -35,15 +43,14 @@ def tagger(dt: datetime, lat: float, lon: float) -> Tag: # TODO hope they are sorted... # TODO that could also serve as basis for timezone provider. def iter_locations() -> Iterator[Location]: - with open(PATH) as fo: - reader = csv.reader(fo) - next(reader) # skip header - for ll in reader: - [ts, lats, lons] = ll - # TODO hmm, is it local?? - dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") - lat = float(lats) - lon = float(lons) + last_takeout = max([f for f in listdir(TAKEOUTS_PATH) if re.match('takeout.*.zip', f)]) + jdata = None + with ZipFile(join(TAKEOUTS_PATH, last_takeout)).open('Takeout/Location History/Location History.json') as fo: + for j in ijson.items(fo, 'locations.item'): + # TODO eh, not very streaming?.. + dt = datetime.fromtimestamp(int(j["timestampMs"]) / 1000) # TODO utc?? + lat = float(j["latitudeE7"] / 10000000) + lon = float(j["longitudeE7"] / 10000000) tag = tagger(dt, lat, lon) yield Location( dt=dt, @@ -65,6 +72,7 @@ class LocInterval(NamedTuple): from_: Location to: Location +# TOOD could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here def get_groups(cached: bool=False) -> List[LocInterval]: locs = get_locations(cached=cached) i = 0 From 1ca1bcd309fb7bbf383f071bf842450e87162a45 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Sat, 10 Nov 2018 16:34:11 +0000 Subject: [PATCH 06/26] fix cache path and requirements --- location/__init__.py | 2 +- location/__main__.py | 2 +- requirements.txt | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 requirements.txt diff --git a/location/__init__.py b/location/__init__.py index af18223..e8c7b6c 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -16,7 +16,7 @@ def get_logger(): return logging.getLogger("location") TAKEOUTS_PATH = "/path/to/takeout" -CACHE_PATH = "/L/.cache/location.cache" +CACHE_PATH = "/L/data/.cache/location.pickle" # TODO need to cache? # TODO tag?? diff --git a/location/__main__.py b/location/__main__.py index 10a27dc..0106a89 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -20,4 +20,4 @@ else: for p in iter_locations(): pass # TODO need datetime! 
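[PATCH 05/26] above drops the CSV export and instead streams Google Takeout's Location History JSON straight out of the zip with ijson, so the file never has to be loaded into memory at once. A rough, self-contained sketch of that streaming pattern follows; the archive path is a placeholder, while the entry name and field names (timestampMs, latitudeE7, longitudeE7) come from the patch itself.

from datetime import datetime, timezone
from zipfile import ZipFile

import ijson  # type: ignore  # pip3 install ijson

ARCHIVE = "takeout.zip"  # placeholder path to a Google Takeout archive
ENTRY = "Takeout/Location History/Location History.json"

with ZipFile(ARCHIVE).open(ENTRY) as fo:
    # ijson yields one dict per element of the top-level "locations" array
    for j in ijson.items(fo, "locations.item"):
        dt = datetime.fromtimestamp(int(j["timestampMs"]) / 1000, tz=timezone.utc)
        lat = int(j["latitudeE7"]) / 1e7   # coordinates are stored as degrees * 1e7
        lon = int(j["longitudeE7"]) / 1e7
        print(dt, lat, lon)
        break  # demonstrate just the first record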
- # print(p) + print(p) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..eef293b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +geopy +ijson From cb39e5a00e4c9f616ad22f801aeb00f814c6797f Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Mon, 12 Nov 2018 21:29:47 +0000 Subject: [PATCH 07/26] iterator for processing window; make location provider consume less memory --- location/__init__.py | 93 +++++++++++++++++++++++++++++++++++--------- location/__main__.py | 6 +-- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index e8c7b6c..7d3eac8 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -1,5 +1,8 @@ from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence +from collections import deque +from itertools import islice from datetime import datetime +import os from os import listdir from os.path import join from zipfile import ZipFile @@ -16,10 +19,7 @@ def get_logger(): return logging.getLogger("location") TAKEOUTS_PATH = "/path/to/takeout" -CACHE_PATH = "/L/data/.cache/location.pickle" - -# TODO need to cache? -# TODO tag?? +CACHE_PATH = "/L/data/.cache/location.picklel" Tag = str @@ -42,13 +42,16 @@ def tagger(dt: datetime, lat: float, lon: float) -> Tag: # TODO hope they are sorted... # TODO that could also serve as basis for timezone provider. -def iter_locations() -> Iterator[Location]: +def load_locations() -> Iterator[Location]: last_takeout = max([f for f in listdir(TAKEOUTS_PATH) if re.match('takeout.*.zip', f)]) jdata = None with ZipFile(join(TAKEOUTS_PATH, last_takeout)).open('Takeout/Location History/Location History.json') as fo: + cc = 0 for j in ijson.items(fo, 'locations.item'): - # TODO eh, not very streaming?.. dt = datetime.fromtimestamp(int(j["timestampMs"]) / 1000) # TODO utc?? + if cc % 10000 == 0: + print(f'processing {dt}') + cc += 1 lat = float(j["latitudeE7"] / 10000000) lon = float(j["longitudeE7"] / 10000000) tag = tagger(dt, lat, lon) @@ -59,22 +62,67 @@ def iter_locations() -> Iterator[Location]: tag=tag ) -def get_locations(cached: bool=False) -> Sequence[Location]: +def iter_locations(cached: bool=False) -> Iterator[Location]: import dill # type: ignore if cached: with open(CACHE_PATH, 'rb') as fo: - preph = dill.load(fo) - return [Location(**p._asdict()) for p in preph] # meh. but otherwise it's not serialising methods... + # TODO while fo has more data? + while True: + try: + pre = dill.load(fo) + yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods... + except EOFError: + break else: - return list(iter_locations()) + yield from load_locations() + + +def get_locations(cached: bool=False) -> Sequence[Location]: + return list(iter_locations(cached=cached)) class LocInterval(NamedTuple): from_: Location to: Location + +# TODO kython? nicer interface? +class Window: + def __init__(self, it): + self.it = it + self.storage = deque() + self.start = 0 + self.end = 0 + + # TODO need check for existence? + def load_to(self, to): + while to >= self.end: + try: + ii = next(self.it) + self.storage.append(ii) + self.end += 1 + except StopIteration: + break + def exists(self, i): + self.load_to(i) + return i < self.end + + def consume_to(self, i): + self.load_to(i) + consumed = i - self.start + self.start = i + for _ in range(consumed): + self.storage.popleft() + + def __getitem__(self, i): + self.load_to(i) + ii = i - self.start + assert ii >= 0 + return self.storage[ii] + # TOOD could cache groups too?... 
using 16% cpu is a bit annoying.. could also use some sliding window here +# TODO maybe if tag is none, we just don't care? def get_groups(cached: bool=False) -> List[LocInterval]: - locs = get_locations(cached=cached) + locsi = Window(iter_locations(cached=cached)) i = 0 groups: List[LocInterval] = [] curg: List[Location] = [] @@ -89,16 +137,21 @@ def get_groups(cached: bool=False) -> List[LocInterval]: def dump_group(): nonlocal curg if len(curg) > 0: + # print("new group") groups.append(LocInterval(from_=curg[0], to=curg[-1])) curg = [] - while i < len(locs): + while locsi.exists(i): + # if i % 100 == 0: + # print("processing " + str(i)) + locsi.consume_to(i) + last = None if len(curg) == 0 else curg[-1] - cur = locs[i] + cur = locsi[i] j = i match = False - while not match and j < len(locs) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour? - cur = locs[j] + while not match and locsi.exists(j) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour? + cur = locsi[j] if last is None or cur.tag == last.tag: # ok add_to_group(cur) @@ -115,8 +168,12 @@ def get_groups(cached: bool=False) -> List[LocInterval]: dump_group() return groups +# TODO ok, def cache groups. def update_cache(): import dill # type: ignore - datas = get_locations(cached=False) - with open(CACHE_PATH, 'wb') as fo: - dill.dump(datas, fo) + CACHE_PATH_TMP = CACHE_PATH + '.tmp' + # TODO maybe, also keep on /tmp first? + with open(CACHE_PATH_TMP, 'wb', 2 ** 20) as fo: + for loc in iter_locations(cached=False): + dill.dump(loc, fo) + os.rename(CACHE_PATH_TMP, CACHE_PATH) diff --git a/location/__main__.py b/location/__main__.py index 0106a89..6eed9db 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -1,4 +1,4 @@ -from location import get_logger, get_locations, iter_locations +from location import get_logger, get_locations, iter_locations, get_groups logger = get_logger() @@ -17,7 +17,7 @@ if len(sys.argv) > 1: else: raise RuntimeError(f"Unknown command {cmd}") else: - for p in iter_locations(): + for p in get_groups(cached=True): pass # TODO need datetime! - print(p) + # print(p) From 930bf099a3f0cf9bcaf2d1838fda953c63e6d704 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Fri, 16 Nov 2018 22:33:51 +0000 Subject: [PATCH 08/26] Attempt to fix issue with slow unpickling.. --- location/__init__.py | 9 +++++---- location/__main__.py | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index 7d3eac8..02d0e20 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -63,10 +63,12 @@ def load_locations() -> Iterator[Location]: ) def iter_locations(cached: bool=False) -> Iterator[Location]: - import dill # type: ignore + import sys + sys.path.append('/L/Dropbox/data/location_provider') # jeez.. otherwise it refuses to unpickle :( + + import pickle as dill # type: ignore if cached: with open(CACHE_PATH, 'rb') as fo: - # TODO while fo has more data? while True: try: pre = dill.load(fo) @@ -168,9 +170,8 @@ def get_groups(cached: bool=False) -> List[LocInterval]: dump_group() return groups -# TODO ok, def cache groups. def update_cache(): - import dill # type: ignore + import pickle as dill # type: ignore CACHE_PATH_TMP = CACHE_PATH + '.tmp' # TODO maybe, also keep on /tmp first? 
with open(CACHE_PATH_TMP, 'wb', 2 ** 20) as fo: diff --git a/location/__main__.py b/location/__main__.py index 6eed9db..b085a61 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -18,6 +18,5 @@ if len(sys.argv) > 1: raise RuntimeError(f"Unknown command {cmd}") else: for p in get_groups(cached=True): - pass + print(p) # TODO need datetime! - # print(p) From b694667fb95d649c7588f46ce5621f927f194d8c Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 12 Mar 2019 17:59:23 +0000 Subject: [PATCH 09/26] add altitude, klogging, ruci --- location/__init__.py | 16 ++++++++++++---- location/__main__.py | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index 02d0e20..a7a6a5e 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -1,4 +1,4 @@ -from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence +from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence, Deque, Any from collections import deque from itertools import islice from datetime import datetime @@ -27,6 +27,7 @@ class Location(NamedTuple): dt: datetime lat: float lon: float + alt: float tag: Tag @@ -54,11 +55,13 @@ def load_locations() -> Iterator[Location]: cc += 1 lat = float(j["latitudeE7"] / 10000000) lon = float(j["longitudeE7"] / 10000000) + alt = float(j["altitude"]) tag = tagger(dt, lat, lon) yield Location( dt=dt, lat=lat, lon=lon, + alt=alt, tag=tag ) @@ -71,6 +74,7 @@ def iter_locations(cached: bool=False) -> Iterator[Location]: with open(CACHE_PATH, 'rb') as fo: while True: try: + # TODO shit really?? it can't load now, do I need to adjust pythonpath or something?... pre = dill.load(fo) yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods... except EOFError: @@ -91,7 +95,7 @@ class LocInterval(NamedTuple): class Window: def __init__(self, it): self.it = it - self.storage = deque() + self.storage: Deque[Any] = deque() self.start = 0 self.end = 0 @@ -121,10 +125,14 @@ class Window: assert ii >= 0 return self.storage[ii] + + # TOOD could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here # TODO maybe if tag is none, we just don't care? 
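The cache layer from patches 07-08 above writes one pickled Location per dump() call in update_cache and reads records back until EOFError in iter_locations, so the cache can be consumed as a stream. A minimal sketch of that pattern with stdlib pickle; the record type and the path are placeholders, not the real cache layout.

import pickle
from pathlib import Path
from typing import Iterable, Iterator, NamedTuple

class Rec(NamedTuple):
    x: int

CACHE = Path("/tmp/example.cache")  # placeholder

def write_cache(records: Iterable[Rec]) -> None:
    tmp = CACHE.with_suffix(".tmp")
    with tmp.open("wb") as fo:
        for r in records:
            pickle.dump(r, fo)   # one object per record, so reading can stream
    tmp.rename(CACHE)            # swap in at the end, like CACHE_PATH_TMP above

def read_cache() -> Iterator[Rec]:
    with CACHE.open("rb") as fo:
        while True:
            try:
                yield pickle.load(fo)
            except EOFError:     # pickle raises EOFError once the stream is exhausted
                break

write_cache(Rec(i) for i in range(3))
print(list(read_cache()))        # [Rec(x=0), Rec(x=1), Rec(x=2)]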
def get_groups(cached: bool=False) -> List[LocInterval]: - locsi = Window(iter_locations(cached=cached)) + print("cached", cached) + all_locations = iter_locations(cached=cached) + locsi = Window(all_locations) i = 0 groups: List[LocInterval] = [] curg: List[Location] = [] @@ -144,7 +152,7 @@ def get_groups(cached: bool=False) -> List[LocInterval]: curg = [] while locsi.exists(i): - # if i % 100 == 0: + # if i % 1000 == 0: # print("processing " + str(i)) locsi.consume_to(i) diff --git a/location/__main__.py b/location/__main__.py index b085a61..10e3ed3 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -2,7 +2,7 @@ from location import get_logger, get_locations, iter_locations, get_groups logger = get_logger() -from kython.logging import setup_logzero +from kython.klogging import setup_logzero setup_logzero(logger) From a0916ed6bd0478900ffc66b17ab0c9b5c7419ac3 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Fri, 12 Apr 2019 20:46:57 +0100 Subject: [PATCH 10/26] handle errors defensively --- ci.sh | 10 ------ location/__init__.py | 76 +++++++++++++++++++++++++++++--------------- location/__main__.py | 10 +++--- 3 files changed, 56 insertions(+), 40 deletions(-) delete mode 100755 ci.sh diff --git a/ci.sh b/ci.sh deleted file mode 100755 index 7a3b8ca..0000000 --- a/ci.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -cd "$(this_dir)" || exit - -. ~/bash_ci - -ci_run mypy location -ci_run pylint -E location - -ci_report_errors diff --git a/location/__init__.py b/location/__init__.py index a7a6a5e..af335b7 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -1,16 +1,21 @@ -from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence, Deque, Any +from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence, Deque, Any, Optional from collections import deque from itertools import islice from datetime import datetime -import os -from os import listdir -from os.path import join from zipfile import ZipFile import logging import csv import re import json +from pathlib import Path +import pytz + +from kython import kompress + + +# pipe install geopy +import geopy # type: ignore import geopy.distance # type: ignore # pip3 install ijson import ijson # type: ignore @@ -18,8 +23,10 @@ import ijson # type: ignore def get_logger(): return logging.getLogger("location") -TAKEOUTS_PATH = "/path/to/takeout" -CACHE_PATH = "/L/data/.cache/location.picklel" + +TAKEOUTS_PATH = Path("/path/to/takeout") +CACHE_PATH = Path("/L/data/.cache/location.picklel") + Tag = str @@ -27,36 +34,52 @@ class Location(NamedTuple): dt: datetime lat: float lon: float - alt: float + alt: Optional[float] tag: Tag -def tagger(dt: datetime, lat: float, lon: float) -> Tag: +def tagger(dt: datetime, point: geopy.Point) -> Tag: TAGS = [ # removed ] for coord, dist, tag in TAGS: - if geopy.distance.distance(coord, (lat, lon)).m < dist: + if geopy.distance.distance(coord, point).m < dist: return tag else: return "other" # TODO hope they are sorted... -# TODO that could also serve as basis for timezone provider. +# TODO that could also serve as basis for tz provider def load_locations() -> Iterator[Location]: - last_takeout = max([f for f in listdir(TAKEOUTS_PATH) if re.match('takeout.*.zip', f)]) - jdata = None - with ZipFile(join(TAKEOUTS_PATH, last_takeout)).open('Takeout/Location History/Location History.json') as fo: - cc = 0 + logger = get_logger() # TODO count errors? 
+ + last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip')) + + # TODO wonder if old takeouts could contribute as well?? + total = 0 + errors = 0 + with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo: for j in ijson.items(fo, 'locations.item'): - dt = datetime.fromtimestamp(int(j["timestampMs"]) / 1000) # TODO utc?? - if cc % 10000 == 0: - print(f'processing {dt}') - cc += 1 - lat = float(j["latitudeE7"] / 10000000) - lon = float(j["longitudeE7"] / 10000000) - alt = float(j["altitude"]) - tag = tagger(dt, lat, lon) + dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000) + if total % 10000 == 0: + logger.info('processing item %d %s', total, dt) + total += 1 + + dt = pytz.utc.localize(dt) + try: + lat = float(j["latitudeE7"] / 10000000) + lon = float(j["longitudeE7"] / 10000000) + point = geopy.Point(lat, lon) # kinda sanity check that coordinates are ok + except Exception as e: + logger.exception(e) + errors += 1 + if float(errors) / total > 0.01: + raise RuntimeError('too many errors! aborting') + else: + continue + + alt = j.get("altitude", None) + tag = tagger(dt, point) # TODO take accuracy into account?? yield Location( dt=dt, lat=lat, @@ -71,7 +94,7 @@ def iter_locations(cached: bool=False) -> Iterator[Location]: import pickle as dill # type: ignore if cached: - with open(CACHE_PATH, 'rb') as fo: + with CACHE_PATH.open('rb') as fo: while True: try: # TODO shit really?? it can't load now, do I need to adjust pythonpath or something?... @@ -180,9 +203,10 @@ def get_groups(cached: bool=False) -> List[LocInterval]: def update_cache(): import pickle as dill # type: ignore - CACHE_PATH_TMP = CACHE_PATH + '.tmp' + CACHE_PATH_TMP = CACHE_PATH.with_suffix('.tmp') # TODO maybe, also keep on /tmp first? - with open(CACHE_PATH_TMP, 'wb', 2 ** 20) as fo: + + with CACHE_PATH_TMP.open('wb', 2 ** 20) as fo: for loc in iter_locations(cached=False): dill.dump(loc, fo) - os.rename(CACHE_PATH_TMP, CACHE_PATH) + CACHE_PATH_TMP.rename(CACHE_PATH) diff --git a/location/__main__.py b/location/__main__.py index 10e3ed3..a6aef4a 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -1,12 +1,14 @@ -from location import get_logger, get_locations, iter_locations, get_groups +import sys +import logging -logger = get_logger() +from location import get_logger, get_locations, iter_locations, get_groups from kython.klogging import setup_logzero -setup_logzero(logger) +logger = get_logger() +setup_logzero(logger, level=logging.INFO) + -import sys if len(sys.argv) > 1: cmd = sys.argv[1] From 22c06a48e12117d2a2d2c133755b21a1b1fdbc65 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 12:53:59 +0200 Subject: [PATCH 11/26] protect main --- location/__main__.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/location/__main__.py b/location/__main__.py index a6aef4a..6c5a481 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -5,20 +5,24 @@ from location import get_logger, get_locations, iter_locations, get_groups from kython.klogging import setup_logzero -logger = get_logger() -setup_logzero(logger, level=logging.INFO) +def main(): + logger = get_logger() + setup_logzero(logger, level=logging.INFO) -if len(sys.argv) > 1: - cmd = sys.argv[1] - if cmd == "update_cache": - from location import update_cache, get_locations - update_cache() - get_locations(cached=True) + if len(sys.argv) > 1: + cmd = sys.argv[1] + if cmd == "update_cache": + from location import update_cache, get_locations + 
update_cache() + get_locations(cached=True) + else: + raise RuntimeError(f"Unknown command {cmd}") else: - raise RuntimeError(f"Unknown command {cmd}") -else: - for p in get_groups(cached=True): - print(p) - # TODO need datetime! + for p in get_groups(cached=True): + print(p) + # TODO need datetime! + +if __name__ == '__main__': + main() From 7f51eedc0f1c90efe483b00814ad6dfd2ec6ba96 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 12:54:13 +0200 Subject: [PATCH 12/26] extract _load_locations method --- location/__init__.py | 67 ++++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index af335b7..663dc3e 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -48,45 +48,50 @@ def tagger(dt: datetime, point: geopy.Point) -> Tag: else: return "other" +def _load_locations(fo) -> Iterator[Location]: + logger = get_logger() + total = 0 + errors = 0 + + for j in ijson.items(fo, 'locations.item'): + dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000) + if total % 10000 == 0: + logger.info('processing item %d %s', total, dt) + total += 1 + + dt = pytz.utc.localize(dt) + try: + lat = float(j["latitudeE7"] / 10000000) + lon = float(j["longitudeE7"] / 10000000) + point = geopy.Point(lat, lon) # kinda sanity check that coordinates are ok + except Exception as e: + logger.exception(e) + errors += 1 + if float(errors) / total > 0.01: + raise RuntimeError('too many errors! aborting') + else: + continue + + alt = j.get("altitude", None) + tag = tagger(dt, point) # TODO take accuracy into account?? + yield Location( + dt=dt, + lat=lat, + lon=lon, + alt=alt, + tag=tag + ) + # TODO hope they are sorted... # TODO that could also serve as basis for tz provider def load_locations() -> Iterator[Location]: - logger = get_logger() # TODO count errors? + logger = get_logger() last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip')) # TODO wonder if old takeouts could contribute as well?? - total = 0 - errors = 0 with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo: - for j in ijson.items(fo, 'locations.item'): - dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000) - if total % 10000 == 0: - logger.info('processing item %d %s', total, dt) - total += 1 - - dt = pytz.utc.localize(dt) - try: - lat = float(j["latitudeE7"] / 10000000) - lon = float(j["longitudeE7"] / 10000000) - point = geopy.Point(lat, lon) # kinda sanity check that coordinates are ok - except Exception as e: - logger.exception(e) - errors += 1 - if float(errors) / total > 0.01: - raise RuntimeError('too many errors! aborting') - else: - continue - - alt = j.get("altitude", None) - tag = tagger(dt, point) # TODO take accuracy into account?? 
- yield Location( - dt=dt, - lat=lat, - lon=lon, - alt=alt, - tag=tag - ) + return _load_locations(fo) def iter_locations(cached: bool=False) -> Iterator[Location]: import sys From be2a9dffbd739886ebff7dbc4d73660b7a59ec34 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 12:54:32 +0200 Subject: [PATCH 13/26] initial sql --- sql.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100755 sql.py diff --git a/sql.py b/sql.py new file mode 100755 index 0000000..786608d --- /dev/null +++ b/sql.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +from pathlib import Path +from itertools import islice +import logging + +from location import _load_locations, Location, get_logger +import sqlalchemy as sa # type: ignore + + +# TODO wonder if possible to extract schema automatically? +def xx_obj(): + return Location( + dt=sa.types.TIMESTAMP(timezone=True), # TODO tz? + # TODO FIXME utc seems to be lost.. doesn't sqlite support it or what? + lat=sa.Float, + lon=sa.Float, + alt=sa.Float, # TODO nullable + tag=sa.String, + ) + +def make_schema(obj): + return [sa.Column(col, tp) for col, tp in obj._asdict().items()] + + +def main(): + from kython import setup_logzero + setup_logzero(get_logger(), level=logging.DEBUG) + + db_path = Path('test.sqlite') + if db_path.exists(): + db_path.unlink() + + db = sa.create_engine(f'sqlite:///{db_path}') + engine = db.connect() # TODO do I need to tear anything down?? + meta = sa.MetaData(engine) + schema = make_schema(xx_obj()) + sa.Table('locations', meta, *schema) + meta.create_all() + table = sa.table('locations', *schema) + + + + with Path('/L/tmp/loc/LocationHistory.json').open('r') as fo: + # locs = list(_load_locations(fo)) + locs = list(islice(_load_locations(fo), 0, 30000)) + + # TODO fuck. do I really need to split myself?? + # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables + + # TODO is it quicker to insert anyway? needs unique policy + engine.execute(table.insert().values(locs)) + +if __name__ == '__main__': + main() From 9545db4aaf4d32499df8f154250160e283a217f2 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 13:10:12 +0200 Subject: [PATCH 14/26] saving and loading db --- sql.py | 48 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/sql.py b/sql.py index 786608d..35e3df8 100755 --- a/sql.py +++ b/sql.py @@ -6,6 +6,8 @@ import logging from location import _load_locations, Location, get_logger import sqlalchemy as sa # type: ignore +from kython import ichunks + # TODO wonder if possible to extract schema automatically? def xx_obj(): @@ -22,14 +24,7 @@ def make_schema(obj): return [sa.Column(col, tp) for col, tp in obj._asdict().items()] -def main(): - from kython import setup_logzero - setup_logzero(get_logger(), level=logging.DEBUG) - - db_path = Path('test.sqlite') - if db_path.exists(): - db_path.unlink() - +def save_locs(db_path: Path): db = sa.create_engine(f'sqlite:///{db_path}') engine = db.connect() # TODO do I need to tear anything down?? meta = sa.MetaData(engine) @@ -39,16 +34,43 @@ def main(): table = sa.table('locations', *schema) - with Path('/L/tmp/loc/LocationHistory.json').open('r') as fo: # locs = list(_load_locations(fo)) - locs = list(islice(_load_locations(fo), 0, 30000)) + # TODO fuck. do I really need to split myself?? + # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables + # TODO err wtf?? jsust 80000??? 
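The "too many SQL variables" error noted above is SQLite's cap on bound parameters per statement: a single INSERT ... VALUES(...) carrying every row at once overflows it, which is why the rows are written in batches. kython.ichunks is the author's own helper; the chunks() function below is only an illustrative stand-in for that batching idea, not its real implementation.

from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def chunks(it: Iterable[T], n: int) -> Iterator[List[T]]:
    # yield successive lists of at most n items from any iterable
    itr = iter(it)
    while True:
        block = list(islice(itr, n))
        if not block:
            return
        yield block

print(list(chunks(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
# usage mirrors the loop right below: insert locations one batch at a time, e.g.
#   for chunk in chunks(rows, 10000):
#       engine.execute(table.insert().values(chunk))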
+ for chunk in ichunks(_load_locations(fo), 10000): + engine.execute(table.insert().values(chunk)) + + # TODO maintain order during insertion? + +def load_locs(db_path: Path): + db = sa.create_engine(f'sqlite:///{db_path}') + engine = db.connect() # TODO do I need to tear anything down?? + meta = sa.MetaData(engine) + schema = make_schema(xx_obj()) + sa.Table('locations', meta, *schema) + meta.create_all() + table = sa.table('locations', *schema) + + return engine.execute(table.select()).fetchall() + + +def main(): + from kython import setup_logzero + setup_logzero(get_logger(), level=logging.DEBUG) + + db_path = Path('test2.sqlite') + # if db_path.exists(): + # db_path.unlink() + + locs = [Location(**d) for d in load_locs(db_path)][:10] + print(locs) - # TODO fuck. do I really need to split myself?? - # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables # TODO is it quicker to insert anyway? needs unique policy - engine.execute(table.insert().values(locs)) + + # ok, very nice. the whold db is just 4mb now if __name__ == '__main__': main() From 1a765129cba9c91ff1d60c3becd0b5a06a7479d8 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 13:22:19 +0200 Subject: [PATCH 15/26] loading --- sql.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/sql.py b/sql.py index 35e3df8..590cc89 100755 --- a/sql.py +++ b/sql.py @@ -38,13 +38,14 @@ def save_locs(db_path: Path): # locs = list(_load_locations(fo)) # TODO fuck. do I really need to split myself?? # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables - # TODO err wtf?? jsust 80000??? + # TODO count deprecated?? for chunk in ichunks(_load_locations(fo), 10000): engine.execute(table.insert().values(chunk)) + print(engine.execute(table.count()).fetchone()) # TODO maintain order during insertion? -def load_locs(db_path: Path): +def iter_db_locs(db_path: Path): db = sa.create_engine(f'sqlite:///{db_path}') engine = db.connect() # TODO do I need to tear anything down?? meta = sa.MetaData(engine) @@ -53,24 +54,27 @@ def load_locs(db_path: Path): meta.create_all() table = sa.table('locations', *schema) - return engine.execute(table.select()).fetchall() + datas = engine.execute(table.select()).fetchall() + yield from (Location(**d) for d in datas) def main(): from kython import setup_logzero setup_logzero(get_logger(), level=logging.DEBUG) - db_path = Path('test2.sqlite') + db_path = Path('test3.sqlite') # if db_path.exists(): # db_path.unlink() - locs = [Location(**d) for d in load_locs(db_path)][:10] - print(locs) + locs = iter_db_locs(db_path) + print(len(list(locs))) # TODO is it quicker to insert anyway? needs unique policy - # ok, very nice. the whold db is just 4mb now + # ok, very nice. 
the whold db is just 20mb now + # nice, and loads in seconds basically + # TODO FIXME just need to check timezone if __name__ == '__main__': main() From 9466606d68c0db5e37195e5df2a9b68f13e002f7 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 13:28:46 +0200 Subject: [PATCH 16/26] basic test --- sql.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sql.py b/sql.py index 590cc89..2b07391 100755 --- a/sql.py +++ b/sql.py @@ -24,7 +24,7 @@ def make_schema(obj): return [sa.Column(col, tp) for col, tp in obj._asdict().items()] -def save_locs(db_path: Path): +def cache_locs(source: Path, db_path: Path, limit=None): db = sa.create_engine(f'sqlite:///{db_path}') engine = db.connect() # TODO do I need to tear anything down?? meta = sa.MetaData(engine) @@ -34,14 +34,13 @@ def save_locs(db_path: Path): table = sa.table('locations', *schema) - with Path('/L/tmp/loc/LocationHistory.json').open('r') as fo: - # locs = list(_load_locations(fo)) + with source.open('r') as fo: # TODO fuck. do I really need to split myself?? # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables # TODO count deprecated?? - for chunk in ichunks(_load_locations(fo), 10000): + # print(engine.execute(table.count()).fetchone()) + for chunk in ichunks(islice(_load_locations(fo), 0, limit), 10000): engine.execute(table.insert().values(chunk)) - print(engine.execute(table.count()).fetchone()) # TODO maintain order during insertion? @@ -57,6 +56,16 @@ def iter_db_locs(db_path: Path): datas = engine.execute(table.select()).fetchall() yield from (Location(**d) for d in datas) +def test(tmp_path): + tdir = Path(tmp_path) + tdb = tdir / 'test.sqlite' + + test_src = Path('/L/tmp/loc/LocationHistory.json') + test_limit = 100 + cache_locs(source=test_src, db_path=tdb, limit=test_limit) + + locs = list(iter_db_locs(tdb)) + assert len(locs) == test_limit def main(): from kython import setup_logzero From 6fdbfa260994ffb122193b2ce5bda163e5242a79 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 13:33:04 +0200 Subject: [PATCH 17/26] check for equality --- sql.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sql.py b/sql.py index 2b07391..592a0df 100755 --- a/sql.py +++ b/sql.py @@ -59,13 +59,20 @@ def iter_db_locs(db_path: Path): def test(tmp_path): tdir = Path(tmp_path) tdb = tdir / 'test.sqlite' - - test_src = Path('/L/tmp/loc/LocationHistory.json') test_limit = 100 - cache_locs(source=test_src, db_path=tdb, limit=test_limit) + test_src = Path('/L/tmp/loc/LocationHistory.json') - locs = list(iter_db_locs(tdb)) - assert len(locs) == test_limit + # TODO meh, double loading, but for now fine + with test_src.open('r') as fo: + real_locs = list(islice(_load_locations(fo), 0, test_limit)) + + cache_locs(source=test_src, db_path=tdb, limit=test_limit) + cached_locs = list(iter_db_locs(tdb)) + assert len(cached_locs) == test_limit + def FIXME_tz(locs): + # TODO FIXME tzinfo... 
+ return [x._replace(dt=x.dt.replace(tzinfo=None)) for x in locs] + assert FIXME_tz(real_locs) == FIXME_tz(cached_locs) def main(): from kython import setup_logzero From 08d4c447bf7a9de526cf2d9f7ed3e8bc73fc4eeb Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 14:51:39 +0200 Subject: [PATCH 18/26] automatic extraction of schema --- sql.py | 61 +++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/sql.py b/sql.py index 592a0df..c8d2118 100755 --- a/sql.py +++ b/sql.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 from pathlib import Path +from datetime import datetime from itertools import islice +from typing import Type, NamedTuple, Union import logging from location import _load_locations, Location, get_logger @@ -9,30 +11,46 @@ import sqlalchemy as sa # type: ignore from kython import ichunks -# TODO wonder if possible to extract schema automatically? -def xx_obj(): - return Location( - dt=sa.types.TIMESTAMP(timezone=True), # TODO tz? +def _map_type(cls): + tmap = { + str: sa.String, + float: sa.Float, + datetime: sa.types.TIMESTAMP(timezone=True), # TODO tz? # TODO FIXME utc seems to be lost.. doesn't sqlite support it or what? - lat=sa.Float, - lon=sa.Float, - alt=sa.Float, # TODO nullable - tag=sa.String, - ) - -def make_schema(obj): - return [sa.Column(col, tp) for col, tp in obj._asdict().items()] + } + r = tmap.get(cls, None) + if r is not None: + return r -def cache_locs(source: Path, db_path: Path, limit=None): + if getattr(cls, '__origin__', None) == Union: + elems = cls.__args__ + elems = [e for e in elems if e != type(None)] + if len(elems) == 1: + return _map_type(elems[0]) # meh.. + raise RuntimeError(f'Unexpected type {cls}') + + + +def make_schema(cls: Type[NamedTuple]): # TODO covariant? + res = [] + for name, ann in cls.__annotations__.items(): + res.append(sa.Column(name, _map_type(ann))) + return res + + +def get_table(db_path: Path, type_, name='table'): db = sa.create_engine(f'sqlite:///{db_path}') engine = db.connect() # TODO do I need to tear anything down?? meta = sa.MetaData(engine) - schema = make_schema(xx_obj()) - sa.Table('locations', meta, *schema) + schema = make_schema(type_) + sa.Table(name, meta, *schema) meta.create_all() - table = sa.table('locations', *schema) + table = sa.table(name, *schema) + return engine, table +def cache_locs(source: Path, db_path: Path, limit=None): + engine, table = get_table(db_path=db_path, type_=Location) with source.open('r') as fo: # TODO fuck. do I really need to split myself?? @@ -45,14 +63,7 @@ def cache_locs(source: Path, db_path: Path, limit=None): # TODO maintain order during insertion? def iter_db_locs(db_path: Path): - db = sa.create_engine(f'sqlite:///{db_path}') - engine = db.connect() # TODO do I need to tear anything down?? 
- meta = sa.MetaData(engine) - schema = make_schema(xx_obj()) - sa.Table('locations', meta, *schema) - meta.create_all() - table = sa.table('locations', *schema) - + engine, table = get_table(db_path, type_=Location) datas = engine.execute(table.select()).fetchall() yield from (Location(**d) for d in datas) @@ -75,6 +86,8 @@ def test(tmp_path): assert FIXME_tz(real_locs) == FIXME_tz(cached_locs) def main(): + + from kython import setup_logzero setup_logzero(get_logger(), level=logging.DEBUG) From 510d771e6ec97a0a35f3b44a87e02808a272216b Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 16:03:54 +0100 Subject: [PATCH 19/26] handle timezones --- sql.py | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/sql.py b/sql.py index c8d2118..b93475a 100755 --- a/sql.py +++ b/sql.py @@ -2,21 +2,40 @@ from pathlib import Path from datetime import datetime from itertools import islice -from typing import Type, NamedTuple, Union +from typing import Type, NamedTuple, Union, Optional import logging from location import _load_locations, Location, get_logger +import sqlalchemy # type: ignore import sqlalchemy as sa # type: ignore from kython import ichunks +from kython.py37 import fromisoformat + +# TODO move to some common thing? +class IsoDateTime(sqlalchemy.TypeDecorator): + # TODO can we use something more effecient? e.g. blob for encoded datetime and tz? not sure if worth it + impl = sqlalchemy.types.String + + # TODO optional? + def process_bind_param(self, value: Optional[datetime], dialect) -> Optional[str]: + if value is None: + return None + return value.isoformat() + + def process_result_value(self, value: Optional[str], dialect) -> Optional[datetime]: + if value is None: + return None + return fromisoformat(value) + + def _map_type(cls): tmap = { str: sa.String, float: sa.Float, - datetime: sa.types.TIMESTAMP(timezone=True), # TODO tz? - # TODO FIXME utc seems to be lost.. doesn't sqlite support it or what? + datetime: IsoDateTime, } r = tmap.get(cls, None) if r is not None: @@ -30,7 +49,8 @@ def _map_type(cls): return _map_type(elems[0]) # meh.. raise RuntimeError(f'Unexpected type {cls}') - +# TODO to strart with, just assert utc when serializing, deserializing +# TODO how to use timestamp as key? just round it? def make_schema(cls: Type[NamedTuple]): # TODO covariant? res = [] @@ -71,7 +91,7 @@ def test(tmp_path): tdir = Path(tmp_path) tdb = tdir / 'test.sqlite' test_limit = 100 - test_src = Path('/L/tmp/loc/LocationHistory.json') + test_src = Path('/L/tmp/LocationHistory.json') # TODO meh, double loading, but for now fine with test_src.open('r') as fo: @@ -80,10 +100,7 @@ def test(tmp_path): cache_locs(source=test_src, db_path=tdb, limit=test_limit) cached_locs = list(iter_db_locs(tdb)) assert len(cached_locs) == test_limit - def FIXME_tz(locs): - # TODO FIXME tzinfo... 
- return [x._replace(dt=x.dt.replace(tzinfo=None)) for x in locs] - assert FIXME_tz(real_locs) == FIXME_tz(cached_locs) + assert real_locs == cached_locs def main(): From d05ba9e5a386154c8faeb301bf8c08c5edbd1d8d Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 18:44:52 +0100 Subject: [PATCH 20/26] some WIP on trying sqlalchemy to do what I want --- sql.py | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 113 insertions(+), 13 deletions(-) diff --git a/sql.py b/sql.py index b93475a..53faa97 100755 --- a/sql.py +++ b/sql.py @@ -16,7 +16,8 @@ from kython.py37 import fromisoformat # TODO move to some common thing? class IsoDateTime(sqlalchemy.TypeDecorator): - # TODO can we use something more effecient? e.g. blob for encoded datetime and tz? not sure if worth it + # in theory could use something more effecient? e.g. blob for encoded datetime and tz? + # but practically, the difference seems to be pretty small, so perhaps fine for now impl = sqlalchemy.types.String # TODO optional? @@ -102,25 +103,124 @@ def test(tmp_path): assert len(cached_locs) == test_limit assert real_locs == cached_locs -def main(): - +from kython.ktyping import PathIsh +def make_dbcache(p: PathIsh, hashf): + raise NotImplementedError +# TODO what if we want dynamic path?? +# dbcache = make_dbcache('/L/tmp/test.db', hashf=lambda p: p) # TODO FIXME? + +Hash = str + +# TODO hash is a bit misleading +# TODO perhaps another table is the way to go... + +# TODO careful about concurrent access? +def read_hash(db_path: Path) -> Optional[Hash]: + hash_file = db_path.with_suffix('.hash') + if not hash_file.exists(): + return None + return hash_file.read_text() + +# TODO not sure if there is any way to guarantee atomic reading.... +# unless it happens automatically due to unlink logic? +# TODO need to know entry type? +# TODO or, we can just encode that in names. that way no need for atomic stuff + +# TODO give a better name +class Alala: + def __init__(self, db_path: Path, type_) -> None: + self.db = sa.create_engine(f'sqlite:///{db_path}') + self.engine = self.db.connect() # TODO do I need to tear anything down?? + self.meta = sa.MetaData(self.engine) + self.table_hash = sa.Table('hash' , self.meta, sa.Column('value', sa.types.String)) + + schema = make_schema(type_) + self.table_data = sa.Table('table', self.meta, *schema) + # for t in [self.table_data, self.table_hash]: + # # TODO shit. how to reuse these properly???? + # cols = [c for c in t._columns] + # sa.Table(t.name, self.meta, *cols) + + self.meta.create_all() + + # @property + # def table_hash(self): + # # TODO single entry constraint? + # return sa.table('hash', sa.Column('value', sa.types.String)) + + +def get_dbcache_logger(): + return logging.getLogger('dbcache') + +def dbcache_worker(db_path: PathIsh, hashf, type_, wrapped): + logger = get_dbcache_logger() + + db_path = Path(db_path) + # TODO FIXME make sure we have exclusive write lock + # TODO FIMXE ok, use transactions, then we'd be fine + + alala = Alala(db_path, type_) + engine = alala.engine + # table_hash = sa.table('hash', sa.Column('value', sa.types.String)) + prev_hashes = engine.execute(alala.table_hash.select()).fetchall() + if len(prev_hashes) > 1: + raise RuntimeError(f'Multiple hashes! 
{prev_hashes}') + + prev_hash: Optional[Hash] + if len(prev_hashes) == 0: + prev_hash = None + else: + prev_hash = prev_hashes[0] + logger.debug('previous hash: %s', prev_hash) + + def wrapper(key): + h = hashf(key) + logger.debug('current hash: %s', h) + assert h is not None # just in case + + with engine.begin() as transaction: + rows = engine.execute(alala.table_data.select()).fetchall() + if h == prev_hash: + rows = engine.execute() + # return type_() + raise NotImplementedError("TODO return data") + else: + datas = wrapped(key) + engine.execute(alala.table_data.insert().values(datas)) # TODO chunks?? + + # TODO FIXME insert and replace instead + # alala.table_hash.drop(engine) + engine.execute(alala.table_hash.delete()) + engine.execute(alala.table_hash.insert().values([{'value': h}])) + return datas + # TODO engine is leaking?? + return wrapper + +def wrapped(path: Path): + return [] # TODO + +def hashf(path: Path): + return str(path) # TODO mtime + +def main(): from kython import setup_logzero setup_logzero(get_logger(), level=logging.DEBUG) + setup_logzero(get_dbcache_logger(), level=logging.DEBUG) - db_path = Path('test3.sqlite') - # if db_path.exists(): - # db_path.unlink() + src_path = Path('hi') - locs = iter_db_locs(db_path) - print(len(list(locs))) + db_path = Path('test.sqlite') + if db_path.exists(): + db_path.unlink() + new_wrapped = dbcache_worker(db_path=db_path, hashf=hashf, type_=Location, wrapped=wrapped) + res = new_wrapped(src_path) + print(res) - # TODO is it quicker to insert anyway? needs unique policy - - # ok, very nice. the whold db is just 20mb now - # nice, and loads in seconds basically - # TODO FIXME just need to check timezone + # cache_locs(source=Path('/L/tmp/LocationHistory.json'), db_path=db_path) + # locs = iter_db_locs(db_path) + # print(len(list(locs))) if __name__ == '__main__': main() From 055cb1c373085b85850a51330a34d58038b6e350 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 21:45:31 +0100 Subject: [PATCH 21/26] finally somewhat works.. --- sql.py | 115 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 59 insertions(+), 56 deletions(-) diff --git a/sql.py b/sql.py index 53faa97..7f0a46a 100755 --- a/sql.py +++ b/sql.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 from pathlib import Path +import functools from datetime import datetime from itertools import islice from typing import Type, NamedTuple, Union, Optional @@ -104,8 +105,6 @@ def test(tmp_path): assert real_locs == cached_locs from kython.ktyping import PathIsh -def make_dbcache(p: PathIsh, hashf): - raise NotImplementedError # TODO what if we want dynamic path?? # dbcache = make_dbcache('/L/tmp/test.db', hashf=lambda p: p) # TODO FIXME? @@ -137,71 +136,74 @@ class Alala: schema = make_schema(type_) self.table_data = sa.Table('table', self.meta, *schema) - # for t in [self.table_data, self.table_hash]: - # # TODO shit. how to reuse these properly???? - # cols = [c for c in t._columns] - # sa.Table(t.name, self.meta, *cols) - self.meta.create_all() - # @property - # def table_hash(self): - # # TODO single entry constraint? - # return sa.table('hash', sa.Column('value', sa.types.String)) - def get_dbcache_logger(): return logging.getLogger('dbcache') -def dbcache_worker(db_path: PathIsh, hashf, type_, wrapped): +# TODO ugh. there should be a nicer way to wrap that... 
+def make_dbcache(db_path: PathIsh, hashf, type_): logger = get_dbcache_logger() - db_path = Path(db_path) - # TODO FIXME make sure we have exclusive write lock - # TODO FIMXE ok, use transactions, then we'd be fine + def dec(func): + @functools.wraps(func) + def wrapper(key): + # TODO FIXME make sure we have exclusive write lock - alala = Alala(db_path, type_) - engine = alala.engine - # table_hash = sa.table('hash', sa.Column('value', sa.types.String)) - prev_hashes = engine.execute(alala.table_hash.select()).fetchall() - if len(prev_hashes) > 1: - raise RuntimeError(f'Multiple hashes! {prev_hashes}') + alala = Alala(db_path, type_) + engine = alala.engine - prev_hash: Optional[Hash] - if len(prev_hashes) == 0: - prev_hash = None - else: - prev_hash = prev_hashes[0] - logger.debug('previous hash: %s', prev_hash) + prev_hashes = engine.execute(alala.table_hash.select()).fetchall() + if len(prev_hashes) > 1: + raise RuntimeError(f'Multiple hashes! {prev_hashes}') - def wrapper(key): - h = hashf(key) - logger.debug('current hash: %s', h) - assert h is not None # just in case - - with engine.begin() as transaction: - rows = engine.execute(alala.table_data.select()).fetchall() - if h == prev_hash: - rows = engine.execute() - # return type_() - raise NotImplementedError("TODO return data") + prev_hash: Optional[Hash] + if len(prev_hashes) == 0: + prev_hash = None else: - datas = wrapped(key) - engine.execute(alala.table_data.insert().values(datas)) # TODO chunks?? + prev_hash = prev_hashes[0][0] # TODO ugh, returns a tuple... + logger.debug('previous hash: %s', prev_hash) - # TODO FIXME insert and replace instead - # alala.table_hash.drop(engine) - engine.execute(alala.table_hash.delete()) - engine.execute(alala.table_hash.insert().values([{'value': h}])) - return datas - # TODO engine is leaking?? - return wrapper + h = hashf(key) + logger.debug('current hash: %s', h) + assert h is not None # just in case -def wrapped(path: Path): - return [] # TODO + with engine.begin() as transaction: + if h == prev_hash: + rows = engine.execute(alala.table_data.select()).fetchall() + return [type_(**row) for row in rows] + else: + datas = func(key) + if len(datas) > 0: + engine.execute(alala.table_data.insert().values(datas)) # TODO chunks?? + + # TODO FIXME insert and replace instead + engine.execute(alala.table_hash.delete()) + engine.execute(alala.table_hash.insert().values([{'value': h}])) + return datas + return wrapper + + # TODO FIXME engine is leaking?? 
+ return dec + + +def hashf(path: Path) -> Hash: + mt = int(path.stat().st_mtime) + return f'{path}.{mt}' + +dbcache = make_dbcache('test.sqlite', hashf=hashf, type_=Location) + +@dbcache +def _xxx_locations(path: Path): + with path.open('r') as fo: + return list(islice(_load_locations(fo), 0, 100)) + + +def xxx_locations(): + test_src = Path('/L/tmp/LocationHistory.json') + return _xxx_locations(test_src) -def hashf(path: Path): - return str(path) # TODO mtime def main(): from kython import setup_logzero @@ -211,11 +213,12 @@ def main(): src_path = Path('hi') db_path = Path('test.sqlite') - if db_path.exists(): - db_path.unlink() + # if db_path.exists(): + # db_path.unlink() - new_wrapped = dbcache_worker(db_path=db_path, hashf=hashf, type_=Location, wrapped=wrapped) - res = new_wrapped(src_path) + res = xxx_locations() + # new_wrapped = dbcache_worker(db_path=db_path, hashf=hashf, type_=Location, wrapped=wrapped) + # res = new_wrapped(src_path) print(res) # cache_locs(source=Path('/L/tmp/LocationHistory.json'), db_path=db_path) From 22333d507808a487fdcaa2e400dc270377d55cb0 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Tue, 30 Apr 2019 22:38:43 +0100 Subject: [PATCH 22/26] start using kython.kcache --- location/__init__.py | 49 +++++---- location/__main__.py | 6 +- sql.py | 229 ------------------------------------------- 3 files changed, 28 insertions(+), 256 deletions(-) delete mode 100755 sql.py diff --git a/location/__init__.py b/location/__init__.py index 663dc3e..5f5bd01 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -12,6 +12,7 @@ import pytz from kython import kompress +from kython.kcache import make_dbcache, mtime_hash # pipe install geopy @@ -25,7 +26,6 @@ def get_logger(): TAKEOUTS_PATH = Path("/path/to/takeout") -CACHE_PATH = Path("/L/data/.cache/location.picklel") Tag = str @@ -37,6 +37,8 @@ class Location(NamedTuple): alt: Optional[float] tag: Tag +dbcache = make_dbcache('/L/data/.cache/location.sqlite', hashf=mtime_hash, type_=Location) + def tagger(dt: datetime, point: geopy.Point) -> Tag: TAGS = [ @@ -48,7 +50,9 @@ def tagger(dt: datetime, point: geopy.Point) -> Tag: else: return "other" -def _load_locations(fo) -> Iterator[Location]: + +# TODO careful, might not fit in glumov ram... +def _iter_locations_fo(fo) -> Iterator[Location]: logger = get_logger() total = 0 errors = 0 @@ -84,32 +88,27 @@ def _load_locations(fo) -> Iterator[Location]: # TODO hope they are sorted... # TODO that could also serve as basis for tz provider -def load_locations() -> Iterator[Location]: - logger = get_logger() - - last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip')) +@dbcache +def _iter_locations(path: Path) -> List[Location]: + limit = None + # TODO FIXME support archives + with path.open('r') as fo: + return list(islice(_iter_locations_fo(fo), 0, limit)) # TODO wonder if old takeouts could contribute as well?? - with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo: - return _load_locations(fo) + # with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo: + # return _iter_locations_fo(fo) + + +# TODO shit.. should support iterator.. +def iter_locations() -> List[Location]: + last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip')) + last_takeout = Path('/L/tmp/LocationHistory.json') + return _iter_locations(last_takeout) -def iter_locations(cached: bool=False) -> Iterator[Location]: import sys sys.path.append('/L/Dropbox/data/location_provider') # jeez.. 
otherwise it refuses to unpickle :( - import pickle as dill # type: ignore - if cached: - with CACHE_PATH.open('rb') as fo: - while True: - try: - # TODO shit really?? it can't load now, do I need to adjust pythonpath or something?... - pre = dill.load(fo) - yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods... - except EOFError: - break - else: - yield from load_locations() - def get_locations(cached: bool=False) -> Sequence[Location]: return list(iter_locations(cached=cached)) @@ -155,11 +154,9 @@ class Window: -# TOOD could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here # TODO maybe if tag is none, we just don't care? -def get_groups(cached: bool=False) -> List[LocInterval]: - print("cached", cached) - all_locations = iter_locations(cached=cached) +def get_groups() -> List[LocInterval]: + all_locations = iter(iter_locations()) # TODO locsi = Window(all_locations) i = 0 groups: List[LocInterval] = [] diff --git a/location/__main__.py b/location/__main__.py index 6c5a481..e246c3c 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -4,15 +4,19 @@ import logging from location import get_logger, get_locations, iter_locations, get_groups from kython.klogging import setup_logzero +from kython.kcache import get_kcache_logger + def main(): logger = get_logger() setup_logzero(logger, level=logging.INFO) + setup_logzero(get_kcache_logger(), level=logging.DEBUG) if len(sys.argv) > 1: cmd = sys.argv[1] + # TODO ok, update cache makes sense just to refresh in case of code changes... if cmd == "update_cache": from location import update_cache, get_locations update_cache() @@ -20,7 +24,7 @@ def main(): else: raise RuntimeError(f"Unknown command {cmd}") else: - for p in get_groups(cached=True): + for p in get_groups(): print(p) # TODO need datetime! diff --git a/sql.py b/sql.py deleted file mode 100755 index 7f0a46a..0000000 --- a/sql.py +++ /dev/null @@ -1,229 +0,0 @@ -#!/usr/bin/env python3 -from pathlib import Path -import functools -from datetime import datetime -from itertools import islice -from typing import Type, NamedTuple, Union, Optional -import logging - -from location import _load_locations, Location, get_logger -import sqlalchemy # type: ignore -import sqlalchemy as sa # type: ignore - -from kython import ichunks - - -from kython.py37 import fromisoformat - -# TODO move to some common thing? -class IsoDateTime(sqlalchemy.TypeDecorator): - # in theory could use something more effecient? e.g. blob for encoded datetime and tz? - # but practically, the difference seems to be pretty small, so perhaps fine for now - impl = sqlalchemy.types.String - - # TODO optional? - def process_bind_param(self, value: Optional[datetime], dialect) -> Optional[str]: - if value is None: - return None - return value.isoformat() - - def process_result_value(self, value: Optional[str], dialect) -> Optional[datetime]: - if value is None: - return None - return fromisoformat(value) - - -def _map_type(cls): - tmap = { - str: sa.String, - float: sa.Float, - datetime: IsoDateTime, - } - r = tmap.get(cls, None) - if r is not None: - return r - - - if getattr(cls, '__origin__', None) == Union: - elems = cls.__args__ - elems = [e for e in elems if e != type(None)] - if len(elems) == 1: - return _map_type(elems[0]) # meh.. - raise RuntimeError(f'Unexpected type {cls}') - -# TODO to strart with, just assert utc when serializing, deserializing -# TODO how to use timestamp as key? just round it? 
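# For readers skimming the deleted sql.py above: the Union branch in _map_type
# works because Optional[X] is just Union[X, None], so stripping NoneType
# leaves a single concrete type to map to a column. The same idea, written
# with the newer typing helpers, as a standalone illustration (not code from
# this repo):
from typing import Optional, Union, get_args, get_origin

def unwrap_optional(ann):
    if get_origin(ann) is Union:
        args = [a for a in get_args(ann) if a is not type(None)]
        if len(args) == 1:
            return args[0]
    return ann

assert unwrap_optional(Optional[float]) is float
assert unwrap_optional(str) is str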
- -def make_schema(cls: Type[NamedTuple]): # TODO covariant? - res = [] - for name, ann in cls.__annotations__.items(): - res.append(sa.Column(name, _map_type(ann))) - return res - - -def get_table(db_path: Path, type_, name='table'): - db = sa.create_engine(f'sqlite:///{db_path}') - engine = db.connect() # TODO do I need to tear anything down?? - meta = sa.MetaData(engine) - schema = make_schema(type_) - sa.Table(name, meta, *schema) - meta.create_all() - table = sa.table(name, *schema) - return engine, table - -def cache_locs(source: Path, db_path: Path, limit=None): - engine, table = get_table(db_path=db_path, type_=Location) - - with source.open('r') as fo: - # TODO fuck. do I really need to split myself?? - # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables - # TODO count deprecated?? - # print(engine.execute(table.count()).fetchone()) - for chunk in ichunks(islice(_load_locations(fo), 0, limit), 10000): - engine.execute(table.insert().values(chunk)) - - # TODO maintain order during insertion? - -def iter_db_locs(db_path: Path): - engine, table = get_table(db_path, type_=Location) - datas = engine.execute(table.select()).fetchall() - yield from (Location(**d) for d in datas) - -def test(tmp_path): - tdir = Path(tmp_path) - tdb = tdir / 'test.sqlite' - test_limit = 100 - test_src = Path('/L/tmp/LocationHistory.json') - - # TODO meh, double loading, but for now fine - with test_src.open('r') as fo: - real_locs = list(islice(_load_locations(fo), 0, test_limit)) - - cache_locs(source=test_src, db_path=tdb, limit=test_limit) - cached_locs = list(iter_db_locs(tdb)) - assert len(cached_locs) == test_limit - assert real_locs == cached_locs - -from kython.ktyping import PathIsh - -# TODO what if we want dynamic path?? -# dbcache = make_dbcache('/L/tmp/test.db', hashf=lambda p: p) # TODO FIXME? - -Hash = str - -# TODO hash is a bit misleading -# TODO perhaps another table is the way to go... - -# TODO careful about concurrent access? -def read_hash(db_path: Path) -> Optional[Hash]: - hash_file = db_path.with_suffix('.hash') - if not hash_file.exists(): - return None - return hash_file.read_text() - -# TODO not sure if there is any way to guarantee atomic reading.... -# unless it happens automatically due to unlink logic? -# TODO need to know entry type? -# TODO or, we can just encode that in names. that way no need for atomic stuff - -# TODO give a better name -class Alala: - def __init__(self, db_path: Path, type_) -> None: - self.db = sa.create_engine(f'sqlite:///{db_path}') - self.engine = self.db.connect() # TODO do I need to tear anything down?? - self.meta = sa.MetaData(self.engine) - self.table_hash = sa.Table('hash' , self.meta, sa.Column('value', sa.types.String)) - - schema = make_schema(type_) - self.table_data = sa.Table('table', self.meta, *schema) - self.meta.create_all() - - -def get_dbcache_logger(): - return logging.getLogger('dbcache') - -# TODO ugh. there should be a nicer way to wrap that... -def make_dbcache(db_path: PathIsh, hashf, type_): - logger = get_dbcache_logger() - db_path = Path(db_path) - def dec(func): - @functools.wraps(func) - def wrapper(key): - # TODO FIXME make sure we have exclusive write lock - - alala = Alala(db_path, type_) - engine = alala.engine - - prev_hashes = engine.execute(alala.table_hash.select()).fetchall() - if len(prev_hashes) > 1: - raise RuntimeError(f'Multiple hashes! 
{prev_hashes}') - - prev_hash: Optional[Hash] - if len(prev_hashes) == 0: - prev_hash = None - else: - prev_hash = prev_hashes[0][0] # TODO ugh, returns a tuple... - logger.debug('previous hash: %s', prev_hash) - - h = hashf(key) - logger.debug('current hash: %s', h) - assert h is not None # just in case - - with engine.begin() as transaction: - if h == prev_hash: - rows = engine.execute(alala.table_data.select()).fetchall() - return [type_(**row) for row in rows] - else: - datas = func(key) - if len(datas) > 0: - engine.execute(alala.table_data.insert().values(datas)) # TODO chunks?? - - # TODO FIXME insert and replace instead - engine.execute(alala.table_hash.delete()) - engine.execute(alala.table_hash.insert().values([{'value': h}])) - return datas - return wrapper - - # TODO FIXME engine is leaking?? - return dec - - -def hashf(path: Path) -> Hash: - mt = int(path.stat().st_mtime) - return f'{path}.{mt}' - -dbcache = make_dbcache('test.sqlite', hashf=hashf, type_=Location) - -@dbcache -def _xxx_locations(path: Path): - with path.open('r') as fo: - return list(islice(_load_locations(fo), 0, 100)) - - -def xxx_locations(): - test_src = Path('/L/tmp/LocationHistory.json') - return _xxx_locations(test_src) - - -def main(): - from kython import setup_logzero - setup_logzero(get_logger(), level=logging.DEBUG) - setup_logzero(get_dbcache_logger(), level=logging.DEBUG) - - src_path = Path('hi') - - db_path = Path('test.sqlite') - # if db_path.exists(): - # db_path.unlink() - - res = xxx_locations() - # new_wrapped = dbcache_worker(db_path=db_path, hashf=hashf, type_=Location, wrapped=wrapped) - # res = new_wrapped(src_path) - print(res) - - # cache_locs(source=Path('/L/tmp/LocationHistory.json'), db_path=db_path) - # locs = iter_db_locs(db_path) - # print(len(list(locs))) - -if __name__ == '__main__': - main() From 001f030fff732bf59f7f7b74ca19f0d17bc513ea Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Wed, 1 May 2019 00:08:38 +0100 Subject: [PATCH 23/26] fully switch to kython.kcache --- location/__init__.py | 43 +++++++++++++++++++------------------------ location/__main__.py | 3 +-- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index 5f5bd01..a07a0d3 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -26,6 +26,7 @@ def get_logger(): TAKEOUTS_PATH = Path("/path/to/takeout") +CACHE_PATH = Path('/L/data/.cache/location.sqlite') Tag = str @@ -37,7 +38,7 @@ class Location(NamedTuple): alt: Optional[float] tag: Tag -dbcache = make_dbcache('/L/data/.cache/location.sqlite', hashf=mtime_hash, type_=Location) +dbcache = make_dbcache(CACHE_PATH, hashf=mtime_hash, type_=Location, chunk_by=10000) def tagger(dt: datetime, point: geopy.Point) -> Tag: @@ -51,7 +52,6 @@ def tagger(dt: datetime, point: geopy.Point) -> Tag: return "other" -# TODO careful, might not fit in glumov ram... def _iter_locations_fo(fo) -> Iterator[Location]: logger = get_logger() total = 0 @@ -89,29 +89,26 @@ def _iter_locations_fo(fo) -> Iterator[Location]: # TODO hope they are sorted... 
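# Note on the chunk_by=10000 argument introduced above: sqlite caps the number
# of bound variables per statement, which is exactly the "too many SQL
# variables" error the deleted sql.py worked around with ichunks, so rows are
# inserted in fixed-size batches rather than in one statement. A standalone
# sketch of the batching idea (not the kython implementation):
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')

def chunks(items: Iterable[T], n: int) -> Iterator[List[T]]:
    it = iter(items)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

# e.g.: for chunk in chunks(locations, 10000): engine.execute(table.insert().values(chunk))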
# TODO that could also serve as basis for tz provider @dbcache -def _iter_locations(path: Path) -> List[Location]: +def _iter_locations(path: Path) -> Iterator[Location]: limit = None - # TODO FIXME support archives - with path.open('r') as fo: - return list(islice(_iter_locations_fo(fo), 0, limit)) + if path.suffix == '.json': + ctx = path.open('r') + else: # must be a takeout archive + ctx = kompress.open(path, 'Takeout/Location History/Location History.json') + + with ctx as fo: + yield from islice(_iter_locations_fo(fo), 0, limit) # TODO wonder if old takeouts could contribute as well?? - # with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo: - # return _iter_locations_fo(fo) -# TODO shit.. should support iterator.. -def iter_locations() -> List[Location]: +def iter_locations() -> Iterator[Location]: last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip')) - last_takeout = Path('/L/tmp/LocationHistory.json') return _iter_locations(last_takeout) - import sys - sys.path.append('/L/Dropbox/data/location_provider') # jeez.. otherwise it refuses to unpickle :( - -def get_locations(cached: bool=False) -> Sequence[Location]: - return list(iter_locations(cached=cached)) +def get_locations() -> Sequence[Location]: + return list(iter_locations()) class LocInterval(NamedTuple): from_: Location @@ -203,12 +200,10 @@ def get_groups() -> List[LocInterval]: dump_group() return groups -def update_cache(): - import pickle as dill # type: ignore - CACHE_PATH_TMP = CACHE_PATH.with_suffix('.tmp') - # TODO maybe, also keep on /tmp first? - with CACHE_PATH_TMP.open('wb', 2 ** 20) as fo: - for loc in iter_locations(cached=False): - dill.dump(loc, fo) - CACHE_PATH_TMP.rename(CACHE_PATH) +def update_cache(): + # TODO perhaps set hash to null instead, that's a bit less intrusive + if CACHE_PATH.exists(): + CACHE_PATH.unlink() + for _ in iter_locations(): + pass diff --git a/location/__main__.py b/location/__main__.py index e246c3c..a796ea6 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -13,19 +13,18 @@ def main(): setup_logzero(get_kcache_logger(), level=logging.DEBUG) - if len(sys.argv) > 1: cmd = sys.argv[1] # TODO ok, update cache makes sense just to refresh in case of code changes... if cmd == "update_cache": from location import update_cache, get_locations update_cache() - get_locations(cached=True) else: raise RuntimeError(f"Unknown command {cmd}") else: for p in get_groups(): print(p) + # shit. ok, 4 gigs of ram is def too much for glumov... # TODO need datetime! if __name__ == '__main__': From d23a0abca288f038537f434f778cbf17768df1cc Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Wed, 1 May 2019 18:41:01 +0100 Subject: [PATCH 24/26] add logging for get_groups --- location/__init__.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index a07a0d3..2dd696c 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -38,7 +38,7 @@ class Location(NamedTuple): alt: Optional[float] tag: Tag -dbcache = make_dbcache(CACHE_PATH, hashf=mtime_hash, type_=Location, chunk_by=10000) +dbcache = make_dbcache(CACHE_PATH, hashf=mtime_hash, type_=Location, chunk_by=10000, logger=get_logger()) def tagger(dt: datetime, point: geopy.Point) -> Tag: @@ -153,6 +153,8 @@ class Window: # TODO maybe if tag is none, we just don't care? 
def get_groups() -> List[LocInterval]: + logger = get_logger() + all_locations = iter(iter_locations()) # TODO locsi = Window(all_locations) i = 0 @@ -174,8 +176,9 @@ def get_groups() -> List[LocInterval]: curg = [] while locsi.exists(i): - # if i % 1000 == 0: - # print("processing " + str(i)) + if i % 10000 == 0: + logger.debug('grouping item %d', i) + locsi.consume_to(i) last = None if len(curg) == 0 else curg[-1] From 8bbcf6aa1d1c548cca262529e1fdd97c44002a68 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Fri, 26 Jul 2019 21:31:48 +0100 Subject: [PATCH 25/26] fix hash function --- location/__init__.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index 2dd696c..b20d500 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -52,12 +52,12 @@ def tagger(dt: datetime, point: geopy.Point) -> Tag: return "other" -def _iter_locations_fo(fo) -> Iterator[Location]: +def _iter_locations_fo(fo, start, stop) -> Iterator[Location]: logger = get_logger() total = 0 errors = 0 - for j in ijson.items(fo, 'locations.item'): + for j in islice(ijson.items(fo, 'locations.item'), start, stop): dt = datetime.utcfromtimestamp(int(j["timestampMs"]) / 1000) if total % 10000 == 0: logger.info('processing item %d %s', total, dt) @@ -89,22 +89,20 @@ def _iter_locations_fo(fo) -> Iterator[Location]: # TODO hope they are sorted... # TODO that could also serve as basis for tz provider @dbcache -def _iter_locations(path: Path) -> Iterator[Location]: - limit = None - +def _iter_locations(path: Path, start=0, stop=None) -> Iterator[Location]: if path.suffix == '.json': ctx = path.open('r') else: # must be a takeout archive ctx = kompress.open(path, 'Takeout/Location History/Location History.json') with ctx as fo: - yield from islice(_iter_locations_fo(fo), 0, limit) + yield from _iter_locations_fo(fo, start=start, stop=stop) # TODO wonder if old takeouts could contribute as well?? -def iter_locations() -> Iterator[Location]: +def iter_locations(**kwargs) -> Iterator[Location]: last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip')) - return _iter_locations(last_takeout) + return _iter_locations(path=last_takeout, **kwargs) def get_locations() -> Sequence[Location]: From e1a251d88a52d59f61b418f7f37fe86f9d28de61 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Sat, 27 Jul 2019 12:12:59 +0100 Subject: [PATCH 26/26] switch to cachew --- location/__init__.py | 8 +++----- location/__main__.py | 4 +--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/location/__init__.py b/location/__init__.py index b20d500..35c3c7a 100644 --- a/location/__init__.py +++ b/location/__init__.py @@ -12,7 +12,8 @@ import pytz from kython import kompress -from kython.kcache import make_dbcache, mtime_hash + +from cachew import cachew, mtime_hash # pipe install geopy @@ -38,8 +39,6 @@ class Location(NamedTuple): alt: Optional[float] tag: Tag -dbcache = make_dbcache(CACHE_PATH, hashf=mtime_hash, type_=Location, chunk_by=10000, logger=get_logger()) - def tagger(dt: datetime, point: geopy.Point) -> Tag: TAGS = [ @@ -87,8 +86,7 @@ def _iter_locations_fo(fo, start, stop) -> Iterator[Location]: ) # TODO hope they are sorted... 
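# The hunk above (from the "fix hash function" patch) pushes start/stop into
# the streaming parser itself: ijson yields location entries lazily and islice
# bounds how many get parsed, so even a multi-gigabyte LocationHistory.json
# never has to fit in memory. A standalone sketch of that pattern; the
# latitudeE7/longitudeE7 field names are an assumption about the Takeout
# export format and do not appear in this patch:
from datetime import datetime
from itertools import islice
import ijson  # type: ignore

def iter_points(fo, start=0, stop=None):
    for j in islice(ijson.items(fo, 'locations.item'), start, stop):
        dt = datetime.utcfromtimestamp(int(j['timestampMs']) / 1000)
        lat = float(j['latitudeE7']) / 1e7   # coordinates are stored as degrees * 10^7
        lon = float(j['longitudeE7']) / 1e7
        yield dt, lat, lon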
-# TODO that could also serve as basis for tz provider -@dbcache +@cachew(CACHE_PATH, hashf=mtime_hash, cls=Location, chunk_by=10000, logger=get_logger()) def _iter_locations(path: Path, start=0, stop=None) -> Iterator[Location]: if path.suffix == '.json': ctx = path.open('r') diff --git a/location/__main__.py b/location/__main__.py index a796ea6..442d4df 100644 --- a/location/__main__.py +++ b/location/__main__.py @@ -4,13 +4,11 @@ import logging from location import get_logger, get_locations, iter_locations, get_groups from kython.klogging import setup_logzero -from kython.kcache import get_kcache_logger def main(): logger = get_logger() - setup_logzero(logger, level=logging.INFO) - setup_logzero(get_kcache_logger(), level=logging.DEBUG) + setup_logzero(logger, level=logging.DEBUG) if len(sys.argv) > 1: