Add kython utils (kerror/konsume)
This commit is contained in:
parent
a0b9d2dc05
commit
3e6c670698
5 changed files with 274 additions and 2 deletions
1
my/kython/README
Normal file
1
my/kython/README
Normal file
|
@ -0,0 +1 @@
|
||||||
|
vendorized kython (https://github.com/karlicoss/kython) stuff
|
0
my/kython/__init__.py
Normal file
0
my/kython/__init__.py
Normal file
114
my/kython/kerror.py
Normal file
114
my/kython/kerror.py
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
from typing import Union, TypeVar, Iterator, Callable, Iterable, List, Tuple, Type
|
||||||
|
|
||||||
|
|
||||||
|
T = TypeVar('T')
|
||||||
|
E = TypeVar('E', bound=Exception)
|
||||||
|
|
||||||
|
ResT = Union[T, E]
|
||||||
|
|
||||||
|
Res = ResT[T, Exception]
|
||||||
|
|
||||||
|
|
||||||
|
# TODO make it a bit more typed??
|
||||||
|
def is_error(res: Res[T]) -> bool:
|
||||||
|
return isinstance(res, Exception)
|
||||||
|
|
||||||
|
|
||||||
|
def is_ok(res: Res[T]) -> bool:
|
||||||
|
return not is_error(res)
|
||||||
|
|
||||||
|
|
||||||
|
def unwrap(res: Res[T]) -> T:
|
||||||
|
if isinstance(res, Exception):
|
||||||
|
raise res
|
||||||
|
else:
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
def split_errors(l: Iterable[ResT[T, E]], ET=Exception) -> Tuple[List[T], List[E]]:
|
||||||
|
rl: List[T] = []
|
||||||
|
el: List[E] = []
|
||||||
|
for x in l:
|
||||||
|
if isinstance(x, ET):
|
||||||
|
el.append(x)
|
||||||
|
else:
|
||||||
|
rl.append(x) # type: ignore
|
||||||
|
return rl, el
|
||||||
|
|
||||||
|
|
||||||
|
def ytry(cb) -> Iterator[Exception]:
|
||||||
|
try:
|
||||||
|
cb()
|
||||||
|
except Exception as e:
|
||||||
|
yield e
|
||||||
|
|
||||||
|
|
||||||
|
# TODO experimental, not sure if I like it
|
||||||
|
def echain(ex: E, cause: Exception) -> E:
|
||||||
|
ex.__cause__ = cause
|
||||||
|
# TODO assert cause is none?
|
||||||
|
# TODO copy??
|
||||||
|
return ex
|
||||||
|
# try:
|
||||||
|
# # TODO is there a awy to get around raise from?
|
||||||
|
# raise ex from cause
|
||||||
|
# except Exception as e:
|
||||||
|
# if isinstance(e, type(ex)):
|
||||||
|
# return e
|
||||||
|
# else:
|
||||||
|
# raise e
|
||||||
|
|
||||||
|
|
||||||
|
def sort_res_by(items: Iterable[ResT], key) -> List[ResT]:
|
||||||
|
"""
|
||||||
|
The general idea is: just alaways carry errors with the entry that precedes them
|
||||||
|
"""
|
||||||
|
# TODO ResT object should hold exception class?...
|
||||||
|
group = []
|
||||||
|
groups = []
|
||||||
|
for i in items:
|
||||||
|
if isinstance(i, Exception):
|
||||||
|
group.append(i)
|
||||||
|
else:
|
||||||
|
groups.append((i, group))
|
||||||
|
group = []
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for v, errs in sorted(groups, key=lambda p: key(p[0])):
|
||||||
|
results.extend(errs)
|
||||||
|
results.append(v)
|
||||||
|
results.extend(group)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def test_sort_res_by():
|
||||||
|
class Exc(Exception):
|
||||||
|
def __eq__(self, other):
|
||||||
|
return self.args == other.args
|
||||||
|
|
||||||
|
ress = [
|
||||||
|
Exc('first'),
|
||||||
|
Exc('second'),
|
||||||
|
5,
|
||||||
|
3,
|
||||||
|
Exc('xxx'),
|
||||||
|
2,
|
||||||
|
1,
|
||||||
|
Exc('last'),
|
||||||
|
]
|
||||||
|
results = sort_res_by(ress, lambda x: x) # type: ignore
|
||||||
|
assert results == [
|
||||||
|
1,
|
||||||
|
Exc('xxx'),
|
||||||
|
2,
|
||||||
|
3,
|
||||||
|
Exc('first'),
|
||||||
|
Exc('second'),
|
||||||
|
5,
|
||||||
|
Exc('last'),
|
||||||
|
]
|
||||||
|
|
||||||
|
results2 = sort_res_by(ress + [0], lambda x: x) # type: ignore
|
||||||
|
assert results2 == [Exc('last'), 0] + results[:-1]
|
||||||
|
|
157
my/kython/konsume.py
Normal file
157
my/kython/konsume.py
Normal file
|
@ -0,0 +1,157 @@
|
||||||
|
from collections import OrderedDict
|
||||||
|
from typing import Any, List
|
||||||
|
|
||||||
|
|
||||||
|
def ignore(w, *keys):
|
||||||
|
for k in keys:
|
||||||
|
w[k].ignore()
|
||||||
|
|
||||||
|
def zoom(w, *keys):
|
||||||
|
return [w[k].zoom() for k in keys]
|
||||||
|
|
||||||
|
# TODO need to support lists
|
||||||
|
class Zoomable:
|
||||||
|
def __init__(self, parent, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs) # type: ignore
|
||||||
|
self.parent = parent
|
||||||
|
|
||||||
|
# TODO not sure, maybe do it via del??
|
||||||
|
# TODO need to make sure they are in proper order? object should be last..
|
||||||
|
@property
|
||||||
|
def dependants(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def ignore(self):
|
||||||
|
self.consume_all()
|
||||||
|
|
||||||
|
def consume_all(self):
|
||||||
|
for d in self.dependants:
|
||||||
|
d.consume_all()
|
||||||
|
self.consume()
|
||||||
|
|
||||||
|
def consume(self):
|
||||||
|
assert self.parent is not None
|
||||||
|
self.parent._remove(self)
|
||||||
|
|
||||||
|
def zoom(self):
|
||||||
|
self.consume()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _remove(self, xx):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def this_consumed(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class Wdict(Zoomable, OrderedDict):
|
||||||
|
def _remove(self, xx):
|
||||||
|
keys = [k for k, v in self.items() if v is xx]
|
||||||
|
assert len(keys) == 1
|
||||||
|
del self[keys[0]]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def dependants(self):
|
||||||
|
return list(self.values())
|
||||||
|
|
||||||
|
def this_consumed(self):
|
||||||
|
return len(self) == 0
|
||||||
|
|
||||||
|
class Wlist(Zoomable, list):
|
||||||
|
def _remove(self, xx):
|
||||||
|
self.remove(xx)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def dependants(self):
|
||||||
|
return list(self)
|
||||||
|
|
||||||
|
def this_consumed(self):
|
||||||
|
return len(self) == 0
|
||||||
|
|
||||||
|
class Wvalue(Zoomable):
|
||||||
|
def __init__(self, parent, value: Any) -> None:
|
||||||
|
super().__init__(parent)
|
||||||
|
self.value = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def dependants(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def this_consumed(self):
|
||||||
|
return True # TODO not sure..
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'WValue{' + repr(self.value) + '}'
|
||||||
|
|
||||||
|
def _wrap(j, parent=None):
|
||||||
|
res: Zoomable
|
||||||
|
cc: List[Zoomable]
|
||||||
|
if isinstance(j, dict):
|
||||||
|
res = Wdict(parent)
|
||||||
|
cc = [res]
|
||||||
|
for k, v in j.items():
|
||||||
|
vv, c = _wrap(v, parent=res)
|
||||||
|
res[k] = vv
|
||||||
|
cc.extend(c)
|
||||||
|
return res, cc
|
||||||
|
elif isinstance(j, list):
|
||||||
|
res = Wlist(parent)
|
||||||
|
cc = [res]
|
||||||
|
for i in j:
|
||||||
|
ii, c = _wrap(i, parent=res)
|
||||||
|
res.append(ii)
|
||||||
|
cc.extend(c)
|
||||||
|
return res, cc
|
||||||
|
elif isinstance(j, (int, float, str, type(None))):
|
||||||
|
res = Wvalue(parent, j)
|
||||||
|
return res, [res]
|
||||||
|
else:
|
||||||
|
raise RuntimeError(str(j))
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
class UnconsumedError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def wrap(j):
|
||||||
|
w, children = _wrap(j)
|
||||||
|
|
||||||
|
yield w
|
||||||
|
|
||||||
|
for c in children:
|
||||||
|
if not c.this_consumed(): # TODO hmm. how does it figure out if it's consumed???
|
||||||
|
raise UnconsumedError(str(c))
|
||||||
|
|
||||||
|
def test_unconsumed():
|
||||||
|
import pytest # type: ignore
|
||||||
|
with pytest.raises(UnconsumedError):
|
||||||
|
with wrap({'a': 1234}) as w:
|
||||||
|
pass
|
||||||
|
|
||||||
|
with pytest.raises(UnconsumedError):
|
||||||
|
with wrap({'c': {'d': 2222}}) as w:
|
||||||
|
d = w['c']['d'].zoom()
|
||||||
|
|
||||||
|
def test_consumed():
|
||||||
|
with wrap({'a': 1234}) as w:
|
||||||
|
a = w['a'].zoom()
|
||||||
|
|
||||||
|
with wrap({'c': {'d': 2222}}) as w:
|
||||||
|
c = w['c'].zoom()
|
||||||
|
d = c['d'].zoom()
|
||||||
|
|
||||||
|
def test_types():
|
||||||
|
# (string, number, object, array, boolean or nul
|
||||||
|
with wrap({'string': 'string', 'number': 3.14, 'boolean': True, 'null': None, 'list': [1, 2, 3]}) as w:
|
||||||
|
w['string'].zoom()
|
||||||
|
w['number'].consume()
|
||||||
|
w['boolean'].zoom()
|
||||||
|
w['null'].zoom()
|
||||||
|
for x in list(w['list'].zoom()): # TODO eh. how to avoid the extra list thing?
|
||||||
|
x.consume()
|
||||||
|
|
||||||
|
def test_consume_all():
|
||||||
|
with wrap({'aaa': {'bbb': {'hi': 123}}}) as w:
|
||||||
|
aaa = w['aaa'].zoom()
|
||||||
|
aaa['bbb'].consume_all()
|
|
@ -9,8 +9,8 @@ import pytz
|
||||||
|
|
||||||
from ..common import setup_logger
|
from ..common import setup_logger
|
||||||
|
|
||||||
from kython.kerror import ResT, echain, unwrap, sort_res_by
|
from ..kython.kerror import ResT, echain, unwrap, sort_res_by
|
||||||
from kython.konsume import wrap, zoom, ignore
|
from ..kython.konsume import wrap, zoom, ignore
|
||||||
|
|
||||||
|
|
||||||
_POLAR_DIR = Path('~/.polar')
|
_POLAR_DIR = Path('~/.polar')
|
||||||
|
|
Loading…
Add table
Reference in a new issue