From 20585a313023f9cccd5f05abf9f7e3663e9264f3 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Sun, 21 Feb 2021 22:01:57 +0000 Subject: [PATCH] influxdb: WIP on magic automatic interface to run: python3 -c 'import my.core.influxdb as I; import my.hypothesis as H; I.magic_fill(H.highlights)' --- my/core/influxdb.py | 61 ++++++++++++++++++++++++++++++++++++++------- my/core/pandas.py | 11 ++++---- 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/my/core/influxdb.py b/my/core/influxdb.py index d6d8d72..3b2b850 100644 --- a/my/core/influxdb.py +++ b/my/core/influxdb.py @@ -1,8 +1,7 @@ ''' TODO doesn't really belong to 'core' morally, but can think of moving out later ''' -from typing import Iterable, Any, Optional - +from typing import Iterable, Any, Optional, Dict from .common import LazyLogger, asdict, Json @@ -14,7 +13,7 @@ class config: db = 'db' -def fill(it: Iterable[Any], *, measurement: str, reset: bool=False) -> None: +def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str='dt') -> None: # todo infer dt column automatically, reuse in stat? # it doesn't like dots, ends up some syntax error? measurement = measurement.replace('.', '_') @@ -31,19 +30,38 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False) -> None: if reset: client.delete_series(database=db, measurement=measurement) + # TODO need to take schema here... + cache: Dict[str, bool] = {} + def good(f, v) -> bool: + c = cache.get(f) + if c is not None: + return c + t = type(v) + r = t in {str, int} + cache[f] = r + if not r: + logger.warning('%s: filtering out %s=%s because of type %s', measurement, f, v, t) + return r + + def filter_dict(d: Json) -> Json: + return {f: v for f, v in d.items() if good(f, v)} + def dit() -> Iterable[Json]: for i in it: d = asdict(i) tags: Optional[Json] = None - tags = d.get('tags') # meh... handle in a more robust manner - if tags is not None: + tags_ = d.get('tags') # meh... handle in a more robust manner + if tags_ is not None and isinstance(tags_, dict): # FIXME meh. del d['tags'] + tags = tags_ # TODO what to do with exceptions?? # todo handle errors.. not sure how? maybe add tag for 'error' and fill with emtpy data? - dt = d['dt'].isoformat() - del d['dt'] - fields = d + dt = d[dt_col].isoformat() + del d[dt_col] + + fields = filter_dict(d) + yield dict( measurement=measurement, # TODO maybe good idea to tag with database file/name? to inspect inconsistencies etc.. @@ -52,7 +70,7 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False) -> None: # "fields are data and tags are metadata" tags=tags, time=dt, - fields=d, + fields=fields, ) @@ -64,3 +82,28 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False) -> None: logger.debug('writing next chunk %s', chl[-1]) client.write_points(chl, database=db) # todo "Specify timestamp precision when writing to InfluxDB."? + + +def magic_fill(it) -> None: + assert callable(it) + name = f'{it.__module__}:{it.__name__}' + + from itertools import tee + from more_itertools import first, one + it = it() + it, x = tee(it) + f = first(x, default=None) + if f is None: + logger.warning('%s has no data', name) + return + + # TODO can we reuse pandas code or something? + # + from .pandas import _as_columns + schema = _as_columns(type(f)) + + from datetime import datetime + dtex = RuntimeError(f'expected single datetime field. schema: {schema}') + dtf = one((f for f, t in schema.items() if t == datetime), too_short=dtex, too_long=dtex) + + fill(it, measurement=name, reset=True, dt_col=dtf) diff --git a/my/core/pandas.py b/my/core/pandas.py index f1eead4..90b49ce 100644 --- a/my/core/pandas.py +++ b/my/core/pandas.py @@ -5,7 +5,7 @@ Various pandas helpers and convenience functions # NOTE: this file is meant to be importable without Pandas installed from datetime import datetime from pprint import pformat -from typing import Optional, TYPE_CHECKING, Any, Iterable, Type, List +from typing import Optional, TYPE_CHECKING, Any, Iterable, Type, List, Dict from . import warnings, Res from .common import LazyLogger @@ -105,12 +105,13 @@ error_to_row = error_to_json # todo deprecate? # no type for dataclass? Schema = Any -def _as_columns(s: Schema) -> List[str]: +def _as_columns(s: Schema) -> Dict[str, Type]: + # todo would be nice to extract properties; add tests for this as well import dataclasses as D if D.is_dataclass(s): - return [f.name for f in D.fields(s)] + return {f.name: f.type for f in D.fields(s)} # else must be NamedTuple?? - return list(getattr(s, '_fields')) + return getattr(s, '_field_types') # todo add proper types @@ -125,7 +126,7 @@ def as_dataframe(it: Iterable[Res[Any]], schema: Optional[Schema]=None) -> DataF # so we need to convert each individually... sigh from .common import to_jsons import pandas as pd - columns = None if schema is None else _as_columns(schema) + columns = None if schema is None else list(_as_columns(schema).keys()) return pd.DataFrame(to_jsons(it), columns=columns)