diff --git a/location/__init__.py b/location/__init__.py
index 663dc3e..5f5bd01 100644
--- a/location/__init__.py
+++ b/location/__init__.py
@@ -12,6 +12,7 @@
 import pytz

 from kython import kompress
+from kython.kcache import make_dbcache, mtime_hash

 # pip install geopy
@@ -25,7 +26,6 @@ def get_logger():

 TAKEOUTS_PATH = Path("/path/to/takeout")
-CACHE_PATH = Path("/L/data/.cache/location.picklel")

 Tag = str
@@ -37,6 +37,8 @@ class Location(NamedTuple):
     alt: Optional[float]
     tag: Tag

+dbcache = make_dbcache('/L/data/.cache/location.sqlite', hashf=mtime_hash, type_=Location)
+

 def tagger(dt: datetime, point: geopy.Point) -> Tag:
     TAGS = [
@@ -48,7 +50,9 @@ def tagger(dt: datetime, point: geopy.Point) -> Tag:
     else:
         return "other"

-def _load_locations(fo) -> Iterator[Location]:
+
+# TODO careful, might not fit in glumov ram...
+def _iter_locations_fo(fo) -> Iterator[Location]:
     logger = get_logger()
     total = 0
     errors = 0
@@ -84,32 +88,27 @@ def _load_locations(fo) -> Iterator[Location]:

 # TODO hope they are sorted...
 # TODO that could also serve as basis for tz provider
-def load_locations() -> Iterator[Location]:
-    logger = get_logger()
-
-    last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip'))
+@dbcache
+def _iter_locations(path: Path) -> List[Location]:
+    limit = None
+    # TODO FIXME support archives
+    with path.open('r') as fo:
+        return list(islice(_iter_locations_fo(fo), 0, limit))
     # TODO wonder if old takeouts could contribute as well??
-    with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo:
-        return _load_locations(fo)
+    # with kompress.open(last_takeout, 'Takeout/Location History/Location History.json') as fo:
+    #     return _iter_locations_fo(fo)
+
+
+# TODO shit.. should support iterator..
+def iter_locations() -> List[Location]:
+    last_takeout = max(TAKEOUTS_PATH.glob('takeout*.zip'))
+    last_takeout = Path('/L/tmp/LocationHistory.json')
+    return _iter_locations(last_takeout)

-def iter_locations(cached: bool=False) -> Iterator[Location]:
-    import sys
-    sys.path.append('/L/Dropbox/data/location_provider') # jeez.. otherwise it refuses to unpickle :(
-    import pickle as dill # type: ignore
-    if cached:
-        with CACHE_PATH.open('rb') as fo:
-            while True:
-                try:
-                    # TODO shit really?? it can't load now, do I need to adjust pythonpath or something?...
-                    pre = dill.load(fo)
-                    yield Location(**pre._asdict()) # meh. but otherwise it's not serialising methods...
-                except EOFError:
-                    break
-    else:
-        yield from load_locations()

-def get_locations(cached: bool=False) -> Sequence[Location]:
-    return list(iter_locations(cached=cached))
+def get_locations() -> Sequence[Location]:
+    return list(iter_locations())
@@ -155,11 +154,9 @@ class Window:


-# TODO could cache groups too?... using 16% cpu is a bit annoying.. could also use some sliding window here
 # TODO maybe if tag is none, we just don't care?
-def get_groups(cached: bool=False) -> List[LocInterval]:
-    print("cached", cached)
-    all_locations = iter_locations(cached=cached)
+def get_groups() -> List[LocInterval]:
+    all_locations = iter(iter_locations()) # TODO
     locsi = Window(all_locations)
     i = 0
     groups: List[LocInterval] = []
diff --git a/location/__main__.py b/location/__main__.py
index 6c5a481..e246c3c 100644
--- a/location/__main__.py
+++ b/location/__main__.py
@@ -4,15 +4,19 @@
 import logging

 from location import get_logger, get_locations, iter_locations, get_groups
 from kython.klogging import setup_logzero
+from kython.kcache import get_kcache_logger
+

 def main():
     logger = get_logger()
     setup_logzero(logger, level=logging.INFO)
+    setup_logzero(get_kcache_logger(), level=logging.DEBUG)

     if len(sys.argv) > 1:
         cmd = sys.argv[1]
+        # TODO ok, update cache makes sense just to refresh in case of code changes...
         if cmd == "update_cache":
             from location import update_cache, get_locations
             update_cache()
@@ -20,7 +24,7 @@ def main():
         else:
             raise RuntimeError(f"Unknown command {cmd}")
     else:
-        for p in get_groups(cached=True):
+        for p in get_groups():
             print(p)

 # TODO need datetime!
diff --git a/sql.py b/sql.py
deleted file mode 100755
index 7f0a46a..0000000
--- a/sql.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#!/usr/bin/env python3
-from pathlib import Path
-import functools
-from datetime import datetime
-from itertools import islice
-from typing import Type, NamedTuple, Union, Optional
-import logging
-
-from location import _load_locations, Location, get_logger
-import sqlalchemy # type: ignore
-import sqlalchemy as sa # type: ignore
-
-from kython import ichunks
-
-from kython.py37 import fromisoformat
-
-# TODO move to some common thing?
-class IsoDateTime(sqlalchemy.TypeDecorator):
-    # in theory could use something more efficient? e.g. blob for encoded datetime and tz?
-    # but practically, the difference seems to be pretty small, so perhaps fine for now
-    impl = sqlalchemy.types.String
-
-    # TODO optional?
-    def process_bind_param(self, value: Optional[datetime], dialect) -> Optional[str]:
-        if value is None:
-            return None
-        return value.isoformat()
-
-    def process_result_value(self, value: Optional[str], dialect) -> Optional[datetime]:
-        if value is None:
-            return None
-        return fromisoformat(value)
-
-
-def _map_type(cls):
-    tmap = {
-        str: sa.String,
-        float: sa.Float,
-        datetime: IsoDateTime,
-    }
-    r = tmap.get(cls, None)
-    if r is not None:
-        return r
-
-    if getattr(cls, '__origin__', None) == Union:
-        elems = cls.__args__
-        elems = [e for e in elems if e != type(None)]
-        if len(elems) == 1:
-            return _map_type(elems[0]) # meh..
-    raise RuntimeError(f'Unexpected type {cls}')
-
-# TODO to start with, just assert utc when serializing, deserializing
-# TODO how to use timestamp as key? just round it?
-
-def make_schema(cls: Type[NamedTuple]): # TODO covariant?
-    res = []
-    for name, ann in cls.__annotations__.items():
-        res.append(sa.Column(name, _map_type(ann)))
-    return res
-
-
-def get_table(db_path: Path, type_, name='table'):
-    db = sa.create_engine(f'sqlite:///{db_path}')
-    engine = db.connect() # TODO do I need to tear anything down??
-    meta = sa.MetaData(engine)
-    schema = make_schema(type_)
-    sa.Table(name, meta, *schema)
-    meta.create_all()
-    table = sa.table(name, *schema)
-    return engine, table
-
-def cache_locs(source: Path, db_path: Path, limit=None):
-    engine, table = get_table(db_path=db_path, type_=Location)
-
-    with source.open('r') as fo:
-        # TODO fuck. do I really need to split myself??
-        # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) too many SQL variables
-        # TODO count deprecated??
-        # print(engine.execute(table.count()).fetchone())
-        for chunk in ichunks(islice(_load_locations(fo), 0, limit), 10000):
-            engine.execute(table.insert().values(chunk))
-
-    # TODO maintain order during insertion?
-
-def iter_db_locs(db_path: Path):
-    engine, table = get_table(db_path, type_=Location)
-    datas = engine.execute(table.select()).fetchall()
-    yield from (Location(**d) for d in datas)
-
-def test(tmp_path):
-    tdir = Path(tmp_path)
-    tdb = tdir / 'test.sqlite'
-    test_limit = 100
-    test_src = Path('/L/tmp/LocationHistory.json')
-
-    # TODO meh, double loading, but for now fine
-    with test_src.open('r') as fo:
-        real_locs = list(islice(_load_locations(fo), 0, test_limit))
-
-    cache_locs(source=test_src, db_path=tdb, limit=test_limit)
-    cached_locs = list(iter_db_locs(tdb))
-    assert len(cached_locs) == test_limit
-    assert real_locs == cached_locs
-
-from kython.ktyping import PathIsh
-
-# TODO what if we want dynamic path??
-# dbcache = make_dbcache('/L/tmp/test.db', hashf=lambda p: p) # TODO FIXME?
-
-Hash = str
-
-# TODO hash is a bit misleading
-# TODO perhaps another table is the way to go...
-
-# TODO careful about concurrent access?
-def read_hash(db_path: Path) -> Optional[Hash]:
-    hash_file = db_path.with_suffix('.hash')
-    if not hash_file.exists():
-        return None
-    return hash_file.read_text()
-
-# TODO not sure if there is any way to guarantee atomic reading....
-# unless it happens automatically due to unlink logic?
-# TODO need to know entry type?
-# TODO or, we can just encode that in names. that way no need for atomic stuff
-
-# TODO give a better name
-class Alala:
-    def __init__(self, db_path: Path, type_) -> None:
-        self.db = sa.create_engine(f'sqlite:///{db_path}')
-        self.engine = self.db.connect() # TODO do I need to tear anything down??
-        self.meta = sa.MetaData(self.engine)
-        self.table_hash = sa.Table('hash', self.meta, sa.Column('value', sa.types.String))
-
-        schema = make_schema(type_)
-        self.table_data = sa.Table('table', self.meta, *schema)
-        self.meta.create_all()
-
-
-def get_dbcache_logger():
-    return logging.getLogger('dbcache')
-
-# TODO ugh. there should be a nicer way to wrap that...
-def make_dbcache(db_path: PathIsh, hashf, type_):
-    logger = get_dbcache_logger()
-    db_path = Path(db_path)
-    def dec(func):
-        @functools.wraps(func)
-        def wrapper(key):
-            # TODO FIXME make sure we have exclusive write lock
-            alala = Alala(db_path, type_)
-            engine = alala.engine
-
-            prev_hashes = engine.execute(alala.table_hash.select()).fetchall()
-            if len(prev_hashes) > 1:
-                raise RuntimeError(f'Multiple hashes! {prev_hashes}')
-
-            prev_hash: Optional[Hash]
-            if len(prev_hashes) == 0:
-                prev_hash = None
-            else:
-                prev_hash = prev_hashes[0][0] # TODO ugh, returns a tuple...
-            logger.debug('previous hash: %s', prev_hash)
-
-            h = hashf(key)
-            logger.debug('current hash: %s', h)
-            assert h is not None # just in case
-
-            with engine.begin() as transaction:
-                if h == prev_hash:
-                    rows = engine.execute(alala.table_data.select()).fetchall()
-                    return [type_(**row) for row in rows]
-                else:
-                    datas = func(key)
-                    if len(datas) > 0:
-                        engine.execute(alala.table_data.insert().values(datas)) # TODO chunks??
-
-                    # TODO FIXME insert and replace instead
-                    engine.execute(alala.table_hash.delete())
-                    engine.execute(alala.table_hash.insert().values([{'value': h}]))
-                    return datas
-        return wrapper
-
-    # TODO FIXME engine is leaking??
-    return dec
-
-
-def hashf(path: Path) -> Hash:
-    mt = int(path.stat().st_mtime)
-    return f'{path}.{mt}'
-
-dbcache = make_dbcache('test.sqlite', hashf=hashf, type_=Location)
-
-@dbcache
-def _xxx_locations(path: Path):
-    with path.open('r') as fo:
-        return list(islice(_load_locations(fo), 0, 100))
-
-
-def xxx_locations():
-    test_src = Path('/L/tmp/LocationHistory.json')
-    return _xxx_locations(test_src)
-
-
-def main():
-    from kython import setup_logzero
-    setup_logzero(get_logger(), level=logging.DEBUG)
-    setup_logzero(get_dbcache_logger(), level=logging.DEBUG)
-
-    src_path = Path('hi')
-
-    db_path = Path('test.sqlite')
-    # if db_path.exists():
-    #     db_path.unlink()
-
-    res = xxx_locations()
-    # new_wrapped = dbcache_worker(db_path=db_path, hashf=hashf, type_=Location, wrapped=wrapped)
-    # res = new_wrapped(src_path)
-    print(res)
-
-    # cache_locs(source=Path('/L/tmp/LocationHistory.json'), db_path=db_path)
-    # locs = iter_db_locs(db_path)
-    # print(len(list(locs)))
-
-if __name__ == '__main__':
-    main()
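
Note on the change: the @dbcache decorator introduced above replaces both the old pickle cache and the sql.py prototype that this commit deletes. For reference, the pattern it implements is roughly the following — a minimal, self-contained sketch using only stdlib sqlite3. The real kython.kcache is sqlalchemy-based (the deleted sql.py shows an early version of it); only make_dbcache(path, hashf=..., type_=...) and mtime_hash are taken from the diff, everything else below is illustrative, not the library's actual internals:

import functools
import sqlite3
from pathlib import Path
from typing import Callable, List

def mtime_hash(path: Path) -> str:
    # Key the cache on path + integer mtime, like the deleted hashf(): any
    # edit to the takeout file changes the hash and invalidates the cache.
    return f'{path}.{int(path.stat().st_mtime)}'

def make_dbcache(db_path: str, hashf: Callable[[Path], str], type_):
    # type_ is a NamedTuple class; its fields are assumed sqlite-storable
    # primitives here (the real code maps datetime via a custom column type).
    def dec(func: Callable[[Path], List]) -> Callable[[Path], List]:
        @functools.wraps(func)
        def wrapper(key: Path) -> List:
            conn = sqlite3.connect(db_path)
            try:
                conn.execute('CREATE TABLE IF NOT EXISTS hash (value TEXT)')
                conn.execute(f'CREATE TABLE IF NOT EXISTS data ({", ".join(type_._fields)})')
                prev = conn.execute('SELECT value FROM hash').fetchone()
                h = hashf(key)
                if prev is not None and prev[0] == h:
                    # Hash unchanged: serve the rows straight from sqlite.
                    return [type_(*row) for row in conn.execute('SELECT * FROM data')]
                # Hash changed (or first run): recompute, then swap the data
                # and the hash in one transaction, i.e. the "insert and
                # replace" that sql.py left as a FIXME.
                datas = func(key)
                with conn:
                    conn.execute('DELETE FROM data')
                    conn.executemany(
                        f'INSERT INTO data VALUES ({", ".join("?" * len(type_._fields))})',
                        [tuple(d) for d in datas])
                    conn.execute('DELETE FROM hash')
                    conn.execute('INSERT INTO hash VALUES (?)', (h,))
                return datas
            finally:
                conn.close()
        return wrapper
    return dec

With that shape in mind, the decorated _iter_locations(path) behaves as the diff intends: the first call parses the takeout and fills location.sqlite, and subsequent calls return the stored rows until the file's mtime changes.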
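One detail worth remembering from the deleted sql.py is the datetime handling: sqlite has no native datetime type, so IsoDateTime round-tripped values as ISO-8601 strings (using the fromisoformat backport from kython.py37). A stdlib-only equivalent for the sketch above would be sqlite3 adapters/converters — again an illustrative sketch, not what kcache actually does:

import sqlite3
from datetime import datetime

# Mirror IsoDateTime: store datetimes as ISO-8601 text, parse them back on read.
sqlite3.register_adapter(datetime, lambda dt: dt.isoformat())
sqlite3.register_converter('timestamp', lambda b: datetime.fromisoformat(b.decode()))

conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
conn.execute('CREATE TABLE t (dt timestamp)')
conn.execute('INSERT INTO t VALUES (?)', (datetime(2019, 1, 1, 12, 0),))
assert conn.execute('SELECT dt FROM t').fetchone()[0] == datetime(2019, 1, 1, 12, 0)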