reddit.rexport: experiment with using optional cpu pool (used by all of HPI)

Enabled by the env variable, specifying how many cores to dedicate, e.g.

HPI_CPU_POOL=4 hpi query ...
This commit is contained in:
Dima Gerasimov 2023-10-24 23:41:45 +01:00
parent a5c04e789a
commit 9681d6f50f
2 changed files with 52 additions and 5 deletions

View file

@ -8,6 +8,7 @@ REQUIRES = [
]
from dataclasses import dataclass
import inspect
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, Sequence
@ -45,14 +46,16 @@ def migration(attrs: Attrs) -> Attrs:
ex: uconfig.rexport = attrs['rexport']
attrs['export_path'] = ex.export_path
else:
warnings.high("""DEPRECATED! Please modify your reddit config to look like:
warnings.high(
"""DEPRECATED! Please modify your reddit config to look like:
class reddit:
class rexport:
export_path: Paths = '/path/to/rexport/data'
""")
"""
)
export_dir = 'export_dir'
if export_dir in attrs: # legacy name
if export_dir in attrs: # legacy name
attrs['export_path'] = attrs[export_dir]
warnings.high(f'"{export_dir}" is deprecated! Please use "export_path" instead."')
return attrs
@ -93,8 +96,19 @@ Upvote = dal.Upvote
def _dal() -> dal.DAL:
inp = list(inputs())
return dal.DAL(inp)
sources = list(inputs())
## backwards compatibility (old rexport DAL didn't have cpu_pool argument)
cpu_pool_arg = 'cpu_pool'
pass_cpu_pool = cpu_pool_arg in inspect.signature(dal.DAL).parameters
if pass_cpu_pool:
from my.core._cpu_pool import get_cpu_pool
kwargs = {cpu_pool_arg: get_cpu_pool()}
else:
kwargs = {}
##
return dal.DAL(sources, **kwargs)
cache = mcachew(depends_on=inputs)