iterator for processing window; make location provider consume less memory

This commit is contained in:
Dima Gerasimov 2018-11-12 21:29:47 +00:00
parent 1ca1bcd309
commit cb39e5a00e
2 changed files with 78 additions and 21 deletions

View file

@@ -1,5 +1,8 @@
from typing import NamedTuple, Iterator, List, Iterable, Collection, Sequence
from collections import deque
from itertools import islice
from datetime import datetime
import os
from os import listdir
from os.path import join
from zipfile import ZipFile
@@ -16,10 +19,7 @@ def get_logger():
return logging.getLogger("location")
TAKEOUTS_PATH = "/path/to/takeout"
CACHE_PATH = "/L/data/.cache/location.pickle"
# TODO need to cache?
# TODO tag??
CACHE_PATH = "/L/data/.cache/location.picklel"
Tag = str
@@ -42,13 +42,16 @@ def tagger(dt: datetime, lat: float, lon: float) -> Tag:
# TODO hope they are sorted...
# TODO that could also serve as basis for timezone provider.
def iter_locations() -> Iterator[Location]:
def load_locations() -> Iterator[Location]:
last_takeout = max([f for f in listdir(TAKEOUTS_PATH) if re.match('takeout.*.zip', f)])
jdata = None
with ZipFile(join(TAKEOUTS_PATH, last_takeout)).open('Takeout/Location History/Location History.json') as fo:
cc = 0
for j in ijson.items(fo, 'locations.item'):
# TODO eh, not very streaming?..
dt = datetime.fromtimestamp(int(j["timestampMs"]) / 1000) # TODO utc??
if cc % 10000 == 0:
print(f'processing {dt}')
cc += 1
lat = float(j["latitudeE7"] / 10000000)
lon = float(j["longitudeE7"] / 10000000)
tag = tagger(dt, lat, lon)
@@ -59,22 +62,67 @@ def iter_locations() -> Iterator[Location]:
tag=tag
)
def get_locations(cached: bool=False) -> Sequence[Location]:
def iter_locations(cached: bool=False) -> Iterator[Location]:
import dill # type: ignore
if cached:
with open(CACHE_PATH, 'rb') as fo:
preph = dill.load(fo)
return [Location(**p._asdict()) for p in preph] # meh. but otherwise it's not serialising methods...
# TODO while fo has more data?
while True:
try:
pre = dill.load(fo)
yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods...
except EOFError:
break
else:
return list(iter_locations())
yield from load_locations()
def get_locations(cached: bool=False) -> Sequence[Location]:
    """Eagerly materialize every location into a list.

    Thin wrapper over iter_locations; `cached` is passed straight through
    (True reads the pickle cache, False re-parses the takeout archive).
    """
    locations = iter_locations(cached=cached)
    return list(locations)
class LocInterval(NamedTuple):
    # Contiguous run of locations; get_groups emits one per run of
    # equal-tagged points (first and last Location of the run).
    # Trailing underscore avoids clashing with the `from` keyword.
    from_: Location
    to: Location
# TODO kython? nicer interface?
class Window:
    """Lazy sliding window over an iterator.

    Buffers only the items between `start` (first index still held) and
    `end` (one past the last index pulled so far), so random access by
    absolute index works without materializing the whole stream.
    """
    def __init__(self, it):
        self.it = it
        self.storage = deque()
        self.start = 0
        self.end = 0

    # TODO need check for existence?
    def load_to(self, to):
        """Pull items until absolute index `to` is buffered, or the iterator runs dry."""
        while self.end <= to:
            try:
                item = next(self.it)
            except StopIteration:
                return
            self.storage.append(item)
            self.end += 1

    def exists(self, i):
        """True iff the underlying iterator can produce index `i`."""
        self.load_to(i)
        return self.end > i

    def consume_to(self, i):
        """Drop every buffered item before index `i`; `i` becomes the window start."""
        self.load_to(i)
        while self.start < i:
            self.storage.popleft()
            self.start += 1

    def __getitem__(self, i):
        self.load_to(i)
        offset = i - self.start
        assert offset >= 0
        return self.storage[offset]
# TOOD could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here
# TODO maybe if tag is none, we just don't care?
def get_groups(cached: bool=False) -> List[LocInterval]:
locs = get_locations(cached=cached)
locsi = Window(iter_locations(cached=cached))
i = 0
groups: List[LocInterval] = []
curg: List[Location] = []
@@ -89,16 +137,21 @@ def get_groups(cached: bool=False) -> List[LocInterval]:
def dump_group():
nonlocal curg
if len(curg) > 0:
# print("new group")
groups.append(LocInterval(from_=curg[0], to=curg[-1]))
curg = []
while i < len(locs):
while locsi.exists(i):
# if i % 100 == 0:
# print("processing " + str(i))
locsi.consume_to(i)
last = None if len(curg) == 0 else curg[-1]
cur = locs[i]
cur = locsi[i]
j = i
match = False
while not match and j < len(locs) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour?
cur = locs[j]
while not match and locsi.exists(j) and j < i + 10: # TODO FIXME time distance here... e.g. half an hour?
cur = locsi[j]
if last is None or cur.tag == last.tag:
# ok
add_to_group(cur)
@@ -115,8 +168,12 @@ def get_groups(cached: bool=False) -> List[LocInterval]:
dump_group()
return groups
# TODO ok, def cache groups.
def update_cache():
import dill # type: ignore
datas = get_locations(cached=False)
with open(CACHE_PATH, 'wb') as fo:
dill.dump(datas, fo)
CACHE_PATH_TMP = CACHE_PATH + '.tmp'
# TODO maybe, also keep on /tmp first?
with open(CACHE_PATH_TMP, 'wb', 2 ** 20) as fo:
for loc in iter_locations(cached=False):
dill.dump(loc, fo)
os.rename(CACHE_PATH_TMP, CACHE_PATH)

View file

@@ -1,4 +1,4 @@
from location import get_logger, get_locations, iter_locations
from location import get_logger, get_locations, iter_locations, get_groups
logger = get_logger()
@@ -17,7 +17,7 @@ if len(sys.argv) > 1:
else:
raise RuntimeError(f"Unknown command {cmd}")
else:
for p in iter_locations():
for p in get_groups(cached=True):
pass
# TODO need datetime!
print(p)
# print(p)