From 0517f7ffb89fa53f2df33f80882968e01c65e59b Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Mon, 26 Apr 2021 09:41:39 +0100 Subject: [PATCH] core/influxdb: add main method to create influx measurement and fill with values allows running something like python3 -m my.core.influxdb populate my.zotero --- my/core/common.py | 2 +- my/core/influxdb.py | 41 ++++++++++++++++++++++++++++++++++++++--- my/core/stats.py | 18 ++++++++++++------ my/hypothesis.py | 1 + 4 files changed, 52 insertions(+), 10 deletions(-) diff --git a/my/core/common.py b/my/core/common.py index 892a375..48bc4c7 100644 --- a/my/core/common.py +++ b/my/core/common.py @@ -596,7 +596,7 @@ datetime_aware = datetime def assert_subpackage(name: str) -> None: # can lead to some unexpected issues if you 'import cachew' which being in my/core directory.. so let's protect against it # NOTE: if we use overlay, name can be smth like my.origg.my.core.cachew ... - assert 'my.core' in name, f'Expected module __name__ ({name}) to start with my.core' + assert name == '__main__' or 'my.core' in name, f'Expected module __name__ ({name}) to be __main__ or start with my.core' # https://stackoverflow.com/a/10436851/706389 diff --git a/my/core/influxdb.py b/my/core/influxdb.py index 7349b0e..a5e950f 100644 --- a/my/core/influxdb.py +++ b/my/core/influxdb.py @@ -15,7 +15,10 @@ class config: db = 'db' -def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str='dt') -> None: +RESET_DEFAULT = False + + +def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_col: str='dt') -> None: # todo infer dt column automatically, reuse in stat? # it doesn't like dots, ends up some syntax error? measurement = measurement.replace('.', '_') @@ -30,6 +33,7 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str= # todo should be it be env variable? if reset: + logger.warning('deleting measurements: %s:%s', db, measurement) client.delete_series(database=db, measurement=measurement) # TODO need to take schema here... @@ -79,14 +83,18 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str= from more_itertools import chunked # "The optimal batch size is 5000 lines of line protocol." # some chunking is def necessary, otherwise it fails + inserted = 0 for chi in chunked(dit(), n=5000): chl = list(chi) + inserted += len(chl) logger.debug('writing next chunk %s', chl[-1]) client.write_points(chl, database=db) + + logger.info('inserted %d points', inserted) # todo "Specify timestamp precision when writing to InfluxDB."? -def magic_fill(it, *, name: Optional[str]=None) -> None: +def magic_fill(it, *, name: Optional[str]=None, reset: bool=RESET_DEFAULT) -> None: if name is None: assert callable(it) # generators have no name/module name = f'{it.__module__}:{it.__name__}' @@ -112,4 +120,31 @@ def magic_fill(it, *, name: Optional[str]=None) -> None: dtex = RuntimeError(f'expected single datetime field. schema: {schema}') dtf = one((f for f, t in schema.items() if t == datetime), too_short=dtex, too_long=dtex) - fill(it, measurement=name, reset=True, dt_col=dtf) + fill(it, measurement=name, reset=reset, dt_col=dtf) + + +import click + +@click.group() +def main() -> None: + pass + + +@main.command(name='populate', short_help='populate influxdb') +@click.option('--reset', is_flag=True, help='Reset Influx measurements before inserting', show_default=True) +@click.argument('MODULE', type=str, required=True) +def populate(module: str, reset: bool) -> None: + from .stats import guess_data_providers + providers = guess_data_providers(module) + # meh.. encapsulate in guess_data_providers? + if 'inputs' in providers: + del providers['inputs'] + # todo could do interactive thing? same way as in hpi query + [(k, f)] = providers.items() + magic_fill(f, reset=reset) + + +# todo later just add to hpi main? +# not sure if want to couple +if __name__ == '__main__': + main() diff --git a/my/core/stats.py b/my/core/stats.py index b54a3b9..dfa68e5 100644 --- a/my/core/stats.py +++ b/my/core/stats.py @@ -6,7 +6,7 @@ import importlib import inspect import sys import typing -from typing import Optional, Callable, Any, Iterator +from typing import Optional, Callable, Any, Iterator, Sequence, Dict from .common import StatsFun, Stats, stat @@ -14,16 +14,22 @@ from .common import StatsFun, Stats, stat # TODO maybe could be enough to annotate OUTPUTS or something like that? # then stats could just use them as hints? def guess_stats(module_name: str) -> Optional[StatsFun]: - module = importlib.import_module(module_name) - mfunctions = inspect.getmembers(module, inspect.isfunction) - functions = {k: v for k, v in mfunctions if is_data_provider(v)} - if len(functions) == 0: + providers = guess_data_providers(module_name) + if len(providers) == 0: return None def auto_stats() -> Stats: - return {k: stat(v) for k, v in functions.items()} + return {k: stat(v) for k, v in providers.items()} return auto_stats +def guess_data_providers(module_name: str) -> Dict[str, Callable]: + module = importlib.import_module(module_name) + mfunctions = inspect.getmembers(module, inspect.isfunction) + return {k: v for k, v in mfunctions if is_data_provider(v)} + + +# todo how to exclude deprecated stuff? +# todo also exclude def inputs()? def is_data_provider(fun: Any) -> bool: """ 1. returns iterable or something like that diff --git a/my/hypothesis.py b/my/hypothesis.py index 1241534..370854a 100644 --- a/my/hypothesis.py +++ b/my/hypothesis.py @@ -53,6 +53,7 @@ def _dal() -> dal.DAL: return dal.DAL(sources) +# TODO they are in reverse chronological order... def highlights() -> List[Res[Highlight]]: # todo hmm. otherwise mypy complans key: Callable[[Highlight], datetime] = lambda h: h.created