From 5933711888dffd3ddc1b2f5ab24e7e25d74fe5f6 Mon Sep 17 00:00:00 2001 From: Sean Breckenridge Date: Thu, 28 Oct 2021 11:28:00 -0700 Subject: [PATCH] initial pushshift/rexport merge implementation --- my/config.py | 7 ++- my/core/cfg.py | 2 +- my/core/source.py | 26 +++++++++ my/reddit/__init__.py | 14 +++++ my/reddit/all.py | 51 ++++++++++++++++++ my/reddit/common.py | 83 +++++++++++++++++++++++++++++ my/reddit/pushshift.py | 45 ++++++++++++++++ my/{reddit.py => reddit/rexport.py} | 51 +++++++++++------- 8 files changed, 259 insertions(+), 20 deletions(-) create mode 100644 my/core/source.py create mode 100644 my/reddit/__init__.py create mode 100644 my/reddit/all.py create mode 100644 my/reddit/common.py create mode 100644 my/reddit/pushshift.py rename my/{reddit.py => reddit/rexport.py} (85%) diff --git a/my/config.py b/my/config.py index 63d962c..4add9e9 100644 --- a/my/config.py +++ b/my/config.py @@ -34,7 +34,12 @@ class github: export_path: Paths = '' class reddit: - export_path: Paths = '' + class rexport: + export_path: Paths = '' + class pushshift: + export_path: Paths = '' + class gdpr: + export_path: Paths = '' class endomondo: export_path: Paths = '' diff --git a/my/core/cfg.py b/my/core/cfg.py index b23fa86..4b5cbed 100644 --- a/my/core/cfg.py +++ b/my/core/cfg.py @@ -10,7 +10,7 @@ C = TypeVar('C') def make_config(cls: Type[C], migration: Callable[[Attrs], Attrs]=lambda x: x) -> C: user_config = cls.__base__ old_props = { - # NOTE: deliberately use gettatr to 'force' lcass properties here + # NOTE: deliberately use getattr to 'force' class properties here k: getattr(user_config, k) for k in vars(user_config) } new_props = migration(old_props) diff --git a/my/core/source.py b/my/core/source.py new file mode 100644 index 0000000..753c5aa --- /dev/null +++ b/my/core/source.py @@ -0,0 +1,26 @@ +from typing import Any, Iterator, TypeVar, Callable, Optional, Iterable +from my.core.warnings import warn + +T = TypeVar("T") + +# this is probably more generic 
and results in less code, but is not mypy-friendly +def import_source(factory: Callable[[], Any], default: Any) -> Any: + try: + res = factory() + return res + except ImportError: # presumably means the user hasn't installed the module + warn(f"Module {factory.__name__} could not be imported, or isn't configured propertly") + return default + + +# For an example of this, see the reddit.all file +def import_source_iter(factory: Callable[[], Iterator[T]], default: Optional[Iterable[T]] = None) -> Iterator[T]: + if default is None: + default = [] + try: + res = factory() + yield from res + except ImportError: # presumably means the user hasn't installed the module + warn(f"Module {factory.__name__} could not be imported, or isn't configured propertly") + yield from default + diff --git a/my/reddit/__init__.py b/my/reddit/__init__.py new file mode 100644 index 0000000..7b05d51 --- /dev/null +++ b/my/reddit/__init__.py @@ -0,0 +1,14 @@ +""" +This is here temporarily, for backwards compatibility purposes +It should be removed in the future, and you should replace any imports +like: +from my.reddit import ... +to: +from my.reddit.all import ... +since that allows for easier overriding using namespace packages +https://github.com/karlicoss/HPI/issues/102 +""" + +# TODO: add warning here + +from .rexport import * diff --git a/my/reddit/all.py b/my/reddit/all.py new file mode 100644 index 0000000..26a2168 --- /dev/null +++ b/my/reddit/all.py @@ -0,0 +1,51 @@ +from typing import Iterator, Any, Callable, TypeVar +from my.core.source import import_source_iter as imp + +from .common import Save, Upvote, Comment, Submission, _merge_comments + +# Man... ideally an all.py file isn't this verbose, but +# reddit just feels like that much of a complicated source and +# data acquired by different methods isn't the same + +### import helpers + +# this import error is caught in import_source_iter, if rexport isn't installed +def _rexport_import() -> Any: + from . 
import rexport as source + return source + +def _rexport_comments() -> Iterator[Comment]: + yield from imp(_rexport_import().comments) + +def _pushshift_import() -> Any: + from . import pushshift as source + return source + +def _pushshift_comments() -> Iterator[Comment]: + yield from imp(_pushshift_import().comments) + +# Merged functions + +def comments() -> Iterator[Comment]: + # TODO: merge gdpr here + yield from _merge_comments(_rexport_comments(), _pushshift_comments()) + +def submissions() -> Iterator[Submission]: + # TODO: merge gdpr here + yield from imp(lambda: _rexport_import().submissions()) + +def saved() -> Iterator[Save]: + yield from imp(lambda: _rexport_import().saved()) + +def upvoted() -> Iterator[Upvote]: + yield from imp(lambda: _rexport_import().upvoted()) + +def stats() -> Stats: + from my.core import stat + return { + **stat(saved), + **stat(comments), + **stat(submissions), + **stat(upvoted), + } + diff --git a/my/reddit/common.py b/my/reddit/common.py new file mode 100644 index 0000000..767d3a1 --- /dev/null +++ b/my/reddit/common.py @@ -0,0 +1,83 @@ +""" +This defines Protocol classes, which make sure that each different +type of Comment/Save have a standard interface +""" + +from typing import Dict, Any, Set, Iterator +from itertools import chain +from datetime import datetime + +Json = Dict[str, Any] + +try: + from typing import Protocol +except ImportError: + # hmm -- does this need to be installed on 3.6 or is it already here? + from typing_extensions import Protocol # type: ignore[misc] + +# Note: doesn't include GDPR Save's since they don't have the same metadata +class Save(Protocol): + created: datetime + title: str + raw: Json + + @property + def sid(self) -> str: ... + @property + def url(self) -> str: ... + @property + def text(self) -> str: ... + @property + def subreddit(self) -> str: ... 
+ +# Note: doesn't include GDPR Upvote's since they don't have the same metadata +class Upvote(Protocol): + raw: Json + @property + def created(self) -> datetime: ... + @property + def url(self) -> str: ... + @property + def text(self) -> str: ... + @property + def title(self) -> str: ... + + +# From rexport, pushshift and the reddit gdpr export +class Comment(Protocol): + raw: Json + @property + def created(self) -> datetime: ... + @property + def url(self) -> str: ... + @property + def text(self) -> str: ... + + +# From rexport and the gdpr export +class Submission(Protocol): + raw: Json + @property + def created(self) -> datetime: ... + @property + def url(self) -> str: ... + @property + def text(self) -> str: ... + @property + def title(self) -> str: ... + + +def _merge_comments(*sources: Iterator[Comment]) -> Iterator[Comment]: + #from .rexport import logger + #ignored = 0 + emitted: Set[int] = set() + for e in chain(*sources): + key = int(e.raw["created_utc"]) + if key in emitted: + #ignored += 1 + #logger.info('ignoring %s: %s', key, e) + continue + yield e + emitted.add(key) + #logger.info(f"Ignored {ignored} comments...") + diff --git a/my/reddit/pushshift.py b/my/reddit/pushshift.py new file mode 100644 index 0000000..df1fd1e --- /dev/null +++ b/my/reddit/pushshift.py @@ -0,0 +1,45 @@ +""" +Gives you access to older comments possibly not accessible with rexport +using pushshift +See https://github.com/seanbreckenridge/pushshift_comment_export +""" + +REQUIRES = [ + "git+https://github.com/seanbreckenridge/pushshift_comment_export", +] + +from my.core.common import Paths, Stats +from dataclasses import dataclass +from my.core.cfg import make_config + +from my.config import reddit as uconfig + +@dataclass +class pushshift_config(uconfig.pushshift): + export_path: Paths + +config = make_config(pushshift_config) + +from my.core import get_files +from typing import Sequence, Iterator +from pathlib import Path +from .common import Comment + +from 
pushshift_comment_export.dal import read_file, PComment + + +def inputs() -> Sequence[Path]: + return get_files(config.export_path) + + +def comments() -> Iterator[PComment]: + for f in inputs(): + yield from read_file(f) + +def stats() -> Stats: + from my.core import stat + return { + **stat(comments) + } + + diff --git a/my/reddit.py b/my/reddit/rexport.py similarity index 85% rename from my/reddit.py rename to my/reddit/rexport.py index bbafe92..32b1f6f 100755 --- a/my/reddit.py +++ b/my/reddit/rexport.py @@ -5,10 +5,31 @@ REQUIRES = [ 'git+https://github.com/karlicoss/rexport', ] -from .core.common import Paths +from my.core.common import Paths +from dataclasses import dataclass from my.config import reddit as uconfig -from dataclasses import dataclass + + +# hmm -- since this was previously just using +# uconfig, we can't have this inherit from +# uconfig.rexport in the dataclass definition here +# since then there's no way to get old attributes +# in the migration + +# need to check before we subclass +if hasattr(uconfig, "rexport"): + # sigh... backwards compatibility + uconfig = uconfig.rexport # type: ignore[attr-defined,misc,assignment] +else: + from my.core.warnings import high + high(f"""DEPRECATED! Please modify your reddit config to look like: + +class reddit: + class rexport: + export_path: Paths = '/path/to/rexport/data' + """) + @dataclass class reddit(uconfig): @@ -20,15 +41,16 @@ class reddit(uconfig): export_path: Paths -from .core.cfg import make_config, Attrs +from my.core.cfg import make_config, Attrs # hmm, also nice thing about this is that migration is possible to test without the rest of the config? def migration(attrs: Attrs) -> Attrs: export_dir = 'export_dir' if export_dir in attrs: # legacy name attrs['export_path'] = attrs[export_dir] - from .core.warnings import high + from my.core.warnings import high high(f'"{export_dir}" is deprecated! 
Please use "export_path" instead."') return attrs + config = make_config(reddit, migration=migration) ### @@ -37,7 +59,7 @@ config = make_config(reddit, migration=migration) try: from rexport import dal except ModuleNotFoundError as e: - from .core.compat import pre_pip_dal_handler + from my.core.compat import pre_pip_dal_handler dal = pre_pip_dal_handler('rexport', e, config, requires=REQUIRES) # TODO ugh. this would import too early # but on the other hand we do want to bring the objects into the scope for easier imports, etc. ugh! @@ -47,8 +69,8 @@ except ModuleNotFoundError as e: ############################ -from typing import List, Sequence, Mapping, Iterator -from .core.common import mcachew, get_files, LazyLogger, make_dict +from typing import List, Sequence, Mapping, Iterator, Any +from my.core.common import mcachew, get_files, LazyLogger, make_dict, Stats logger = LazyLogger(__name__, level='debug') @@ -59,7 +81,7 @@ def inputs() -> Sequence[Path]: return get_files(config.export_path) -Sid = dal.Sid +Sid = dal.Sid # str Save = dal.Save Comment = dal.Comment Submission = dal.Submission @@ -69,7 +91,7 @@ Upvote = dal.Upvote def _dal() -> dal.DAL: inp = list(inputs()) return dal.DAL(inp) -cache = mcachew(hashf=inputs) # depends on inputs only +cache = mcachew(depends_on=inputs) # depends on inputs only @cache @@ -213,8 +235,8 @@ def events(*args, **kwargs) -> List[Event]: return list(sorted(evit, key=lambda e: e.cmp_key)) # type: ignore[attr-defined,arg-type] -def stats(): - from .core import stat +def stats() -> Stats: + from my.core import stat return { **stat(saved ), **stat(comments ), @@ -223,9 +245,6 @@ def stats(): } -## - - def main() -> None: for e in events(parallel=False): print(e) @@ -234,7 +253,3 @@ def main() -> None: if __name__ == '__main__': main() -# TODO deprecate... - -get_sources = inputs -get_events = events