core: experimental guessing for last objects' date

This commit is contained in:
Dima Gerasimov 2020-07-31 17:00:32 +01:00 committed by karlicoss
parent cde5502151
commit 6515d1430f

View file

@ -367,29 +367,52 @@ def stat(func: Callable[[], Iterable[C]]) -> Dict[str, Any]:
# todo not sure if there is something in more_itertools to compute this? # todo not sure if there is something in more_itertools to compute this?
errors = 0 errors = 0
last = None
def funcit(): def funcit():
nonlocal errors nonlocal errors, last
for x in func(): for x in func():
if isinstance(x, Exception): if isinstance(x, Exception):
errors += 1 errors += 1
else:
last = x
yield x yield x
it = iter(funcit()) it = iter(funcit())
res: Any count: Any
if QUICK_STATS: if QUICK_STATS:
initial = take(100, it) initial = take(100, it)
res = len(initial) count = len(initial)
if first(it, None) is not None: # todo can actually be none... if first(it, None) is not None: # todo can actually be none...
# haven't exhausted # haven't exhausted
res = f'{res}+' count = f'{count}+'
else: else:
res = ilen(it) count = ilen(it)
res = {
'count': count,
}
if errors > 0: if errors > 0:
# todo not sure, but for now ok res['errors'] = errors
res = f'{res} ({errors} errors)'
if last is not None:
dt = guess_datetime(last)
if dt is not None:
res['last'] = dt
return { return {
func.__name__: res, func.__name__: res,
} }
# experimental, not sure about it..
def guess_datetime(x: Any) -> Optional[datetime]:
# todo support datacalsses
asdict = getattr(x, '_asdict', None)
if asdict is None:
return None
# todo check if there are multiple?
for k, v in asdict().items():
if isinstance(v, datetime):
return v
return None