core.common: move DummyExecutor to core.common.utils.concurrent

without backwards compat, unlikely it's been used by anyone
Dima Gerasimov 2024-08-15 14:15:23 +03:00 committed by karlicoss
parent bcc4c15304
commit 18529257e7
3 changed files with 52 additions and 41 deletions
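
Since the move ships without a backwards-compatibility alias in my.core.common, downstream code has to switch to the new module path. A minimal before/after sketch of the required import change:

# before this commit:
#   from my.core.common import DummyExecutor
# after this commit:
from my.core.utils.concurrent import DummyExecutor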

@@ -5,7 +5,6 @@ from dataclasses import is_dataclass, asdict as dataclasses_asdict
 import functools
 from contextlib import contextmanager
 import os
-import sys
 from typing import (
     Any,
     Callable,
@@ -382,45 +381,6 @@ def assert_subpackage(name: str) -> None:
     assert name == '__main__' or 'my.core' in name, f'Expected module __name__ ({name}) to be __main__ or start with my.core'

-from .compat import ParamSpec
-
-_P = ParamSpec('_P')
-_T = TypeVar('_T')
-
-# https://stackoverflow.com/a/10436851/706389
-from concurrent.futures import Future, Executor
-class DummyExecutor(Executor):
-    def __init__(self, max_workers: Optional[int]=1) -> None:
-        self._shutdown = False
-        self._max_workers = max_workers
-
-    if TYPE_CHECKING:
-        if sys.version_info[:2] <= (3, 8):
-            # 3.8 doesn't support ParamSpec as Callable arg :(
-            # and any attempt to type results in incompatible supertype.. so whatever
-            def submit(self, fn, *args, **kwargs): ...
-        else:
-            def submit(self, fn: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> Future[_T]: ...
-    else:
-        def submit(self, fn, *args, **kwargs):
-            if self._shutdown:
-                raise RuntimeError('cannot schedule new futures after shutdown')
-
-            f: Future[Any] = Future()
-            try:
-                result = fn(*args, **kwargs)
-            except KeyboardInterrupt:
-                raise
-            except BaseException as e:
-                f.set_exception(e)
-            else:
-                f.set_result(result)
-
-            return f
-
-    def shutdown(self, wait: bool=True, **kwargs) -> None:
-        self._shutdown = True
-
 # TODO deprecate and suggest to use one from my.core directly? not sure
 from .utils.itertools import unique_everseen

@@ -0,0 +1,51 @@
+from concurrent.futures import Future, Executor
+import sys
+from typing import Any, Callable, Optional, TypeVar, TYPE_CHECKING
+
+from ..compat import ParamSpec
+
+_P = ParamSpec('_P')
+_T = TypeVar('_T')
+
+
+# https://stackoverflow.com/a/10436851/706389
+class DummyExecutor(Executor):
+    """
+    This is useful if you're already using Executor for parallelising,
+    but also want to provide an option to run the code serially (e.g. for debugging)
+    """
+
+    def __init__(self, max_workers: Optional[int] = 1) -> None:
+        self._shutdown = False
+        self._max_workers = max_workers
+
+    if TYPE_CHECKING:
+        if sys.version_info[:2] <= (3, 8):
+            # 3.8 doesn't support ParamSpec as Callable arg :(
+            # and any attempt to type results in incompatible supertype.. so whatever
+            def submit(self, fn, *args, **kwargs): ...
+        else:
+            def submit(self, fn: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> Future[_T]: ...
+    else:
+        def submit(self, fn, *args, **kwargs):
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f: Future[Any] = Future()
+            try:
+                result = fn(*args, **kwargs)
+            except KeyboardInterrupt:
+                raise
+            except BaseException as e:
+                f.set_exception(e)
+            else:
+                f.set_result(result)
+
+            return f
+
+    def shutdown(self, wait: bool = True, **kwargs) -> None:
+        self._shutdown = True
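
A minimal usage sketch (not part of the diff; the work() function is hypothetical): submit() runs the callable immediately and returns an already-completed Future, so results and exceptions surface the same way as with a real executor:

from my.core.utils.concurrent import DummyExecutor

def work(x: int) -> int:  # hypothetical workload, for illustration only
    return x * x

with DummyExecutor() as pool:
    futures = [pool.submit(work, i) for i in range(4)]
    print([f.result() for f in futures])  # [0, 1, 4, 9], computed serially

failed = DummyExecutor().submit(work, 'oops')  # the TypeError is captured, not raised here
print(failed.exception())  # ...it is surfaced via the Future instead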

@@ -121,7 +121,7 @@ def _iter_annotations(pdfs: Sequence[Path]) -> Iterator[Res[Annotation]]:
     # todo how to print to stdout synchronously?
     # todo global config option not to use pools? useful for debugging..
     from concurrent.futures import ProcessPoolExecutor
-    from my.core.common import DummyExecutor
+    from my.core.utils.concurrent import DummyExecutor
     workers = None  # use 0 for debugging
     Pool = DummyExecutor if workers == 0 else ProcessPoolExecutor
     with Pool(workers) as pool: