my.bluemaestro: minor cleanup

This commit is contained in:
Dima Gerasimov 2023-06-21 20:10:40 +01:00 committed by karlicoss
parent c25ab51664
commit 88a3aa8d67

View file

@ -4,13 +4,26 @@
"""
# todo most of it belongs to DAL... but considering so few people use it I didn't bother for now
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
import re
import sqlite3
from typing import Iterable, Sequence, Set, Optional
from my.core import get_files, make_logger, dataclass, Res
import pytz
from my.core import (
get_files,
make_logger,
Res,
stat,
Stats,
influxdb,
)
from my.core.common import mcachew
from my.core.error import unwrap
from my.core.pandas import DataFrameT, as_dataframe
from my.core.sqlite import sqlite_connect_immutable
from my.config import bluemaestro as config
@ -25,12 +38,13 @@ def inputs() -> Sequence[Path]:
Celsius = float
Percent = float
mBar = float
mBar = float
@dataclass
class Measurement:
dt: datetime # todo aware/naive
temp : Celsius
dt: datetime # todo aware/naive
temp: Celsius
humidity: Percent
pressure: mBar
dewpoint: Celsius
@ -38,7 +52,6 @@ class Measurement:
# fixme: later, rely on the timezone provider
# NOTE: the timezone should be set with respect to the export date!!!
import pytz
tz = pytz.timezone('Europe/London')
# TODO when I change tz, check the diff
@ -49,9 +62,7 @@ def is_bad_table(name: str) -> bool:
return False if delegate is None else delegate(name)
from my.core.cachew import cache_dir
from my.core.common import mcachew
@mcachew(depends_on=inputs, cache_path=cache_dir('bluemaestro'))
@mcachew(depends_on=inputs)
def measurements() -> Iterable[Res[Measurement]]:
# todo ideally this would be via arguments... but needs to be lazy
dbs = inputs()
@ -68,14 +79,16 @@ def measurements() -> Iterable[Res[Measurement]]:
with sqlite_connect_immutable(f) as db:
db_dt: Optional[datetime] = None
try:
datas = db.execute(f'SELECT "{f.name}" as name, Time, Temperature, Humidity, Pressure, Dewpoint FROM data ORDER BY log_index')
datas = db.execute(
f'SELECT "{f.name}" as name, Time, Temperature, Humidity, Pressure, Dewpoint FROM data ORDER BY log_index'
)
oldfmt = True
db_dts = list(db.execute('SELECT last_download FROM info'))[0][0]
if db_dts == 'N/A':
# ??? happens for 20180923-20180928
continue
if db_dts.endswith(':'):
db_dts += '00' # wtf.. happens on some day
db_dts += '00' # wtf.. happens on some day
db_dt = tz.localize(datetime.strptime(db_dts, '%Y-%m-%d %H:%M:%S'))
except sqlite3.OperationalError:
# Right, this looks really bad.
@ -113,7 +126,7 @@ def measurements() -> Iterable[Res[Measurement]]:
f'SELECT "{t}" AS name, unix, tempReadings / 10.0, humiReadings / 10.0, pressReadings / 10.0, dewpReadings / 10.0 FROM {t}'
for t in log_tables
)
if len(log_tables) > 0: # ugh. otherwise end up with syntax error..
if len(log_tables) > 0: # ugh. otherwise end up with syntax error..
query = f'SELECT * FROM ({query}) ORDER BY name, unix'
datas = db.execute(query)
oldfmt = False
@ -139,8 +152,8 @@ def measurements() -> Iterable[Res[Measurement]]:
## sanity checks (todo make defensive/configurable?)
# not sure how that happens.. but basically they'd better be excluded
lower = timedelta(days=6000 / 24) # ugh some time ago I only did it once in an hour.. in theory can detect from meta?
upper = timedelta(days=10) # kinda arbitrary
lower = timedelta(days=6000 / 24) # ugh some time ago I only did it once in an hour.. in theory can detect from meta?
upper = timedelta(days=10) # kinda arbitrary
if not (db_dt - lower < dt < db_dt + timedelta(days=10)):
# todo could be more defenive??
yield RuntimeError('timestamp too far out', f, name, db_dt, dt)
@ -178,12 +191,11 @@ def measurements() -> Iterable[Res[Measurement]]:
# for k, v in merged.items():
# yield Point(dt=k, temp=v) # meh?
from my.core import stat, Stats
def stats() -> Stats:
return stat(measurements)
from my.core.pandas import DataFrameT, as_dataframe
def dataframe() -> DataFrameT:
"""
%matplotlib gtk
@ -197,7 +209,6 @@ def dataframe() -> DataFrameT:
def fill_influxdb() -> None:
from my.core import influxdb
influxdb.fill(measurements(), measurement=__name__)
@ -205,7 +216,6 @@ def check() -> None:
temps = list(measurements())
latest = temps[:-2]
from my.core.error import unwrap
prev = unwrap(latest[-2]).dt
last = unwrap(latest[-1]).dt
@ -215,12 +225,12 @@ def check() -> None:
#
# TODO also needs to be filtered out on processing, should be rejected on the basis of export date?
POINTS_STORED = 6000 # on device?
POINTS_STORED = 6000 # on device?
FREQ_SEC = 60
SECS_STORED = POINTS_STORED * FREQ_SEC
HOURS_STORED = POINTS_STORED / (60 * 60 / FREQ_SEC) # around 4 days
HOURS_STORED = POINTS_STORED / (60 * 60 / FREQ_SEC) # around 4 days
NOW = datetime.now()
assert NOW - last < timedelta(hours=HOURS_STORED / 2), f'old backup! {last}'
assert last - prev < timedelta(minutes=3), f'bad interval! {last - prev}'
assert last - prev < timedelta(minutes=3), f'bad interval! {last - prev}'
single = (last - prev).seconds