# ---- misc/rescuetime_cleanup.py (hunk starting at line 10 of the patched file) ----
import sys

# todo function to reload hpi?
# Drop every cached `my.*` submodule from sys.modules before importing `my` again
# (presumably to force a fresh import -- see the todo above).
todel = [m for m in sys.modules if m.startswith('my.')]
for m in todel:
    del sys.modules[m]

import my

# todo add to doc?
from my.core import get_files
from my.config import rescuetime as RC

# todo ugh. doesn't work??
# from my.core.cachew import disable_cachew
# disable_cachew()
# RC.export_path = get_files(RC.export_path)[-1:]

import my.rescuetime as M
# print(len(list(M.entries())))
M.fill_influxdb()

# NOTE(review): the patch had a bare undefined name (`ffwf`) at this point, which
# aborted the script with a NameError right after the InfluxDB export. The early
# stop is kept but made explicit; delete this line if the rest of the script is
# meant to run.
sys.exit(0)

from itertools import islice, groupby
from more_itertools import ilen, bucket


# ---- my/core/common.py (appended after `asdict`) ----

# todo not sure about naming
def to_jsons(it) -> Iterable[Json]:
    """
    Lazily convert each item of ``it`` into a dict-like ``Json`` value.

    Items that are ``Exception`` instances go through ``error_to_json``;
    everything else goes through ``asdict``.
    """
    from .error import error_to_json  # prevent circular import
    for r in it:
        yield error_to_json(r) if isinstance(r, Exception) else asdict(r)


# ---- my/core/error.py (inserted after `extract_error_datetime`) ----
import traceback
from .common import Json


def error_to_json(e: Exception, *, dt_col: str='dt', tz=None) -> Json:
    """
    Render an exception as a two-key dict.

    :param e: the exception to render
    :param dt_col: name of the key that receives the datetime extracted from the error
    :param tz: if given, and the extracted datetime is naive, it is attached as tzinfo
               via ``replace`` (the clock value is not converted)
    :return: ``{'error': <formatted traceback string>, dt_col: <datetime or None>}``
    """
    edt = extract_error_datetime(e)
    if edt is not None and edt.tzinfo is None and tz is not None:
        edt = edt.replace(tzinfo=tz)
    # pass the real exception type rather than the generic `Exception` class
    estr = ''.join(traceback.format_exception(type(e), e, e.__traceback__))
    return {
        'error': estr,
        dt_col : edt,
    }
100644 --- a/my/core/pandas.py +++ b/my/core/pandas.py @@ -97,18 +97,8 @@ def check_dataframe(f: FuncT, error_col_policy: ErrorColPolicy='add_if_missing', # todo doctor: could have a suggestion to wrap dataframes with it?? discover by return type? -import traceback -from typing import Dict, Any -from .error import extract_error_datetime -def error_to_row(e: Exception, *, dt_col: str='dt', tz=None) -> Dict[str, Any]: - edt = extract_error_datetime(e) - if edt is not None and edt.tzinfo is None and tz is not None: - edt = edt.replace(tzinfo=tz) - estr = ''.join(traceback.format_exception(Exception, e, e.__traceback__)) - return { - 'error': estr, - dt_col : edt, - } +from .error import error_to_json +error_to_row = error_to_json # todo deprecate? # todo add proper types @@ -120,8 +110,7 @@ def as_dataframe(it: Iterable[Res[Any]]) -> DataFrameT: # https://github.com/pandas-dev/pandas/blob/fc9fdba6592bdb5d0d1147ce4d65639acd897565/pandas/core/frame.py#L562 # same for NamedTuple -- seems that it takes whatever schema the first NT has # so we need to convert each individually... sigh - from .common import asdict - ie = (error_to_row(r) if isinstance(r, Exception) else asdict(r) for r in it) # TODO just add tests for it? + from .common import to_jsons import pandas as pd - return pd.DataFrame(ie) + return pd.DataFrame(to_jsons(it)) diff --git a/my/rescuetime.py b/my/rescuetime.py index a616c33..c338b1b 100644 --- a/my/rescuetime.py +++ b/my/rescuetime.py @@ -16,7 +16,7 @@ from .core.error import Res, split_errors from my.config import rescuetime as config -log = LazyLogger(__package__, level='info') +log = LazyLogger(__name__, level='info') def inputs() -> Sequence[Path]: @@ -80,25 +80,33 @@ def fake_data(rows: int=1000) -> Iterator[None]: # todo not sure if I want to keep these here? vvv - # guess should move to core? or to 'ext' module, i.e. interfaces? 
# make automatic
def fill_influxdb() -> None:
    """
    Re-export all rescuetime entries into InfluxDB.

    Uses a default-constructed ``InfluxDBClient`` and the database ``'db'``.
    The measurement name is this module's ``__name__`` with dots replaced by
    underscores. Any existing series for that measurement is deleted first, then
    one point per ``Entry`` is written: the activity as a tag, the duration in
    seconds as a field, and the entry's datetime (ISO format) as the timestamp.

    Items yielded by ``entries()`` that are not ``Entry`` instances are skipped.
    """
    from influxdb import InfluxDBClient # type: ignore
    client = InfluxDBClient()
    db = 'db'
    measurement = __name__.replace('.', '_')
    # Make sure the target database exists before touching it. CREATE DATABASE does
    # nothing if the database is already there, so existing data is left alone
    # (unlike the drop/create pair below, which would wipe other measurements).
    client.create_database(db)
    # client.drop_database(db)
    client.delete_series(database=db, measurement=measurement)
    # todo handle errors.. not sure how? maybe add tag for 'error' and fill with empty data?
    vit = (e for e in entries() if isinstance(e, Entry))
    jsons = ({
        'measurement': measurement, # hmm, influx doesn't like dots?
        # hmm, so tags are autoindexed and might be faster?
        # not sure what's the big difference though
        # "fields are data and tags are metadata"
        'tags': {'activity': e.activity},
        'time': e.dt.isoformat(),
        'fields': {'duration_s': e.duration_s},
        # todo asdict(e),
    } for e in vit)
    # todo do we need to batch?
    client.write_points(jsons, database=db)


# TODO lots of garbage in dir()? maybe need to del the imports...