simplify provider

This commit is contained in:
Dima Gerasimov 2019-05-08 20:54:37 +01:00
parent 76814f7f06
commit 351528bb64
3 changed files with 30 additions and 18 deletions

9
ci.sh
View file

@ -1,9 +0,0 @@
#!/bin/bash
cd "$(this_dir)" || exit
. ~/bash_ci
ci_run python3.6 test.py
ci_report_errors

16
fill_influxdb.py Executable file
View file

@ -0,0 +1,16 @@
#!/usr/bin/env python3
from influxdb import InfluxDBClient # type: ignore
from my.lastfm import get_scrobbles
def main():
scrobbles = get_scrobbles()
client = InfluxDBClient()
# TODO client.create_database('lastfm')
jsons = [{"measurement": 'scrobble', "tags": {}, "time": str(sc.dt), "fields": {"name": sc.track}} for sc in scrobbles]
client.write_points(jsons, database='lastfm')
if __name__ == '__main__':
main()

23
lastfm/__init__.py Normal file → Executable file
View file

@ -1,12 +1,14 @@
#!/usr/bin/env python3
from functools import lru_cache from functools import lru_cache
from kython import listdir_abs, json_load, JSONType
from typing import Dict, List, NamedTuple from typing import Dict, List, NamedTuple
from pytz import UTC
from datetime import datetime from datetime import datetime
from pathlib import Path
import os import os
import json
import pytz
_PATH = "/L/backups/lastfm" _PATH = Path("/L/backups/lastfm")
class Scrobble(NamedTuple): class Scrobble(NamedTuple):
dt: datetime dt: datetime
@ -17,18 +19,21 @@ class Scrobble(NamedTuple):
# TODO watch out, if we keep the app running it might expire # TODO watch out, if we keep the app running it might expire
def _iter_scrobbles(): def _iter_scrobbles():
last = max(listdir_abs(_PATH)) last = max(_PATH.glob('*.json'))
# TODO mm, no timezone? wonder if it's UTC... # TODO mm, no timezone? wonder if it's UTC...
j: List[Dict[str, str]] j = json.loads(last.read_text())
with open(last, 'r') as fo:
j = json_load(fo)
for d in j: for d in j:
ts = int(d['date']) ts = int(d['date'])
dt = datetime.fromtimestamp(ts, tz=UTC) dt = datetime.fromtimestamp(ts, tz=pytz.utc)
track = f"{d['artist']}{d['name']}" track = f"{d['artist']}{d['name']}"
yield Scrobble(dt, track) yield Scrobble(dt, track)
@lru_cache() @lru_cache(1)
def get_scrobbles(): def get_scrobbles():
return list(_iter_scrobbles()) return list(_iter_scrobbles())
def test():
assert len(get_scrobbles()) > 1000