general: add 'destructive parsing' (kinda what we were doing in my.core.konsume) to my.experimental
also some cleanup for my.codeforces and my.topcoder
This commit is contained in:
parent
1e1e8d8494
commit
1317914bff
4 changed files with 183 additions and 97 deletions
138
my/codeforces.py
138
my/codeforces.py
|
@ -1,86 +1,80 @@
|
||||||
from my.config import codeforces as config # type: ignore[attr-defined]
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
import json
|
import json
|
||||||
from typing import NamedTuple, Dict, Iterator
|
from pathlib import Path
|
||||||
|
from typing import Dict, Iterator, Sequence
|
||||||
|
|
||||||
|
from my.core import get_files, Res, datetime_aware
|
||||||
|
from my.core.common import assert_never
|
||||||
|
|
||||||
|
from my.config import codeforces as config # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
|
||||||
from my.core import get_files, Res
|
def inputs() -> Sequence[Path]:
|
||||||
from my.core.konsume import ignore, wrap
|
return get_files(config.export_path)
|
||||||
|
|
||||||
|
|
||||||
Cid = int
|
ContestId = int
|
||||||
|
|
||||||
class Contest(NamedTuple):
|
|
||||||
cid: Cid
|
|
||||||
when: datetime
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def make(cls, j) -> 'Contest':
|
|
||||||
return cls(
|
|
||||||
cid=j['id'],
|
|
||||||
when=datetime.fromtimestamp(j['startTimeSeconds'], tz=timezone.utc),
|
|
||||||
)
|
|
||||||
|
|
||||||
Cmap = Dict[Cid, Contest]
|
|
||||||
|
|
||||||
|
|
||||||
def get_contests() -> Cmap:
|
@dataclass
|
||||||
last = max(get_files(config.export_path, 'allcontests*.json'))
|
class Contest:
|
||||||
j = json.loads(last.read_text())
|
contest_id: ContestId
|
||||||
d = {}
|
when: datetime_aware
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Competition:
|
||||||
|
contest: Contest
|
||||||
|
old_rating: int
|
||||||
|
new_rating: int
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def when(self) -> datetime_aware:
|
||||||
|
return self.contest.when
|
||||||
|
|
||||||
|
|
||||||
|
# todo not sure if parser is the best name? hmm
|
||||||
|
class Parser:
|
||||||
|
def __init__(self, *, inputs: Sequence[Path]) -> None:
|
||||||
|
self.inputs = inputs
|
||||||
|
self.contests: Dict[ContestId, Contest] = {}
|
||||||
|
|
||||||
|
def _parse_allcontests(self, p: Path) -> Iterator[Contest]:
|
||||||
|
j = json.loads(p.read_text())
|
||||||
for c in j['result']:
|
for c in j['result']:
|
||||||
cc = Contest.make(c)
|
yield Contest(
|
||||||
d[cc.cid] = cc
|
contest_id=c['id'],
|
||||||
return d
|
when=datetime.fromtimestamp(c['startTimeSeconds'], tz=timezone.utc),
|
||||||
|
name=c['name'],
|
||||||
|
|
||||||
class Competition(NamedTuple):
|
|
||||||
contest_id: Cid
|
|
||||||
contest: str
|
|
||||||
cmap: Cmap
|
|
||||||
|
|
||||||
@cached_property
|
|
||||||
def uid(self) -> Cid:
|
|
||||||
return self.contest_id
|
|
||||||
|
|
||||||
def __hash__(self):
|
|
||||||
return hash(self.contest_id)
|
|
||||||
|
|
||||||
@cached_property
|
|
||||||
def when(self) -> datetime:
|
|
||||||
return self.cmap[self.uid].when
|
|
||||||
|
|
||||||
@cached_property
|
|
||||||
def summary(self) -> str:
|
|
||||||
return f'participated in {self.contest}' # TODO
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def make(cls, cmap, json) -> Iterator[Res['Competition']]:
|
|
||||||
# TODO try here??
|
|
||||||
contest_id = json['contestId'].zoom().value
|
|
||||||
contest = json['contestName'].zoom().value
|
|
||||||
yield cls(
|
|
||||||
contest_id=contest_id,
|
|
||||||
contest=contest,
|
|
||||||
cmap=cmap,
|
|
||||||
)
|
)
|
||||||
# TODO ytry???
|
|
||||||
ignore(json, 'rank', 'oldRating', 'newRating')
|
def _parse_competitions(self, p: Path) -> Iterator[Competition]:
|
||||||
|
j = json.loads(p.read_text())
|
||||||
|
for c in j['result']:
|
||||||
|
contest_id = c['contestId']
|
||||||
|
contest = self.contests[contest_id]
|
||||||
|
yield Competition(
|
||||||
|
contest=contest,
|
||||||
|
old_rating=c['oldRating'],
|
||||||
|
new_rating=c['newRating'],
|
||||||
|
)
|
||||||
|
|
||||||
|
def parse(self) -> Iterator[Res[Competition]]:
|
||||||
|
for path in inputs():
|
||||||
|
if 'allcontests' in path.name:
|
||||||
|
# these contain information about all CF contests along with useful metadata
|
||||||
|
for contest in self._parse_allcontests(path):
|
||||||
|
# TODO some method to assert on mismatch if it exists? not sure
|
||||||
|
self.contests[contest.contest_id] = contest
|
||||||
|
elif 'codeforces' in path.name:
|
||||||
|
# these contain only contests the user participated in
|
||||||
|
yield from self._parse_competitions(path)
|
||||||
|
else:
|
||||||
|
raise RuntimeError("shouldn't happen") # TODO switch to compat.assert_never
|
||||||
|
|
||||||
|
|
||||||
def data() -> Iterator[Res[Competition]]:
|
def data() -> Iterator[Res[Competition]]:
|
||||||
cmap = get_contests()
|
return Parser(inputs=inputs()).parse()
|
||||||
last = max(get_files(config.export_path, 'codeforces*.json'))
|
|
||||||
|
|
||||||
with wrap(json.loads(last.read_text())) as j:
|
|
||||||
j['status'].ignore() # type: ignore[index]
|
|
||||||
res = j['result'].zoom() # type: ignore[index]
|
|
||||||
|
|
||||||
for c in list(res): # TODO maybe we want 'iter' method??
|
|
||||||
ignore(c, 'handle', 'ratingUpdateTimeSeconds')
|
|
||||||
yield from Competition.make(cmap=cmap, json=c)
|
|
||||||
c.consume()
|
|
||||||
# TODO maybe if they are all empty, no need to consume??
|
|
||||||
|
|
|
@ -209,3 +209,34 @@ def test_zoom() -> None:
|
||||||
|
|
||||||
|
|
||||||
# TODO type check this...
|
# TODO type check this...
|
||||||
|
|
||||||
|
# TODO feels like the whole thing kind of unnecessarily complex
|
||||||
|
# - cons:
|
||||||
|
# - in most cases this is not even needed? who cares if we miss a few attributes?
|
||||||
|
# - pro: on the other hand it could be interesting to know about new attributes in data,
|
||||||
|
# and without this kind of processing we wouldn't even know
|
||||||
|
# alternatives
|
||||||
|
# - manually process data
|
||||||
|
# e.g. use asserts, dict.pop and dict.values() methods to unpack things
|
||||||
|
# - pros:
|
||||||
|
# - very simple, since uses built in syntax
|
||||||
|
# - very performant, as fast as it gets
|
||||||
|
# - very flexible, easy to adjust behaviour
|
||||||
|
# - cons:
|
||||||
|
# - can forget to assert about extra entities etc, so error prone
|
||||||
|
# - if we do something like =assert j.pop('status') == 200, j=, by the time assert happens we already popped item -- makes error handling harder
|
||||||
|
# - a bit verbose.. so probably requires some helper functions though (could be much leaner than current konsume though)
|
||||||
|
# - if we assert, then terminates parsing too early, if we're defensive then inflates the code a lot with if statements
|
||||||
|
# - TODO perhaps combine warnings somehow or at least only emit once per module?
|
||||||
|
# - hmm actually tbh if we carefully go through everything and don't make copies, then only requires one assert at the very end?
|
||||||
|
# - TODO this is kinda useful? https://discuss.python.org/t/syntax-for-dictionnary-unpacking-to-variables/18718
|
||||||
|
# operator.itemgetter?
|
||||||
|
# - TODO can use match operator in python for this? quite nice actually! and allows for dynamic behaviour
|
||||||
|
# only from 3.10 tho, and gonna be tricky to do dynamic defensive behaviour with this
|
||||||
|
# - TODO in a sense, bleanser already would hint if some meaningful fields aren't being processed? only if they are changing though
|
||||||
|
# - define a "schema" for data, then just recursively match data against the schema?
|
||||||
|
# possibly pydantic already does something like that? not sure about performance though
|
||||||
|
# pros:
|
||||||
|
# - much simpler to extend and understand what's going on
|
||||||
|
# cons:
|
||||||
|
# - more rigid, so it becomes tricky to do dynamic stuff (e.g. if schema actually changes)
|
||||||
|
|
60
my/experimental/destructive_parsing.py
Normal file
60
my/experimental/destructive_parsing.py
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Iterator, List, Tuple
|
||||||
|
|
||||||
|
from my.core import assert_never
|
||||||
|
from my.core.compat import NoneType
|
||||||
|
|
||||||
|
|
||||||
|
# TODO Popper? not sure
|
||||||
|
@dataclass
|
||||||
|
class Helper:
|
||||||
|
manager: 'Manager'
|
||||||
|
item: Any # todo realistically, list or dict? could at least type as indexable or something
|
||||||
|
path: Tuple[str, ...]
|
||||||
|
|
||||||
|
def pop_if_primitive(self, *keys: str) -> None:
|
||||||
|
"""
|
||||||
|
The idea that primitive TODO
|
||||||
|
"""
|
||||||
|
item = self.item
|
||||||
|
for k in keys:
|
||||||
|
v = item[k]
|
||||||
|
if isinstance(v, (str, bool, float, int, NoneType)):
|
||||||
|
item.pop(k) # todo kinda unfortunate to get dict item twice.. but not sure if can avoid?
|
||||||
|
|
||||||
|
def check(self, key: str, expected: Any) -> None:
|
||||||
|
actual = self.item.pop(key)
|
||||||
|
assert actual == expected, (key, actual, expected)
|
||||||
|
|
||||||
|
def zoom(self, key: str) -> 'Helper':
|
||||||
|
return self.manager.helper(item=self.item.pop(key), path=self.path + (key,))
|
||||||
|
|
||||||
|
|
||||||
|
def is_empty(x) -> bool:
|
||||||
|
if isinstance(x, dict):
|
||||||
|
return len(x) == 0
|
||||||
|
elif isinstance(x, list):
|
||||||
|
return all(map(is_empty, x))
|
||||||
|
else:
|
||||||
|
assert_never(x)
|
||||||
|
|
||||||
|
|
||||||
|
class Manager:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.helpers: List[Helper] = []
|
||||||
|
|
||||||
|
def helper(self, item: Any, *, path: Tuple[str, ...] = ()) -> Helper:
|
||||||
|
res = Helper(manager=self, item=item, path=path)
|
||||||
|
self.helpers.append(res)
|
||||||
|
return res
|
||||||
|
|
||||||
|
def check(self) -> Iterator[Exception]:
|
||||||
|
remaining = []
|
||||||
|
for h in self.helpers:
|
||||||
|
# TODO recursively check it's primitive?
|
||||||
|
if is_empty(h.item):
|
||||||
|
continue
|
||||||
|
remaining.append((h.path, h.item))
|
||||||
|
if len(remaining) == 0:
|
||||||
|
return
|
||||||
|
yield RuntimeError(f'Unparsed items remaining: {remaining}')
|
|
@ -1,6 +1,3 @@
|
||||||
from my.config import topcoder as config # type: ignore[attr-defined]
|
|
||||||
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
import json
|
import json
|
||||||
|
@ -8,7 +5,10 @@ from pathlib import Path
|
||||||
from typing import Iterator, Sequence
|
from typing import Iterator, Sequence
|
||||||
|
|
||||||
from my.core import get_files, Res, datetime_aware
|
from my.core import get_files, Res, datetime_aware
|
||||||
from my.core.compat import fromisoformat, NoneType
|
from my.core.compat import fromisoformat
|
||||||
|
from my.experimental.destructive_parsing import Manager
|
||||||
|
|
||||||
|
from my.config import topcoder as config # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
|
||||||
def inputs() -> Sequence[Path]:
|
def inputs() -> Sequence[Path]:
|
||||||
|
@ -30,10 +30,6 @@ class Competition:
|
||||||
def when(self) -> datetime_aware:
|
def when(self) -> datetime_aware:
|
||||||
return fromisoformat(self.date_str)
|
return fromisoformat(self.date_str)
|
||||||
|
|
||||||
@cached_property
|
|
||||||
def summary(self) -> str:
|
|
||||||
return f'participated in {self.contest}: {self.percentile:.0f}'
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def make(cls, j) -> Iterator[Res['Competition']]:
|
def make(cls, j) -> Iterator[Res['Competition']]:
|
||||||
assert isinstance(j.pop('rating'), float)
|
assert isinstance(j.pop('rating'), float)
|
||||||
|
@ -53,38 +49,43 @@ class Competition:
|
||||||
|
|
||||||
|
|
||||||
def _parse_one(p: Path) -> Iterator[Res[Competition]]:
|
def _parse_one(p: Path) -> Iterator[Res[Competition]]:
|
||||||
j = json.loads(p.read_text())
|
d = json.loads(p.read_text())
|
||||||
|
|
||||||
# this is kind of an experiment to parse it exhaustively, making sure we don't miss any data
|
# TODO manager should be a context manager?
|
||||||
assert isinstance(j.pop('version'), str)
|
m = Manager()
|
||||||
assert isinstance(j.pop('id'), str)
|
|
||||||
[j] = j.values() # zoom in
|
|
||||||
|
|
||||||
assert j.pop('success') is True, j
|
h = m.helper(d)
|
||||||
assert j.pop('status') == 200, j
|
h.pop_if_primitive('version', 'id')
|
||||||
assert j.pop('metadata') is None, j
|
|
||||||
[j] = j.values() # zoom in
|
|
||||||
|
|
||||||
# todo hmm, potentially error handling could be nicer since .pop just reports key error
|
h = h.zoom('result')
|
||||||
# also by the time error is reported, key is already removed?
|
h.check('success', True)
|
||||||
for k in ['handle', 'handleLower', 'userId', 'createdAt', 'updatedAt', 'createdBy', 'updatedBy']:
|
h.check('status', 200)
|
||||||
# check it's primitive
|
h.pop_if_primitive('metadata')
|
||||||
assert isinstance(j.pop(k), (str, bool, float, int, NoneType)), k
|
|
||||||
|
|
||||||
j.pop('DEVELOP') # TODO how to handle it?
|
h = h.zoom('content')
|
||||||
[j] = j.values() # zoom in, DATA_SCIENCE section
|
h.pop_if_primitive('handle', 'handleLower', 'userId', 'createdAt', 'updatedAt', 'createdBy', 'updatedBy')
|
||||||
|
|
||||||
mm = j.pop('MARATHON_MATCH')
|
# NOTE at the moment it's empty for me, but it will result in an error later if there is some data here
|
||||||
[mm] = mm.values() # zoom into history
|
h.zoom('DEVELOP').zoom('subTracks')
|
||||||
|
|
||||||
srm = j.pop('SRM')
|
h = h.zoom('DATA_SCIENCE')
|
||||||
[srm] = srm.values() # zoom into history
|
# TODO multi zoom? not sure which axis, e.g.
|
||||||
|
# zoom('SRM', 'history') or zoom('SRM', 'MARATHON_MATCH')
|
||||||
|
# or zoom(('SRM', 'history'), ('MARATHON_MATCH', 'history'))
|
||||||
|
srms = h.zoom('SRM').zoom('history')
|
||||||
|
mms = h.zoom('MARATHON_MATCH').zoom('history')
|
||||||
|
|
||||||
assert len(j) == 0, j
|
for c in srms.item + mms.item:
|
||||||
|
# NOTE: so here we are actually just using pure dicts in .make method
|
||||||
for c in mm + srm:
|
# this is kinda ok since it will be checked by parent Helper
|
||||||
|
# but also expects cooperation from .make method (e.g. popping items from the dict)
|
||||||
|
# could also wrap in helper and pass to .make .. not sure
|
||||||
|
# an argument could be made that .make isn't really a class method..
|
||||||
|
# it's pretty specific to this parser only
|
||||||
yield from Competition.make(j=c)
|
yield from Competition.make(j=c)
|
||||||
|
|
||||||
|
yield from m.check()
|
||||||
|
|
||||||
|
|
||||||
def data() -> Iterator[Res[Competition]]:
|
def data() -> Iterator[Res[Competition]]:
|
||||||
*_, last = inputs()
|
*_, last = inputs()
|
||||||
|
|
Loading…
Add table
Reference in a new issue