core/influxdb: add main method to create influx measurement and fill with values

allows running something like

    python3 -m my.core.influxdb populate my.zotero
This commit is contained in:
Dima Gerasimov 2021-04-26 09:41:39 +01:00 committed by karlicoss
parent 0278f2b68d
commit 0517f7ffb8
4 changed files with 52 additions and 10 deletions

View file

@ -15,7 +15,10 @@ class config:
db = 'db'
def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str='dt') -> None:
RESET_DEFAULT = False
def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_col: str='dt') -> None:
# todo infer dt column automatically, reuse in stat?
# it doesn't like dots, ends up with some syntax error?
measurement = measurement.replace('.', '_')
@ -30,6 +33,7 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str=
# todo should it be an env variable?
if reset:
logger.warning('deleting measurements: %s:%s', db, measurement)
client.delete_series(database=db, measurement=measurement)
# TODO need to take schema here...
@ -79,14 +83,18 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=False, dt_col: str=
from more_itertools import chunked
# "The optimal batch size is 5000 lines of line protocol."
# some chunking is def necessary, otherwise it fails
inserted = 0
for chi in chunked(dit(), n=5000):
chl = list(chi)
inserted += len(chl)
logger.debug('writing next chunk %s', chl[-1])
client.write_points(chl, database=db)
logger.info('inserted %d points', inserted)
# todo "Specify timestamp precision when writing to InfluxDB."?
def magic_fill(it, *, name: Optional[str]=None) -> None:
def magic_fill(it, *, name: Optional[str]=None, reset: bool=RESET_DEFAULT) -> None:
if name is None:
assert callable(it) # generators have no name/module
name = f'{it.__module__}:{it.__name__}'
@ -112,4 +120,31 @@ def magic_fill(it, *, name: Optional[str]=None) -> None:
dtex = RuntimeError(f'expected single datetime field. schema: {schema}')
dtf = one((f for f, t in schema.items() if t == datetime), too_short=dtex, too_long=dtex)
fill(it, measurement=name, reset=True, dt_col=dtf)
fill(it, measurement=name, reset=reset, dt_col=dtf)
import click
# Root click command group for this module's CLI; it does nothing itself.
# Subcommands attach to it via @main.command (see 'populate').
@click.group()
def main() -> None:
    pass
@main.command(name='populate', short_help='populate influxdb')
@click.option('--reset', is_flag=True, help='Reset Influx measurements before inserting', show_default=True)
@click.argument('MODULE', type=str, required=True)
def populate(module: str, reset: bool) -> None:
    """Fill an InfluxDB measurement with the data produced by MODULE.

    MODULE is a module name (e.g. my.zotero) that must expose exactly one
    data provider function.
    """
    # imported lazily so that merely importing this module doesn't pull in .stats
    from .stats import guess_data_providers

    providers = guess_data_providers(module)
    # meh.. encapsulate in guess_data_providers?
    # 'inputs' is dropped so that it isn't picked up as a data provider
    providers.pop('inputs', None)

    # todo could do interactive thing? same way as in hpi query
    if len(providers) != 1:
        # Fail with a readable CLI error instead of an opaque
        # "not enough/too many values to unpack" ValueError.
        found = ', '.join(str(name) for name in providers) or 'none'
        raise click.ClickException(
            f'expected exactly one data provider in {module}, found: {found}'
        )

    [provider] = providers.values()
    magic_fill(provider, reset=reset)
# todo later just add to hpi main?
# not sure if want to couple
# Run the click group only when executed as a script / via `python3 -m`, not on import.
if __name__ == '__main__':
    main()