reddit.rexport: experiment with using optional cpu pool (used by all of HPI)

Enabled by the env variable, specifying how many cores to dedicate, e.g.

HPI_CPU_POOL=4 hpi query ...
This commit is contained in:
Dima Gerasimov 2023-10-24 23:41:45 +01:00 committed by karlicoss
parent a5c04e789a
commit 1f61e853c9
2 changed files with 52 additions and 5 deletions

33
my/core/_cpu_pool.py Normal file
View file

@ -0,0 +1,33 @@
"""
EXPERIMENTAL! use with caution
Manages 'global' ProcessPoolExecutor which is 'managed' by HPI itself, and
can be passed down to DALs to speed up data processing.
The reason to have it managed by HPI is because we don't want DALs instantiate pools
themselves -- they can't cooperate and it would be hard/infeasible to control
how many cores we want to dedicate to the DAL.
Enabled by the env variable, specifying how many cores to dedicate
e.g. "HPI_CPU_POOL=4 hpi query ..."
"""
from concurrent.futures import ProcessPoolExecutor
import os
from typing import cast, Optional
_NOT_SET = cast(ProcessPoolExecutor, object())
_INSTANCE: Optional[ProcessPoolExecutor] = _NOT_SET
def get_cpu_pool() -> Optional[ProcessPoolExecutor]:
global _INSTANCE
if _INSTANCE is _NOT_SET:
use_cpu_pool = os.environ.get('HPI_CPU_POOL')
if use_cpu_pool is None or int(use_cpu_pool) == 0:
_INSTANCE = None
else:
# NOTE: this won't be cleaned up properly, but I guess it's fine?
# since this it's basically a singleton for the whole process
# , and will be destroyed when python exists
_INSTANCE = ProcessPoolExecutor(max_workers=int(use_cpu_pool))
return _INSTANCE

View file

@ -8,6 +8,7 @@ REQUIRES = [
]
from dataclasses import dataclass
import inspect
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, Sequence
@ -45,12 +46,14 @@ def migration(attrs: Attrs) -> Attrs:
ex: uconfig.rexport = attrs['rexport']
attrs['export_path'] = ex.export_path
else:
warnings.high("""DEPRECATED! Please modify your reddit config to look like:
warnings.high(
"""DEPRECATED! Please modify your reddit config to look like:
class reddit:
class rexport:
export_path: Paths = '/path/to/rexport/data'
""")
"""
)
export_dir = 'export_dir'
if export_dir in attrs: # legacy name
attrs['export_path'] = attrs[export_dir]
@ -93,8 +96,19 @@ Upvote = dal.Upvote
def _dal() -> dal.DAL:
inp = list(inputs())
return dal.DAL(inp)
sources = list(inputs())
## backwards compatibility (old rexport DAL didn't have cpu_pool argument)
cpu_pool_arg = 'cpu_pool'
pass_cpu_pool = cpu_pool_arg in inspect.signature(dal.DAL).parameters
if pass_cpu_pool:
from my.core._cpu_pool import get_cpu_pool
kwargs = {cpu_pool_arg: get_cpu_pool()}
else:
kwargs = {}
##
return dal.DAL(sources, **kwargs)
cache = mcachew(depends_on=inputs)