Merge pull request #54 from karlicoss/updates
core: update warnings, add warn_if_empty decorator fore move defensive data sources
This commit is contained in:
commit
ce8cd5b52c
7 changed files with 136 additions and 10 deletions
|
@ -1,4 +1,5 @@
|
||||||
# this file only keeps the most common & critical types/utility functions
|
# this file only keeps the most common & critical types/utility functions
|
||||||
from .common import PathIsh, Paths, Json
|
from .common import PathIsh, Paths, Json
|
||||||
from .common import get_files, LazyLogger
|
from .common import get_files, LazyLogger
|
||||||
|
from .common import warn_if_empty
|
||||||
from .cfg import make_config
|
from .cfg import make_config
|
||||||
|
|
|
@ -3,7 +3,7 @@ from pathlib import Path
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import functools
|
import functools
|
||||||
import types
|
import types
|
||||||
from typing import Union, Callable, Dict, Iterable, TypeVar, Sequence, List, Optional, Any, cast, Tuple
|
from typing import Union, Callable, Dict, Iterable, TypeVar, Sequence, List, Optional, Any, cast, Tuple, TYPE_CHECKING
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
# some helper functions
|
# some helper functions
|
||||||
|
@ -130,6 +130,11 @@ def get_files(pp: Paths, glob: str=DEFAULT_GLOB, sort: bool=True) -> Tuple[Path,
|
||||||
else:
|
else:
|
||||||
sources.extend(map(Path, pp))
|
sources.extend(map(Path, pp))
|
||||||
|
|
||||||
|
def caller() -> str:
|
||||||
|
import traceback
|
||||||
|
# TODO ugh. very flaky... -3 because [<this function>, get_files(), <actual caller>]
|
||||||
|
return traceback.extract_stack()[-3].filename
|
||||||
|
|
||||||
paths: List[Path] = []
|
paths: List[Path] = []
|
||||||
for src in sources:
|
for src in sources:
|
||||||
if src.parts[0] == '~':
|
if src.parts[0] == '~':
|
||||||
|
@ -141,7 +146,7 @@ def get_files(pp: Paths, glob: str=DEFAULT_GLOB, sort: bool=True) -> Tuple[Path,
|
||||||
ss = str(src)
|
ss = str(src)
|
||||||
if '*' in ss:
|
if '*' in ss:
|
||||||
if glob != DEFAULT_GLOB:
|
if glob != DEFAULT_GLOB:
|
||||||
warnings.warn(f"Treating {ss} as glob path. Explicit glob={glob} argument is ignored!")
|
warnings.warn(f"{caller()}: treating {ss} as glob path. Explicit glob={glob} argument is ignored!")
|
||||||
paths.extend(map(Path, do_glob(ss)))
|
paths.extend(map(Path, do_glob(ss)))
|
||||||
else:
|
else:
|
||||||
if not src.is_file():
|
if not src.is_file():
|
||||||
|
@ -154,14 +159,15 @@ def get_files(pp: Paths, glob: str=DEFAULT_GLOB, sort: bool=True) -> Tuple[Path,
|
||||||
|
|
||||||
if len(paths) == 0:
|
if len(paths) == 0:
|
||||||
# todo make it conditionally defensive based on some global settings
|
# todo make it conditionally defensive based on some global settings
|
||||||
# todo stacktrace?
|
# TODO not sure about using warnings module for this
|
||||||
warnings.warn(f'No paths were matched against {paths}. This might result in missing data.')
|
import traceback
|
||||||
|
warnings.warn(f'{caller()}: no paths were matched against {paths}. This might result in missing data.')
|
||||||
|
traceback.print_stack()
|
||||||
|
|
||||||
return tuple(paths)
|
return tuple(paths)
|
||||||
|
|
||||||
|
|
||||||
# TODO annotate it, perhaps use 'dependent' type (for @doublewrap stuff)
|
# TODO annotate it, perhaps use 'dependent' type (for @doublewrap stuff)
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from typing import Callable, TypeVar
|
from typing import Callable, TypeVar
|
||||||
from typing_extensions import Protocol
|
from typing_extensions import Protocol
|
||||||
|
@ -269,3 +275,60 @@ import re
|
||||||
def get_valid_filename(s: str) -> str:
|
def get_valid_filename(s: str) -> str:
|
||||||
s = str(s).strip().replace(' ', '_')
|
s = str(s).strip().replace(' ', '_')
|
||||||
return re.sub(r'(?u)[^-\w.]', '', s)
|
return re.sub(r'(?u)[^-\w.]', '', s)
|
||||||
|
|
||||||
|
|
||||||
|
from typing import Generic, Sized, Callable
|
||||||
|
|
||||||
|
|
||||||
|
# X = TypeVar('X')
|
||||||
|
def _warn_iterator(it, f: Any=None):
|
||||||
|
emitted = False
|
||||||
|
for i in it:
|
||||||
|
yield i
|
||||||
|
emitted = True
|
||||||
|
if not emitted:
|
||||||
|
warnings.warn(f"Function {f} didn't emit any data, make sure your config paths are correct")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO ugh, so I want to express something like:
|
||||||
|
# X = TypeVar('X')
|
||||||
|
# C = TypeVar('C', bound=Iterable[X])
|
||||||
|
# _warn_iterable(it: C) -> C
|
||||||
|
# but apparently I can't??? ugh.
|
||||||
|
# https://github.com/python/typing/issues/548
|
||||||
|
# I guess for now overloads are fine...
|
||||||
|
|
||||||
|
from typing import overload
|
||||||
|
X = TypeVar('X')
|
||||||
|
@overload
|
||||||
|
def _warn_iterable(it: List[X] , f: Any=None) -> List[X] : ...
|
||||||
|
@overload
|
||||||
|
def _warn_iterable(it: Iterable[X], f: Any=None) -> Iterable[X]: ...
|
||||||
|
def _warn_iterable(it, f=None):
|
||||||
|
if isinstance(it, Sized):
|
||||||
|
sz = len(it)
|
||||||
|
if sz == 0:
|
||||||
|
warnings.warn(f"Function {f} returned empty container, make sure your config paths are correct")
|
||||||
|
return it
|
||||||
|
else:
|
||||||
|
return _warn_iterator(it, f=f)
|
||||||
|
|
||||||
|
|
||||||
|
# ok, this seems to work...
|
||||||
|
# https://github.com/python/mypy/issues/1927#issue-167100413
|
||||||
|
FL = TypeVar('FL', bound=Callable[..., List])
|
||||||
|
FI = TypeVar('FI', bound=Callable[..., Iterable])
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def warn_if_empty(f: FL) -> FL: ...
|
||||||
|
@overload
|
||||||
|
def warn_if_empty(f: FI) -> FI: ...
|
||||||
|
|
||||||
|
|
||||||
|
def warn_if_empty(f):
|
||||||
|
from functools import wraps
|
||||||
|
@wraps(f)
|
||||||
|
def wrapped(*args, **kwargs):
|
||||||
|
res = f(*args, **kwargs)
|
||||||
|
return _warn_iterable(res, f=f)
|
||||||
|
return wrapped # type: ignore
|
||||||
|
|
|
@ -1,11 +1,16 @@
|
||||||
'''
|
'''
|
||||||
Unified RSS data, merged from different services I used historically
|
Unified RSS data, merged from different services I used historically
|
||||||
'''
|
'''
|
||||||
|
# NOTE: you can comment out the sources you're not using
|
||||||
|
from . import feedbin, feedly
|
||||||
|
|
||||||
from typing import Iterable
|
from typing import Iterable
|
||||||
from .common import Subscription, compute_subscriptions
|
from .common import Subscription, compute_subscriptions
|
||||||
|
|
||||||
|
|
||||||
def subscriptions() -> Iterable[Subscription]:
|
def subscriptions() -> Iterable[Subscription]:
|
||||||
from . import feedbin, feedly
|
|
||||||
# TODO google reader?
|
# TODO google reader?
|
||||||
yield from compute_subscriptions(feedbin.states(), feedly.states())
|
yield from compute_subscriptions(
|
||||||
|
feedbin.states(),
|
||||||
|
feedly .states(),
|
||||||
|
)
|
||||||
|
|
|
@ -17,6 +17,8 @@ from typing import Iterable, Tuple, Sequence
|
||||||
SubscriptionState = Tuple[datetime, Sequence[Subscription]]
|
SubscriptionState = Tuple[datetime, Sequence[Subscription]]
|
||||||
|
|
||||||
|
|
||||||
|
from ..core import warn_if_empty
|
||||||
|
@warn_if_empty
|
||||||
def compute_subscriptions(*sources: Iterable[SubscriptionState]) -> List[Subscription]:
|
def compute_subscriptions(*sources: Iterable[SubscriptionState]) -> List[Subscription]:
|
||||||
"""
|
"""
|
||||||
Keeps track of everything I ever subscribed to.
|
Keeps track of everything I ever subscribed to.
|
||||||
|
@ -34,6 +36,9 @@ def compute_subscriptions(*sources: Iterable[SubscriptionState]) -> List[Subscri
|
||||||
if feed.url not in by_url:
|
if feed.url not in by_url:
|
||||||
by_url[feed.url] = feed
|
by_url[feed.url] = feed
|
||||||
|
|
||||||
|
if len(states) == 0:
|
||||||
|
return []
|
||||||
|
|
||||||
_, last_state = max(states, key=lambda x: x[0])
|
_, last_state = max(states, key=lambda x: x[0])
|
||||||
last_urls = {f.url for f in last_state}
|
last_urls = {f.url for f in last_state}
|
||||||
|
|
||||||
|
|
|
@ -3,11 +3,9 @@ Unified Twitter data (merged from the archive and periodic updates)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# NOTE: you can comment out the sources you don't need
|
# NOTE: you can comment out the sources you don't need
|
||||||
|
|
||||||
|
|
||||||
from . import twint, archive
|
from . import twint, archive
|
||||||
from .common import merge_tweets
|
|
||||||
|
|
||||||
|
from .common import merge_tweets
|
||||||
|
|
||||||
def tweets():
|
def tweets():
|
||||||
yield from merge_tweets(
|
yield from merge_tweets(
|
||||||
|
@ -15,6 +13,7 @@ def tweets():
|
||||||
archive.tweets(),
|
archive.tweets(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from .common import merge_tweets
|
||||||
|
|
||||||
def likes():
|
def likes():
|
||||||
yield from merge_tweets(
|
yield from merge_tweets(
|
||||||
|
|
|
@ -2,7 +2,9 @@ from itertools import chain
|
||||||
|
|
||||||
from more_itertools import unique_everseen
|
from more_itertools import unique_everseen
|
||||||
|
|
||||||
|
from ..core import warn_if_empty
|
||||||
|
|
||||||
|
@warn_if_empty
|
||||||
def merge_tweets(*sources):
|
def merge_tweets(*sources):
|
||||||
yield from unique_everseen(
|
yield from unique_everseen(
|
||||||
chain(*sources),
|
chain(*sources),
|
||||||
|
|
|
@ -48,3 +48,54 @@ def prepare(tmp_path: Path):
|
||||||
|
|
||||||
# meh
|
# meh
|
||||||
from my.core.error import test_sort_res_by
|
from my.core.error import test_sort_res_by
|
||||||
|
|
||||||
|
|
||||||
|
from typing import Iterable, List
|
||||||
|
import warnings
|
||||||
|
from my.core import warn_if_empty
|
||||||
|
def test_warn_if_empty() -> None:
|
||||||
|
@warn_if_empty
|
||||||
|
def nonempty() -> Iterable[str]:
|
||||||
|
yield 'a'
|
||||||
|
yield 'aba'
|
||||||
|
|
||||||
|
@warn_if_empty
|
||||||
|
def empty() -> List[int]:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# should be rejected by mypy!
|
||||||
|
# todo how to actually test it?
|
||||||
|
# @warn_if_empty
|
||||||
|
# def baad() -> float:
|
||||||
|
# return 0.00
|
||||||
|
|
||||||
|
# reveal_type(nonempty)
|
||||||
|
# reveal_type(empty)
|
||||||
|
|
||||||
|
with warnings.catch_warnings(record=True) as w:
|
||||||
|
assert list(nonempty()) == ['a', 'aba']
|
||||||
|
assert len(w) == 0
|
||||||
|
|
||||||
|
eee = empty()
|
||||||
|
assert eee == []
|
||||||
|
assert len(w) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_warn_iterable() -> None:
|
||||||
|
from my.core.common import _warn_iterable
|
||||||
|
i1: List[str] = ['a', 'b']
|
||||||
|
i2: Iterable[int] = iter([1, 2, 3])
|
||||||
|
# reveal_type(i1)
|
||||||
|
# reveal_type(i2)
|
||||||
|
x1 = _warn_iterable(i1)
|
||||||
|
x2 = _warn_iterable(i2)
|
||||||
|
# vvvv this should be flagged by mypy
|
||||||
|
# _warn_iterable(123)
|
||||||
|
# reveal_type(x1)
|
||||||
|
# reveal_type(x2)
|
||||||
|
with warnings.catch_warnings(record=True) as w:
|
||||||
|
assert x1 is i1 # should be unchanged!
|
||||||
|
assert len(w) == 0
|
||||||
|
|
||||||
|
assert list(x2) == [1, 2, 3]
|
||||||
|
assert len(w) == 0
|
||||||
|
|
Loading…
Add table
Reference in a new issue