core: more generic functions to jsonify data, rescuetime: fix influxdb filling
This commit is contained in:
parent
07f901e1e5
commit
4012f9b7c2
5 changed files with 65 additions and 32 deletions
|
@ -10,10 +10,23 @@ import sys
|
||||||
|
|
||||||
# todo function to reload hpi?
|
# todo function to reload hpi?
|
||||||
todel = [m for m in sys.modules if m.startswith('my.')]
|
todel = [m for m in sys.modules if m.startswith('my.')]
|
||||||
# for m in todel: del sys.modules[m]
|
for m in todel: del sys.modules[m]
|
||||||
|
|
||||||
import my
|
import my
|
||||||
|
# todo add to doc?
|
||||||
|
from my.core import get_files
|
||||||
|
from my.config import rescuetime as RC
|
||||||
|
|
||||||
|
# todo ugh. doesn't work??
|
||||||
|
# from my.core.cachew import disable_cachew
|
||||||
|
# disable_cachew()
|
||||||
|
# RC.export_path = get_files(RC.export_path)[-1:]
|
||||||
|
|
||||||
import my.rescuetime as M
|
import my.rescuetime as M
|
||||||
|
# print(len(list(M.entries())))
|
||||||
|
M.fill_influxdb()
|
||||||
|
|
||||||
|
ffwf
|
||||||
|
|
||||||
from itertools import islice, groupby
|
from itertools import islice, groupby
|
||||||
from more_itertools import ilen, bucket
|
from more_itertools import ilen, bucket
|
||||||
|
|
|
@ -471,3 +471,13 @@ def asdict(thing) -> Json:
|
||||||
# must be a NT otherwise?
|
# must be a NT otherwise?
|
||||||
# todo add a proper check.. ()
|
# todo add a proper check.. ()
|
||||||
return thing._asdict()
|
return thing._asdict()
|
||||||
|
|
||||||
|
|
||||||
|
# todo not sure about naming
|
||||||
|
def to_jsons(it) -> Iterable[Json]:
|
||||||
|
from .error import error_to_json # prevent circular import
|
||||||
|
for r in it:
|
||||||
|
if isinstance(r, Exception):
|
||||||
|
yield error_to_json(r)
|
||||||
|
else:
|
||||||
|
yield asdict(r)
|
||||||
|
|
|
@ -143,6 +143,19 @@ def extract_error_datetime(e: Exception) -> Optional[datetime]:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
import traceback
|
||||||
|
from .common import Json
|
||||||
|
def error_to_json(e: Exception, *, dt_col: str='dt', tz=None) -> Json:
|
||||||
|
edt = extract_error_datetime(e)
|
||||||
|
if edt is not None and edt.tzinfo is None and tz is not None:
|
||||||
|
edt = edt.replace(tzinfo=tz)
|
||||||
|
estr = ''.join(traceback.format_exception(Exception, e, e.__traceback__))
|
||||||
|
return {
|
||||||
|
'error': estr,
|
||||||
|
dt_col : edt,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_datetime_errors() -> None:
|
def test_datetime_errors() -> None:
|
||||||
import pytz
|
import pytz
|
||||||
dt_notz = datetime.now()
|
dt_notz = datetime.now()
|
||||||
|
|
|
@ -97,18 +97,8 @@ def check_dataframe(f: FuncT, error_col_policy: ErrorColPolicy='add_if_missing',
|
||||||
# todo doctor: could have a suggesion to wrap dataframes with it?? discover by return type?
|
# todo doctor: could have a suggesion to wrap dataframes with it?? discover by return type?
|
||||||
|
|
||||||
|
|
||||||
import traceback
|
from .error import error_to_json
|
||||||
from typing import Dict, Any
|
error_to_row = error_to_json # todo deprecate?
|
||||||
from .error import extract_error_datetime
|
|
||||||
def error_to_row(e: Exception, *, dt_col: str='dt', tz=None) -> Dict[str, Any]:
|
|
||||||
edt = extract_error_datetime(e)
|
|
||||||
if edt is not None and edt.tzinfo is None and tz is not None:
|
|
||||||
edt = edt.replace(tzinfo=tz)
|
|
||||||
estr = ''.join(traceback.format_exception(Exception, e, e.__traceback__))
|
|
||||||
return {
|
|
||||||
'error': estr,
|
|
||||||
dt_col : edt,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# todo add proper types
|
# todo add proper types
|
||||||
|
@ -120,8 +110,7 @@ def as_dataframe(it: Iterable[Res[Any]]) -> DataFrameT:
|
||||||
# https://github.com/pandas-dev/pandas/blob/fc9fdba6592bdb5d0d1147ce4d65639acd897565/pandas/core/frame.py#L562
|
# https://github.com/pandas-dev/pandas/blob/fc9fdba6592bdb5d0d1147ce4d65639acd897565/pandas/core/frame.py#L562
|
||||||
# same for NamedTuple -- seems that it takes whatever schema the first NT has
|
# same for NamedTuple -- seems that it takes whatever schema the first NT has
|
||||||
# so we need to convert each individually... sigh
|
# so we need to convert each individually... sigh
|
||||||
from .common import asdict
|
|
||||||
ie = (error_to_row(r) if isinstance(r, Exception) else asdict(r) for r in it)
|
|
||||||
# TODO just add tests for it?
|
# TODO just add tests for it?
|
||||||
|
from .common import to_jsons
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
return pd.DataFrame(ie)
|
return pd.DataFrame(to_jsons(it))
|
||||||
|
|
|
@ -16,7 +16,7 @@ from .core.error import Res, split_errors
|
||||||
from my.config import rescuetime as config
|
from my.config import rescuetime as config
|
||||||
|
|
||||||
|
|
||||||
log = LazyLogger(__package__, level='info')
|
log = LazyLogger(__name__, level='info')
|
||||||
|
|
||||||
|
|
||||||
def inputs() -> Sequence[Path]:
|
def inputs() -> Sequence[Path]:
|
||||||
|
@ -80,25 +80,33 @@ def fake_data(rows: int=1000) -> Iterator[None]:
|
||||||
|
|
||||||
|
|
||||||
# todo not sure if I want to keep these here? vvv
|
# todo not sure if I want to keep these here? vvv
|
||||||
|
|
||||||
# guess should move to core? or to 'ext' module, i.e. interfaces?
|
# guess should move to core? or to 'ext' module, i.e. interfaces?
|
||||||
# make automatic
|
# make automatic
|
||||||
def fill_influxdb():
|
def fill_influxdb() -> None:
|
||||||
|
from .core.common import asdict
|
||||||
|
|
||||||
from influxdb import InfluxDBClient # type: ignore
|
from influxdb import InfluxDBClient # type: ignore
|
||||||
client = InfluxDBClient()
|
client = InfluxDBClient()
|
||||||
# client.delete_series(database='lastfm', measurement='phone')
|
db = 'db'
|
||||||
db = 'test'
|
measurement = __name__.replace('.', '_')
|
||||||
client.drop_database(db)
|
client.delete_series(database=db, measurement=measurement)
|
||||||
client.create_database(db)
|
# client.drop_database(db)
|
||||||
# todo handle errors
|
# todo create if not exists?
|
||||||
vit = (e for e in entries() if isinstance(e, dal.Entry))
|
# client.create_database(db)
|
||||||
jsons = [{
|
# todo handle errors.. not sure how? maybe add tag for 'error' and fill with emtpy data?
|
||||||
"measurement": 'phone',
|
vit = (e for e in entries() if isinstance(e, Entry))
|
||||||
"tags": {},
|
jsons = ({
|
||||||
"time": str(e.dt),
|
'measurement': measurement, # hmm, influx doesn't like dots?
|
||||||
"fields": {"name": e.activity},
|
# hmm, so tags are autoindexed and might be faster?
|
||||||
} for e in vit]
|
# not sure what's the big difference though
|
||||||
client.write_points(jsons, database=db) # TODO??
|
# "fields are data and tags are metadata"
|
||||||
|
'tags': {'activity': e.activity},
|
||||||
|
'time': e.dt.isoformat(),
|
||||||
|
'fields': {'duration_s': e.duration_s},
|
||||||
|
# todo asdict(e),
|
||||||
|
} for e in vit)
|
||||||
|
# todo do we need to batch?
|
||||||
|
client.write_points(jsons, database=db)
|
||||||
|
|
||||||
|
|
||||||
# TODO lots of garbage in dir()? maybe need to del the imports...
|
# TODO lots of garbage in dir()? maybe need to del the imports...
|
||||||
|
|
Loading…
Add table
Reference in a new issue