my.emfit: cleanup and pass cpu pool
This commit is contained in:
parent
32aa87b3ec
commit
fb2b3e07de
1 changed files with 44 additions and 24 deletions
|
@ -8,22 +8,30 @@ REQUIRES = [
|
||||||
'git+https://github.com/karlicoss/emfitexport',
|
'git+https://github.com/karlicoss/emfitexport',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
import dataclasses
|
||||||
|
from datetime import datetime, time, timedelta
|
||||||
|
import inspect
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Iterable, Any, Optional
|
from typing import Any, Dict, Iterable, Iterator, List, Optional
|
||||||
|
|
||||||
from ..core import get_files
|
from my.core import (
|
||||||
from ..core.common import mcachew
|
get_files,
|
||||||
from ..core.cachew import cache_dir
|
stat,
|
||||||
from ..core.error import Res, set_error_datetime, extract_error_datetime
|
Res,
|
||||||
from ..core.pandas import DataFrameT
|
Stats,
|
||||||
|
)
|
||||||
|
from my.core.common import mcachew
|
||||||
|
from my.core.cachew import cache_dir
|
||||||
|
from my.core.error import set_error_datetime, extract_error_datetime
|
||||||
|
from my.core.pandas import DataFrameT
|
||||||
|
|
||||||
from my.config import emfit as config
|
from my.config import emfit as config
|
||||||
|
|
||||||
|
|
||||||
import emfitexport.dal as dal
|
import emfitexport.dal as dal
|
||||||
# todo ugh. need to make up my mind on log vs logger naming... I guessl ogger makes more sense
|
|
||||||
logger = dal.log
|
|
||||||
Emfit = dal.Emfit
|
Emfit = dal.Emfit
|
||||||
|
|
||||||
|
|
||||||
# TODO move to common?
|
# TODO move to common?
|
||||||
|
@ -39,13 +47,22 @@ def _cachew_depends_on():
|
||||||
# TODO take __file__ into account somehow?
|
# TODO take __file__ into account somehow?
|
||||||
@mcachew(cache_path=cache_dir() / 'emfit.cache', depends_on=_cachew_depends_on)
|
@mcachew(cache_path=cache_dir() / 'emfit.cache', depends_on=_cachew_depends_on)
|
||||||
def datas() -> Iterable[Res[Emfit]]:
|
def datas() -> Iterable[Res[Emfit]]:
|
||||||
import dataclasses
|
|
||||||
|
|
||||||
# data from emfit is coming in UTC. There is no way (I think?) to know the 'real' timezone, and local times matter more for sleep analysis
|
# data from emfit is coming in UTC. There is no way (I think?) to know the 'real' timezone, and local times matter more for sleep analysis
|
||||||
# TODO actually this is wrong?? check this..
|
# TODO actually this is wrong?? there is some sort of local offset in the export
|
||||||
emfit_tz = config.timezone
|
emfit_tz = config.timezone
|
||||||
|
|
||||||
for x in dal.sleeps(config.export_path):
|
## backwards compatibility (old DAL didn't have cpu_pool argument)
|
||||||
|
cpu_pool_arg = 'cpu_pool'
|
||||||
|
pass_cpu_pool = cpu_pool_arg in inspect.signature(dal.sleeps).parameters
|
||||||
|
if pass_cpu_pool:
|
||||||
|
from my.core._cpu_pool import get_cpu_pool
|
||||||
|
|
||||||
|
kwargs = {cpu_pool_arg: get_cpu_pool()}
|
||||||
|
else:
|
||||||
|
kwargs = {}
|
||||||
|
##
|
||||||
|
|
||||||
|
for x in dal.sleeps(config.export_path, **kwargs):
|
||||||
if isinstance(x, Exception):
|
if isinstance(x, Exception):
|
||||||
yield x
|
yield x
|
||||||
else:
|
else:
|
||||||
|
@ -54,6 +71,7 @@ def datas() -> Iterable[Res[Emfit]]:
|
||||||
continue
|
continue
|
||||||
# TODO maybe have a helper to 'patch up' all dattetimes in a namedtuple/dataclass?
|
# TODO maybe have a helper to 'patch up' all dattetimes in a namedtuple/dataclass?
|
||||||
# TODO do the same for jawbone data?
|
# TODO do the same for jawbone data?
|
||||||
|
# fmt: off
|
||||||
x = dataclasses.replace(
|
x = dataclasses.replace(
|
||||||
x,
|
x,
|
||||||
start =x.start .astimezone(emfit_tz),
|
start =x.start .astimezone(emfit_tz),
|
||||||
|
@ -61,6 +79,7 @@ def datas() -> Iterable[Res[Emfit]]:
|
||||||
sleep_start=x.sleep_start.astimezone(emfit_tz),
|
sleep_start=x.sleep_start.astimezone(emfit_tz),
|
||||||
sleep_end =x.sleep_end .astimezone(emfit_tz),
|
sleep_end =x.sleep_end .astimezone(emfit_tz),
|
||||||
)
|
)
|
||||||
|
# fmt: on
|
||||||
yield x
|
yield x
|
||||||
|
|
||||||
|
|
||||||
|
@ -78,7 +97,7 @@ def pre_dataframe() -> Iterable[Res[Emfit]]:
|
||||||
yield r
|
yield r
|
||||||
else:
|
else:
|
||||||
err = RuntimeError(f'Multiple sleeps per night, not supported yet: {g}')
|
err = RuntimeError(f'Multiple sleeps per night, not supported yet: {g}')
|
||||||
set_error_datetime(err, dt=g[0].date)
|
set_error_datetime(err, dt=datetime.combine(g[0].date, time.min))
|
||||||
g.clear()
|
g.clear()
|
||||||
yield err
|
yield err
|
||||||
|
|
||||||
|
@ -94,7 +113,6 @@ def pre_dataframe() -> Iterable[Res[Emfit]]:
|
||||||
|
|
||||||
|
|
||||||
def dataframe() -> DataFrameT:
|
def dataframe() -> DataFrameT:
|
||||||
from datetime import timedelta
|
|
||||||
dicts: List[Dict[str, Any]] = []
|
dicts: List[Dict[str, Any]] = []
|
||||||
last: Optional[Emfit] = None
|
last: Optional[Emfit] = None
|
||||||
for s in pre_dataframe():
|
for s in pre_dataframe():
|
||||||
|
@ -102,7 +120,7 @@ def dataframe() -> DataFrameT:
|
||||||
if isinstance(s, Exception):
|
if isinstance(s, Exception):
|
||||||
edt = extract_error_datetime(s)
|
edt = extract_error_datetime(s)
|
||||||
d = {
|
d = {
|
||||||
'date' : edt,
|
'date': edt,
|
||||||
'error': str(s),
|
'error': str(s),
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
|
@ -117,6 +135,7 @@ def dataframe() -> DataFrameT:
|
||||||
|
|
||||||
# todo ugh. get rid of hardcoding, just generate the schema automatically
|
# todo ugh. get rid of hardcoding, just generate the schema automatically
|
||||||
# TODO use 'workdays' provider....
|
# TODO use 'workdays' provider....
|
||||||
|
# fmt: off
|
||||||
d = {
|
d = {
|
||||||
'date' : dd,
|
'date' : dd,
|
||||||
|
|
||||||
|
@ -133,25 +152,24 @@ def dataframe() -> DataFrameT:
|
||||||
'hrv_change' : hrv_change,
|
'hrv_change' : hrv_change,
|
||||||
'respiratory_rate_avg': s.respiratory_rate_avg,
|
'respiratory_rate_avg': s.respiratory_rate_avg,
|
||||||
}
|
}
|
||||||
last = s # meh
|
# fmt: on
|
||||||
|
last = s # meh
|
||||||
dicts.append(d)
|
dicts.append(d)
|
||||||
|
|
||||||
|
|
||||||
import pandas
|
import pandas
|
||||||
|
|
||||||
return pandas.DataFrame(dicts)
|
return pandas.DataFrame(dicts)
|
||||||
|
|
||||||
|
|
||||||
from ..core import stat, Stats
|
|
||||||
def stats() -> Stats:
|
def stats() -> Stats:
|
||||||
return stat(pre_dataframe)
|
return stat(pre_dataframe)
|
||||||
|
|
||||||
|
|
||||||
from contextlib import contextmanager
|
|
||||||
from typing import Iterator
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def fake_data(nights: int=500) -> Iterator:
|
def fake_data(nights: int = 500) -> Iterator:
|
||||||
from my.core.cfg import tmp_config
|
from my.core.cfg import tmp_config
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
with TemporaryDirectory() as td:
|
with TemporaryDirectory() as td:
|
||||||
tdir = Path(td)
|
tdir = Path(td)
|
||||||
gen = dal.FakeData()
|
gen = dal.FakeData()
|
||||||
|
@ -168,5 +186,7 @@ def fake_data(nights: int=500) -> Iterator:
|
||||||
# TODO remove/deprecate it? I think used by timeline
|
# TODO remove/deprecate it? I think used by timeline
|
||||||
def get_datas() -> List[Emfit]:
|
def get_datas() -> List[Emfit]:
|
||||||
# todo ugh. run lint properly
|
# todo ugh. run lint properly
|
||||||
return list(sorted(datas(), key=lambda e: e.start)) # type: ignore
|
return list(sorted(datas(), key=lambda e: e.start)) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
# TODO move away old entries if there is a diff??
|
# TODO move away old entries if there is a diff??
|
||||||
|
|
Loading…
Add table
Reference in a new issue