'''
Rescuetime (activity tracking) data
'''

from pathlib import Path
from datetime import datetime, timedelta
from typing import Sequence, Iterable

from .core import get_files, LazyLogger
from .core.common import mcachew
from .core.error import Res, split_errors

import more_itertools

from my.config import rescuetime as config


log = LazyLogger(__package__, level='info')


def inputs() -> Sequence[Path]:
    '''
    Return the export files located via ``config.export_path``.
    '''
    return get_files(config.export_path)


import my.config.repos.rescuexport.dal as dal
DAL = dal.DAL
Entry = dal.Entry


@mcachew(hashf=lambda: inputs())
def entries() -> Iterable[Entry]:
    '''
    Yield the entries the DAL reads from inputs(), skipping errors.

    Items that ``split_errors`` routes to the error stream are not yielded;
    they are reported through the module logger once all values have been
    produced, instead of being dropped silently.
    '''
    # named 'rescuetime_dal' so it doesn't shadow the module-level 'dal' alias
    rescuetime_dal = DAL(inputs())
    it = rescuetime_dal.entries()
    # presumably: vit -- plain values, eit -- Exception instances (see .core.error.split_errors)
    vit, eit = split_errors(it, ET=Exception)
    # todo handle errors, I guess initially I didn't because it's unclear how to easily group?
    # todo would be nice if logger unwrapped causes by default??
    yield from vit
    # errors are only logged, not yielded: groups() reads .dt off every item it receives
    for err in eit:
        log.warning('error while processing rescuetime export: %r', err)


def groups(gap: timedelta = timedelta(hours=3)) -> Iterable[Sequence[Entry]]:
    '''
    Split entries() into consecutive runs.

    A new group starts whenever the ``dt`` of an entry is more than ``gap``
    later than the ``dt`` of the entry right before it.
    '''
    vit = entries()
    yield from more_itertools.split_when(vit, lambda a, b: (b.dt - a.dt) > gap)


def stats():
    '''
    Return the merged ``stat`` results for groups() and entries().
    '''
    from .core import stat
    return {
        **stat(groups),
        **stat(entries),
    }


# basically, hack config and populate it with fake data? fake data generated by DAL, but the rest is handled by this?
from contextlib import contextmanager
# todo take seed, or what?
@contextmanager
def fake_data(rows: int = 1000):
    '''
    Context manager that points the module at generated data.

    While the ``with`` body runs, ``export_path`` of the overridden config is
    a temporary directory holding a single ``rescuetime.json`` file, filled
    with the output of ``dal.fake_data_generator(rows=rows)``.
    '''
    # todo also disable cachew automatically for such things?
    # TODO right, disabled_cachew won't work here because at that point, entries() is already wrapped?
    # I guess need to fix this in cachew?
    from .core.cachew import disabled_cachew
    from .core.cfg import override_config
    from tempfile import TemporaryDirectory
    with disabled_cachew(), override_config(config) as cfg, TemporaryDirectory() as td:
        tdir = Path(td)
        cfg.export_path = tdir
        f = tdir / 'rescuetime.json'
        import json
        f.write_text(json.dumps(dal.fake_data_generator(rows=rows)))
        yield
# TODO ok, now it's something that actually could run on CI!



# todo not sure if I want to keep these here?
# vvv
def fill_influxdb():
    '''
    Recreate the 'test' InfluxDB database and write one 'phone' measurement
    point per item of entries() into it.
    '''
    from influxdb import InfluxDBClient # type: ignore

    database = 'test'
    influx = InfluxDBClient()
    # influx.delete_series(database='lastfm', measurement='phone')
    # the database is dropped and created again on every call, so earlier points are discarded
    influx.drop_database(database)
    influx.create_database(database)

    points = []
    for entry in entries():
        points.append({
            "measurement": 'phone',
            "tags": {},
            "time": str(entry.dt),
            "fields": {"name": entry.activity},
        })
    influx.write_points(points, database=database) # TODO??