From cb39e5a00e4c9f616ad22f801aeb00f814c6797f Mon Sep 17 00:00:00 2001
From: Dima Gerasimov
Date: Mon, 12 Nov 2018 21:29:47 +0000
Subject: [PATCH] iterator for processing window; make location provider
 consume less memory

---
 location/__init__.py | 93 +++++++++++++++++++++++++++++++++++---------
 location/__main__.py |  6 +--
 2 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/location/__init__.py b/location/__init__.py
index e8c7b6c..7d3eac8 100644
--- a/location/__init__.py
+++ b/location/__init__.py
@@ -1,5 +1,8 @@
 from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence
+from collections import deque
+from itertools import islice
 from datetime import datetime
+import os
 from os import listdir
 from os.path import join
 from zipfile import ZipFile
@@ -16,10 +19,7 @@ def get_logger():
     return logging.getLogger("location")
 
 TAKEOUTS_PATH = "/path/to/takeout"
-CACHE_PATH = "/L/data/.cache/location.pickle"
-
-# TODO need to cache?
-# TODO tag??
+CACHE_PATH = "/L/data/.cache/location.picklel"
 
 Tag = str
 
@@ -42,13 +42,16 @@ def tagger(dt: datetime, lat: float, lon: float) -> Tag:
 
 # TODO hope they are sorted...
 # TODO that could also serve as basis for timezone provider.
-def iter_locations() -> Iterator[Location]:
+def load_locations() -> Iterator[Location]:
     last_takeout = max([f for f in listdir(TAKEOUTS_PATH) if re.match('takeout.*.zip', f)])
     jdata = None
     with ZipFile(join(TAKEOUTS_PATH, last_takeout)).open('Takeout/Location History/Location History.json') as fo:
+        cc = 0
         for j in ijson.items(fo, 'locations.item'):
-            # TODO eh, not very streaming?..
             dt = datetime.fromtimestamp(int(j["timestampMs"]) / 1000) # TODO utc??
+            if cc % 10000 == 0:
+                print(f'processing {dt}')
+            cc += 1
             lat = float(j["latitudeE7"] / 10000000)
             lon = float(j["longitudeE7"] / 10000000)
             tag = tagger(dt, lat, lon)
@@ -59,22 +62,67 @@ def iter_locations() -> Iterator[Location]:
             tag=tag
         )
 
-def get_locations(cached: bool=False) -> Sequence[Location]:
+def iter_locations(cached: bool=False) -> Iterator[Location]:
     import dill # type: ignore
     if cached:
         with open(CACHE_PATH, 'rb') as fo:
-            preph = dill.load(fo)
-            return [Location(**p._asdict()) for p in preph] # meh. but otherwise it's not serialising methods...
+            # TODO while fo has more data?
+            while True:
+                try:
+                    pre = dill.load(fo)
+                    yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods...
+                except EOFError:
+                    break
     else:
-        return list(iter_locations())
+        yield from load_locations()
+
+
+def get_locations(cached: bool=False) -> Sequence[Location]:
+    return list(iter_locations(cached=cached))
 
 class LocInterval(NamedTuple):
     from_: Location
     to: Location
 
+
+# TODO kython? nicer interface?
+class Window:
+    def __init__(self, it):
+        self.it = it
+        self.storage = deque()
+        self.start = 0
+        self.end = 0
+
+    # TODO need check for existence?
+    def load_to(self, to):
+        while to >= self.end:
+            try:
+                ii = next(self.it)
+                self.storage.append(ii)
+                self.end += 1
+            except StopIteration:
+                break
+    def exists(self, i):
+        self.load_to(i)
+        return i < self.end
+
+    def consume_to(self, i):
+        self.load_to(i)
+        consumed = i - self.start
+        self.start = i
+        for _ in range(consumed):
+            self.storage.popleft()
+
+    def __getitem__(self, i):
+        self.load_to(i)
+        ii = i - self.start
+        assert ii >= 0
+        return self.storage[ii]
+
 # TOOD could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here
+# TODO maybe if tag is none, we just don't care?
 def get_groups(cached: bool=False) -> List[LocInterval]:
-    locs = get_locations(cached=cached)
+    locsi = Window(iter_locations(cached=cached))
     i = 0
     groups: List[LocInterval] = []
     curg: List[Location] = []
@@ -89,16 +137,21 @@ def get_groups(cached: bool=False) -> List[LocInterval]:
     def dump_group():
         nonlocal curg
         if len(curg) > 0:
+            # print("new group")
             groups.append(LocInterval(from_=curg[0], to=curg[-1]))
             curg = []
 
-    while i < len(locs):
+    while locsi.exists(i):
+        # if i % 100 == 0:
+        #     print("processing " + str(i))
+        locsi.consume_to(i)
+
         last = None if len(curg) == 0 else curg[-1]
-        cur = locs[i]
+        cur = locsi[i]
         j = i
         match = False
-        while not match and j < len(locs) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour?
-            cur = locs[j]
+        while not match and locsi.exists(j) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour?
+            cur = locsi[j]
         if last is None or cur.tag == last.tag:
                 # ok
                 add_to_group(cur)
@@ -115,8 +168,12 @@ def get_groups(cached: bool=False) -> List[LocInterval]:
     dump_group()
     return groups
 
+# TODO ok, def cache groups.
 def update_cache():
     import dill # type: ignore
-    datas = get_locations(cached=False)
-    with open(CACHE_PATH, 'wb') as fo:
-        dill.dump(datas, fo)
+    CACHE_PATH_TMP = CACHE_PATH + '.tmp'
+    # TODO maybe, also keep on /tmp first?
+    with open(CACHE_PATH_TMP, 'wb', 2 ** 20) as fo:
+        for loc in iter_locations(cached=False):
+            dill.dump(loc, fo)
+    os.rename(CACHE_PATH_TMP, CACHE_PATH)
diff --git a/location/__main__.py b/location/__main__.py
index 0106a89..6eed9db 100644
--- a/location/__main__.py
+++ b/location/__main__.py
@@ -1,4 +1,4 @@
-from location import get_logger, get_locations, iter_locations
+from location import get_logger, get_locations, iter_locations, get_groups
 
 logger = get_logger()
 
@@ -17,7 +17,7 @@ if len(sys.argv) > 1:
     else:
         raise RuntimeError(f"Unknown command {cmd}")
 else:
-    for p in iter_locations():
+    for p in get_groups(cached=True):
         pass
         # TODO need datetime!
-        print(p)
+        # print(p)