From 1f61e853c99c58d16a924ba4a064bd597ca16719 Mon Sep 17 00:00:00 2001
From: Dima Gerasimov
Date: Tue, 24 Oct 2023 23:41:45 +0100
Subject: [PATCH] reddit.rexport: experiment with using optional cpu pool
 (used by all of HPI)

Enabled by the env variable specifying how many cores to dedicate, e.g.
HPI_CPU_POOL=4 hpi query ...
---
 my/core/_cpu_pool.py | 33 +++++++++++++++++++++++++++++++++
 my/reddit/rexport.py | 24 +++++++++++++++++++-----
 2 files changed, 52 insertions(+), 5 deletions(-)
 create mode 100644 my/core/_cpu_pool.py

diff --git a/my/core/_cpu_pool.py b/my/core/_cpu_pool.py
new file mode 100644
index 0000000..5ac66de
--- /dev/null
+++ b/my/core/_cpu_pool.py
@@ -0,0 +1,33 @@
+"""
+EXPERIMENTAL! use with caution
+Manages a 'global' ProcessPoolExecutor which is 'managed' by HPI itself, and
+can be passed down to DALs to speed up data processing.
+
+The reason to have it managed by HPI is that we don't want DALs to instantiate pools
+themselves -- they can't cooperate, and it would be hard/infeasible to control
+how many cores we want to dedicate to the DAL.
+
+Enabled by the env variable specifying how many cores to dedicate,
+e.g. "HPI_CPU_POOL=4 hpi query ..."
+"""
+from concurrent.futures import ProcessPoolExecutor
+import os
+from typing import cast, Optional
+
+
+_NOT_SET = cast(ProcessPoolExecutor, object())
+_INSTANCE: Optional[ProcessPoolExecutor] = _NOT_SET
+
+
+def get_cpu_pool() -> Optional[ProcessPoolExecutor]:
+    global _INSTANCE
+    if _INSTANCE is _NOT_SET:
+        use_cpu_pool = os.environ.get('HPI_CPU_POOL')
+        if use_cpu_pool is None or int(use_cpu_pool) == 0:
+            _INSTANCE = None
+        else:
+            # NOTE: this won't be cleaned up properly, but I guess it's fine?
+            # since it's basically a singleton for the whole process
+            # and will be destroyed when python exits
+            _INSTANCE = ProcessPoolExecutor(max_workers=int(use_cpu_pool))
+    return _INSTANCE
diff --git a/my/reddit/rexport.py b/my/reddit/rexport.py
index 1f72133..f166ecd 100644
--- a/my/reddit/rexport.py
+++ b/my/reddit/rexport.py
@@ -8,6 +8,7 @@ REQUIRES = [
 ]
 
 from dataclasses import dataclass
+import inspect
 from pathlib import Path
 from typing import TYPE_CHECKING, Iterator, Sequence
 
@@ -45,14 +46,16 @@ def migration(attrs: Attrs) -> Attrs:
         ex: uconfig.rexport = attrs['rexport']
         attrs['export_path'] = ex.export_path
     else:
-        warnings.high("""DEPRECATED! Please modify your reddit config to look like:
+        warnings.high(
+            """DEPRECATED! Please modify your reddit config to look like:
 
 class reddit:
     class rexport:
         export_path: Paths = '/path/to/rexport/data'
-            """)
+            """
+        )
     export_dir = 'export_dir'
-    if export_dir in attrs: # legacy name
+    if export_dir in attrs:  # legacy name
         attrs['export_path'] = attrs[export_dir]
         warnings.high(f'"{export_dir}" is deprecated! Please use "export_path" instead."')
     return attrs
@@ -93,8 +96,19 @@ Upvote = dal.Upvote
 
 
 def _dal() -> dal.DAL:
-    inp = list(inputs())
-    return dal.DAL(inp)
+    sources = list(inputs())
+
+    ## backwards compatibility (old rexport DAL didn't have cpu_pool argument)
+    cpu_pool_arg = 'cpu_pool'
+    pass_cpu_pool = cpu_pool_arg in inspect.signature(dal.DAL).parameters
+    if pass_cpu_pool:
+        from my.core._cpu_pool import get_cpu_pool
+
+        kwargs = {cpu_pool_arg: get_cpu_pool()}
+    else:
+        kwargs = {}
+    ##
+    return dal.DAL(sources, **kwargs)
 
 
 cache = mcachew(depends_on=inputs)
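
For context on what the new argument enables downstream, here is a minimal sketch of how a DAL could accept the HPI-managed pool and degrade gracefully when none is configured. This is not rexport's actual DAL: the _parse_file helper, the raw() method and the data shape are hypothetical, and the only assumption made about the pool object is the standard concurrent.futures Executor.map interface.

# Hypothetical DAL-side sketch, showing consumption of the optional cpu_pool
# argument that HPI passes down via get_cpu_pool().
from concurrent.futures import ProcessPoolExecutor
import json
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence


def _parse_file(path: Path) -> List[Dict[str, Any]]:
    # CPU-bound work (json parsing) that benefits from running in a separate process
    return json.loads(path.read_text())


class DAL:
    def __init__(self, sources: Sequence[Path], cpu_pool: Optional[ProcessPoolExecutor] = None) -> None:
        self.sources = list(sources)
        self.cpu_pool = cpu_pool

    def raw(self) -> Iterator[Dict[str, Any]]:
        if self.cpu_pool is not None:
            # parse the export files in parallel on the HPI-managed pool
            for parsed in self.cpu_pool.map(_parse_file, self.sources):
                yield from parsed
        else:
            # no pool configured (HPI_CPU_POOL unset or 0): parse sequentially
            for path in self.sources:
                yield from _parse_file(path)

Combined with the inspect.signature check in _dal(), this keeps both directions backwards compatible: an old rexport DAL without the cpu_pool parameter is never passed the argument, and a new DAL instantiated outside HPI simply receives cpu_pool=None.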
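
On the "won't be cleaned up properly" note in _cpu_pool.py: if explicit cleanup ever turned out to matter, one option (a sketch only, not part of the patch) would be to register a shutdown hook via the standard atexit module:

import atexit
from concurrent.futures import ProcessPoolExecutor

_pool = ProcessPoolExecutor(max_workers=4)
# join the worker processes before the interpreter exits;
# wait=True blocks until queued work has finished
atexit.register(_pool.shutdown, wait=True)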