From bfec6b975f25bf689100299b43794ec0f1bcf9ae Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Sun, 21 Feb 2021 20:59:03 +0000 Subject: [PATCH] influxdb: add helper to core + use it in bluemaestro/lastfm/rescuetime --- my/bluemaestro.py | 28 +----------- my/core/influxdb.py | 66 ++++++++++++++++++++++++++++ my/{lastfm/__init__.py => lastfm.py} | 25 ++++++++--- my/lastfm/fill_influxdb.py | 17 ------- my/rescuetime.py | 34 +++----------- 5 files changed, 95 insertions(+), 75 deletions(-) create mode 100644 my/core/influxdb.py rename my/{lastfm/__init__.py => lastfm.py} (75%) delete mode 100755 my/lastfm/fill_influxdb.py diff --git a/my/bluemaestro.py b/my/bluemaestro.py index 5a6479f..97668c3 100755 --- a/my/bluemaestro.py +++ b/my/bluemaestro.py @@ -192,32 +192,8 @@ def dataframe() -> DataFrameT: def fill_influxdb() -> None: - from itertools import islice - from .core.common import asdict - - from influxdb import InfluxDBClient # type: ignore - client = InfluxDBClient() - db = 'db' - mname = __name__.replace('.', '_') - client.delete_series(database=db, measurement=mname) - def dissoc(d, k): - del d[k] - return d # meh - jsons = ({ - 'measurement': mname, - # todo maybe good idea to tags with database file/name? to inspect inconsistencies etc.. - # 'tags': {'activity': e.activity}, - 'time': e.dt.isoformat(), - 'fields': dissoc(asdict(e), 'dt'), - } for e in measurements()) - from more_itertools import chunked - # "The optimal batch size is 5000 lines of line protocol." - # some chunking is def necessary, otherwise it fails - for chunk in chunked(jsons, n=5000): - cl = list(chunk) - logger.debug('writing next chunk %s', cl[-1]) - client.write_points(cl, database=db) - # todo "Specify timestamp precision when writing to InfluxDB."? 
+ from .core import influxdb + influxdb.fill(measurements(), measurement=__name__) def check() -> None: diff --git a/my/core/influxdb.py b/my/core/influxdb.py new file mode 100644 index 0000000..d6d8d72 --- /dev/null +++ b/my/core/influxdb.py @@ -0,0 +1,66 @@ +''' +TODO doesn't really belong to 'core' morally, but can think of moving out later +''' +from typing import Iterable, Any, Optional + + +from .common import LazyLogger, asdict, Json + + +logger = LazyLogger(__name__) + + +class config: + db = 'db' + + +def fill(it: Iterable[Any], *, measurement: str, reset: bool=False) -> None: + # todo infer dt column automatically, reuse in stat? + # it doesn't like dots, ends up some syntax error? + measurement = measurement.replace('.', '_') + # todo autoinfer measurement? + + db = config.db + + from influxdb import InfluxDBClient # type: ignore + client = InfluxDBClient() + # todo maybe create if not exists? + # client.create_database(db) + + # todo should it be an env variable? + if reset: + client.delete_series(database=db, measurement=measurement) + + def dit() -> Iterable[Json]: + for i in it: + d = asdict(i) + tags: Optional[Json] = None + tags = d.get('tags') # meh... handle in a more robust manner + if tags is not None: + del d['tags'] + + # TODO what to do with exceptions?? + # todo handle errors.. not sure how? maybe add tag for 'error' and fill with empty data? + dt = d['dt'].isoformat() + del d['dt'] + fields = d + yield dict( + measurement=measurement, + # TODO maybe good idea to tag with database file/name? to inspect inconsistencies etc.. + # hmm, so tags are autoindexed and might be faster? + # not sure what's the big difference though + # "fields are data and tags are metadata" + tags=tags, + time=dt, + fields=d, + ) + + + from more_itertools import chunked + # "The optimal batch size is 5000 lines of line protocol." 
+ # some chunking is def necessary, otherwise it fails + for chi in chunked(dit(), n=5000): + chl = list(chi) + logger.debug('writing next chunk %s', chl[-1]) + client.write_points(chl, database=db) + # todo "Specify timestamp precision when writing to InfluxDB."? diff --git a/my/lastfm/__init__.py b/my/lastfm.py similarity index 75% rename from my/lastfm/__init__.py rename to my/lastfm.py index 0a19152..51ccb7e 100755 --- a/my/lastfm/__init__.py +++ b/my/lastfm.py @@ -2,8 +2,7 @@ Last.fm scrobbles ''' -from ..core.common import Paths -from dataclasses import dataclass +from .core import Paths, dataclass from my.config import lastfm as user_config @dataclass @@ -14,7 +13,7 @@ class lastfm(user_config): export_path: Paths -from ..core.cfg import make_config +from .core.cfg import make_config config = make_config(lastfm) @@ -25,7 +24,8 @@ from typing import NamedTuple, Any, Sequence, Iterable import pytz -from ..core.common import mcachew, Json, get_files +from .core.common import mcachew, Json, get_files + def inputs() -> Sequence[Path]: return get_files(config.export_path) @@ -63,10 +63,25 @@ class Scrobble(NamedTuple): # TODO could also be nice to make generic? 
maybe even depending on eagerness -@mcachew(hashf=lambda: inputs()) +@mcachew(depends_on=inputs) def scrobbles() -> Iterable[Scrobble]: last = max(inputs()) j = json.loads(last.read_text()) for raw in reversed(j): yield Scrobble(raw=raw) + + +from .core import stat, Stats +def stats() -> Stats: + return stat(scrobbles) + + +def fill_influxdb() -> None: + from .core import influxdb + # todo needs to be more automatic + sd = (dict( + dt=x.dt, + track=x.track, + ) for x in scrobbles()) + influxdb.fill(sd, measurement=__name__) diff --git a/my/lastfm/fill_influxdb.py b/my/lastfm/fill_influxdb.py deleted file mode 100755 index 7754760..0000000 --- a/my/lastfm/fill_influxdb.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -# pip install influxdb -from influxdb import InfluxDBClient # type: ignore -from my.lastfm import scrobbles - - -def main() -> None: - scrobbles = scrobbles() - client = InfluxDBClient() - # TODO client.create_database('lastfm') - - jsons = [{"measurement": 'scrobble', "tags": {}, "time": str(sc.dt), "fields": {"name": sc.track}} for sc in scrobbles] - client.write_points(jsons, database='lastfm') - - -if __name__ == '__main__': - main() diff --git a/my/rescuetime.py b/my/rescuetime.py index c338b1b..03547d9 100644 --- a/my/rescuetime.py +++ b/my/rescuetime.py @@ -79,34 +79,14 @@ def fake_data(rows: int=1000) -> Iterator[None]: # todo would be kinda nice if doctor could run against the fake data, to have a basic health check of the module? -# todo not sure if I want to keep these here? vvv -# guess should move to core? or to 'ext' module, i.e. interfaces? -# make automatic def fill_influxdb() -> None: - from .core.common import asdict - - from influxdb import InfluxDBClient # type: ignore - client = InfluxDBClient() - db = 'db' - measurement = __name__.replace('.', '_') - client.delete_series(database=db, measurement=measurement) - # client.drop_database(db) - # todo create if not exists? - # client.create_database(db) - # todo handle errors.. 
not sure how? maybe add tag for 'error' and fill with emtpy data? - vit = (e for e in entries() if isinstance(e, Entry)) - jsons = ({ - 'measurement': measurement, # hmm, influx doesn't like dots? - # hmm, so tags are autoindexed and might be faster? - # not sure what's the big difference though - # "fields are data and tags are metadata" - 'tags': {'activity': e.activity}, - 'time': e.dt.isoformat(), - 'fields': {'duration_s': e.duration_s}, - # todo asdict(e), - } for e in vit) - # todo do we need to batch? - client.write_points(jsons, database=db) + from .core import influxdb + it = (dict( + dt=e.dt, + duration_s=e.duration_s, + tags=dict(activity=e.activity), + ) for e in entries() if isinstance(e, Entry)) # TODO handle errors in core.influxdb + influxdb.fill(it, measurement=__name__) # TODO lots of garbage in dir()? maybe need to del the imports...