diff --git a/my/core/common.py b/my/core/common.py index bfc3505..739971c 100644 --- a/my/core/common.py +++ b/my/core/common.py @@ -5,7 +5,6 @@ from dataclasses import is_dataclass, asdict as dataclasses_asdict import functools from contextlib import contextmanager import os -import sys from typing import ( Any, Callable, @@ -382,45 +381,6 @@ def assert_subpackage(name: str) -> None: assert name == '__main__' or 'my.core' in name, f'Expected module __name__ ({name}) to be __main__ or start with my.core' -from .compat import ParamSpec -_P = ParamSpec('_P') -_T = TypeVar('_T') - -# https://stackoverflow.com/a/10436851/706389 -from concurrent.futures import Future, Executor -class DummyExecutor(Executor): - def __init__(self, max_workers: Optional[int]=1) -> None: - self._shutdown = False - self._max_workers = max_workers - - if TYPE_CHECKING: - if sys.version_info[:2] <= (3, 8): - # 3.8 doesn't support ParamSpec as Callable arg :( - # and any attempt to type results in incompatible supertype.. so whatever - def submit(self, fn, *args, **kwargs): ... - else: - def submit(self, fn: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> Future[_T]: ... - else: - def submit(self, fn, *args, **kwargs): - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f: Future[Any] = Future() - try: - result = fn(*args, **kwargs) - except KeyboardInterrupt: - raise - except BaseException as e: - f.set_exception(e) - else: - f.set_result(result) - - return f - - def shutdown(self, wait: bool=True, **kwargs) -> None: - self._shutdown = True - - # TODO deprecate and suggest to use one from my.core directly? 
# https://stackoverflow.com/a/10436851/706389
class DummyExecutor(Executor):
    """
    Executor that runs each submitted callable right away, in the calling thread.

    Handy when the code is already written against the Executor interface for
    parallelism, but a serial mode is wanted as well (e.g. for debugging).
    """

    def __init__(self, max_workers: Optional[int] = 1) -> None:
        # max_workers is only stored; nothing in this class reads it back
        self._max_workers = max_workers
        self._shutdown = False

    def shutdown(self, wait: bool = True, **kwargs) -> None:
        """Flag the executor as shut down so later submit() calls are rejected."""
        self._shutdown = True

    if TYPE_CHECKING:
        # signatures for the type checker only, never executed at runtime
        if sys.version_info[:2] >= (3, 9):

            def submit(self, fn: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> Future[_T]: ...

        else:
            # 3.8 doesn't support ParamSpec as Callable arg :(
            # and any attempt to type results in incompatible supertype.. so whatever
            def submit(self, fn, *args, **kwargs): ...

    else:

        def submit(self, fn, *args, **kwargs):
            """
            Call fn(*args, **kwargs) synchronously and return an already completed Future.

            Raises RuntimeError if the executor has been shut down.
            KeyboardInterrupt propagates to the caller; any other exception
            raised by fn is stored on the returned future instead.
            """
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future: Future[Any] = Future()
            try:
                outcome = fn(*args, **kwargs)
            except KeyboardInterrupt:
                # let the user abort the whole run rather than burying it in a future
                raise
            except BaseException as err:
                future.set_exception(err)
            else:
                future.set_result(outcome)
            return future
from concurrent.futures import ProcessPoolExecutor - from my.core.common import DummyExecutor + from my.core.utils.concurrent import DummyExecutor workers = None # use 0 for debugging Pool = DummyExecutor if workers == 0 else ProcessPoolExecutor with Pool(workers) as pool: