my.reddit: refactor into module that supports pushshift/gdpr (#179)

* initial pushshift/rexport merge implementation, using id for merging
* smarter module deprecation warning using regex
* add `RedditBase` from promnesia
* `import_source` helper for gracefully handling mixin data sources
Sean Breckenridge 2021-10-31 13:39:04 -07:00 committed by GitHub
parent b54ec0d7f1
commit 8422c6e420
15 changed files with 374 additions and 58 deletions

@@ -34,7 +34,12 @@ class github:
export_path: Paths = ''
class reddit:
export_path: Paths = ''
class rexport:
export_path: Paths = ''
class pushshift:
export_path: Paths = ''
class gdpr:
export_path: Paths = ''
class endomondo:
export_path: Paths = ''
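With this, per-source config lives in nested classes under reddit. A hypothetical user config under the new layout (the paths here are made up):

class reddit:
    class rexport:
        export_path: Paths = '/backups/rexport/*.json'
    class pushshift:
        export_path: Paths = '/backups/pushshift/*.json'
    class gdpr:
        export_path: Paths = '/backups/reddit_gdpr/*.zip'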

@@ -10,7 +10,7 @@ C = TypeVar('C')
def make_config(cls: Type[C], migration: Callable[[Attrs], Attrs]=lambda x: x) -> C:
user_config = cls.__base__
old_props = {
# NOTE: deliberately use gettatr to 'force' lcass properties here
# NOTE: deliberately use getattr to 'force' class properties here
k: getattr(user_config, k) for k in vars(user_config)
}
new_props = migration(old_props)

my/core/source.py Normal file (63 lines)

@@ -0,0 +1,63 @@
"""
Decorator to gracefully handle importing a data source, or warning
and yielding nothing (or a default) when it's not available
"""
from typing import Any, Iterator, TypeVar, Callable, Optional, Iterable
from my.core.warnings import warn
from functools import wraps
# The factory function may produce something that has data
# similar to the shared model, but not exactly, so this is
# Any instead of a TypeVar, just to make the type signature
# below a bit easier to read...
T = Any
# https://mypy.readthedocs.io/en/latest/generics.html?highlight=decorators#decorator-factories
FactoryF = TypeVar("FactoryF", bound=Callable[..., Iterator[T]])
_DEFAULT_ITR = ()
# tried to use decorator module but it really doesn't work well
# with types and kw-arguments... :/
def import_source(
default: Iterable[T] = _DEFAULT_ITR,
module_name: Optional[str] = None,
) -> Callable[..., Callable[..., Iterator[T]]]:
"""
doesn't really play well with types, but is used to catch
ModuleNotFoundError's for when modules aren't installed in
all.py files, so the types don't particularly matter
this is meant to be used to wrap some function which imports
and then yields an iterator of objects
If the user doesn't have that module installed, it returns
nothing and warns instead
"""
def decorator(factory_func: FactoryF) -> Callable[..., Iterator[T]]:
@wraps(factory_func)
def wrapper(*args, **kwargs) -> Iterator[T]:
try:
res = factory_func(*args, **kwargs)
yield from res
except ModuleNotFoundError:
from . import core_config as CC
suppressed_in_conf = False
if module_name is not None and CC.config._is_module_active(module_name) is False:
suppressed_in_conf = True
if not suppressed_in_conf:
if module_name is None:
warn(f"Module {factory_func.__qualname__} could not be imported, or isn't configured propertly")
else:
warn(f"""Module {module_name} ({factory_func.__qualname__}) could not be imported, or isn't configured propertly\nTo hide this message, add {module_name} to your core config disabled_classes, like:
class core:
disabled_modules = [{repr(module_name)}]
""")
yield from default
return wrapper
return decorator
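A minimal usage sketch, with a made-up module and function name: the decorated function imports an optional source and yields from it, and if the import raises ModuleNotFoundError the wrapper warns (unless the module is disabled in the core config) and yields the default instead:

from typing import Any, Iterator
from my.core.source import import_source

@import_source(module_name="my.somesource")
def events() -> Iterator[Any]:
    # hypothetical module; importing it may raise ModuleNotFoundError
    from my import somesource
    yield from somesource.events()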

my/reddit/__init__.py Normal file (41 lines)

@@ -0,0 +1,41 @@
"""
This is here temporarily, for backwards compatibility purposes
It should be removed in the future, and you should replace any imports
like:
from my.reddit import ...
to:
from my.reddit.all import ...
since that allows for easier overriding using namespace packages
https://github.com/karlicoss/HPI/issues/102
"""
# For now, including this here, since importing the module
# causes .rexport to be imported, which requires rexport
REQUIRES = [
'git+https://github.com/karlicoss/rexport',
]
import re
import traceback
# some hacky traceback inspection of the current stack
# to see if the user is using the old style of importing
warn = False
for f in traceback.extract_stack():
line = f.line or '' # just in case it's None, who knows..
# cover the most common ways of previously interacting with the module
if 'import my.reddit ' in (line + ' '):
warn = True
elif 'from my import reddit' in line:
warn = True
elif re.match(r"from my\.reddit\simport\s(comments|saved|submissions|upvoted)", line):
warn = True
# TODO: add link to instructions to migrate
if warn:
from my.core import warnings as W
W.high("DEPRECATED! Instead of my.reddit, import from my.reddit.all instead.")
from .rexport import *
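To be explicit about what this catches, the deprecated and preferred import styles:

# deprecated, hits this shim and warns:
from my.reddit import saved
# preferred, can be overridden using namespace packages:
from my.reddit.all import saved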

my/reddit/all.py Normal file (68 lines)

@@ -0,0 +1,68 @@
from typing import Iterator
from my.core.common import Stats
from my.core.source import import_source
from .common import Save, Upvote, Comment, Submission, _merge_comments
# Man... ideally an all.py file isn't this verbose, but
# reddit is just that complicated a source, and
# data acquired by different methods isn't the same
### 'safe importers' -- falls back to empty data if the module couldn't be found
rexport_src = import_source(module_name="my.reddit.rexport")
pushshift_src = import_source(module_name="my.reddit.pushshift")
@rexport_src
def _rexport_comments() -> Iterator[Comment]:
from . import rexport
yield from rexport.comments()
@rexport_src
def _rexport_submissions() -> Iterator[Submission]:
from . import rexport
yield from rexport.submissions()
@rexport_src
def _rexport_saved() -> Iterator[Save]:
from . import rexport
yield from rexport.saved()
@rexport_src
def _rexport_upvoted() -> Iterator[Upvote]:
from . import rexport
yield from rexport.upvoted()
@pushshift_src
def _pushshift_comments() -> Iterator[Comment]:
from .pushshift import comments as pcomments
yield from pcomments()
# Merged functions
def comments() -> Iterator[Comment]:
# TODO: merge gdpr here
yield from _merge_comments(_rexport_comments(), _pushshift_comments())
def submissions() -> Iterator[Submission]:
# TODO: merge gdpr here
yield from _rexport_submissions()
@rexport_src
def saved() -> Iterator[Save]:
from .rexport import saved
yield from saved()
@rexport_src
def upvoted() -> Iterator[Upvote]:
from .rexport import upvoted
yield from upvoted()
def stats() -> Stats:
from my.core import stat
return {
**stat(saved),
**stat(comments),
**stat(submissions),
**stat(upvoted),
}
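Callers can then consume the merged streams without caring which source an item came from; a quick sketch:

from my.reddit.all import comments
# yields Comment objects from rexport and pushshift, deduplicated by id;
# sources that aren't installed or configured are skipped with a warning
for c in comments():
    print(c.id, c.created)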

my/reddit/common.py Normal file (72 lines)

@@ -0,0 +1,72 @@
"""
This defines Protocol classes, which make sure that each different
type of shared model has a standardized interface
"""
from typing import Dict, Any, Set, Iterator, TYPE_CHECKING
from itertools import chain
from my.core.common import datetime_aware
Json = Dict[str, Any]
if TYPE_CHECKING:
try:
from typing import Protocol
except ImportError:
# requirement of mypy
from typing_extensions import Protocol # type: ignore[misc]
else:
Protocol = object
# common fields across all the Protocol classes, so generic code can be written
class RedditBase(Protocol):
@property
def raw(self) -> Json: ...
@property
def created(self) -> datetime_aware: ...
@property
def id(self) -> str: ...
@property
def url(self) -> str: ...
@property
def text(self) -> str: ...
# Note: doesn't include GDPR Save's since they don't have the same metadata
class Save(RedditBase, Protocol):
@property
def subreddit(self) -> str: ...
# Note: doesn't include GDPR Upvote's since they don't have the same metadata
class Upvote(RedditBase, Protocol):
@property
def title(self) -> str: ...
# From rexport, pushshift and the reddit GDPR export
class Comment(RedditBase, Protocol):
pass
# From rexport and the GDPR export
class Submission(RedditBase, Protocol):
@property
def title(self) -> str: ...
def _merge_comments(*sources: Iterator[Comment]) -> Iterator[Comment]:
#from .rexport import logger
#ignored = 0
emitted: Set[str] = set()
for e in chain(*sources):
uid = e.id
if uid in emitted:
#ignored += 1
#logger.info('ignoring %s: %s', uid, e)
continue
yield e
emitted.add(uid)
#logger.info(f"Ignored {ignored} comments...")

my/reddit/pushshift.py Normal file (48 lines)

@@ -0,0 +1,48 @@
"""
Gives you access to older comments, possibly not accessible with rexport,
using pushshift
See https://github.com/seanbreckenridge/pushshift_comment_export
"""
REQUIRES = [
"git+https://github.com/seanbreckenridge/pushshift_comment_export",
]
from my.core.common import Paths, Stats
from dataclasses import dataclass
from my.core.cfg import make_config
from my.config import reddit as uconfig
@dataclass
class pushshift_config(uconfig.pushshift):
'''
Uses [[https://github.com/seanbreckenridge/pushshift_comment_export][pushshift]] to get access to old comments
'''
# path[s]/glob to the exported JSON data
export_path: Paths
config = make_config(pushshift_config)
from my.core import get_files
from typing import Sequence, Iterator
from pathlib import Path
from pushshift_comment_export.dal import read_file, PComment
def inputs() -> Sequence[Path]:
return get_files(config.export_path)
def comments() -> Iterator[PComment]:
for f in inputs():
yield from read_file(f)
def stats() -> Stats:
from my.core import stat
return {
**stat(comments)
}
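Once export_path is set, a quick way to sanity-check the module (a sketch):

from my.reddit.pushshift import inputs, comments
print(inputs())          # the files matched by get_files(config.export_path)
print(next(comments()))  # a PComment parsed by pushshift_comment_export.dal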

@@ -5,10 +5,12 @@ REQUIRES = [
'git+https://github.com/karlicoss/rexport',
]
from .core.common import Paths
from my.core.common import Paths
from dataclasses import dataclass
from typing import Any
from my.config import reddit as uconfig
from dataclasses import dataclass
@dataclass
class reddit(uconfig):
@@ -20,15 +22,27 @@ class reddit(uconfig):
export_path: Paths
from .core.cfg import make_config, Attrs
from my.core.cfg import make_config, Attrs
# hmm, also nice thing about this is that migration is possible to test without the rest of the config?
def migration(attrs: Attrs) -> Attrs:
export_dir = 'export_dir'
if export_dir in attrs: # legacy name
attrs['export_path'] = attrs[export_dir]
from .core.warnings import high
high(f'"{export_dir}" is deprecated! Please use "export_path" instead."')
# new structure, take top-level config and extract 'rexport' class
if 'rexport' in attrs:
ex: uconfig.rexport = attrs['rexport']
attrs['export_path'] = ex.export_path
else:
from my.core.warnings import high
high("""DEPRECATED! Please modify your reddit config to look like:
class reddit:
class rexport:
export_path: Paths = '/path/to/rexport/data'
""")
export_dir = 'export_dir'
if export_dir in attrs: # legacy name
attrs['export_path'] = attrs[export_dir]
high(f'"{export_dir}" is deprecated! Please use "export_path" instead."')
return attrs
config = make_config(reddit, migration=migration)
###
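So the migration accepts both shapes of user config, the old flat style (with a deprecation warning) and the new nested one; hypothetical examples:

# old, flat style; still works, but warns:
class reddit:
    export_path: Paths = '/backups/rexport/*.json'

# new, nested style:
class reddit:
    class rexport:
        export_path: Paths = '/backups/rexport/*.json'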
@@ -37,7 +51,7 @@ config = make_config(reddit, migration=migration)
try:
from rexport import dal
except ModuleNotFoundError as e:
from .core.compat import pre_pip_dal_handler
from my.core.compat import pre_pip_dal_handler
dal = pre_pip_dal_handler('rexport', e, config, requires=REQUIRES)
# TODO ugh. this would import too early
# but on the other hand we do want to bring the objects into the scope for easier imports, etc. ugh!
@@ -47,8 +61,8 @@ except ModuleNotFoundError as e:
############################
from typing import List, Sequence, Mapping, Iterator
from .core.common import mcachew, get_files, LazyLogger, make_dict
from typing import List, Sequence, Mapping, Iterator, Any
from my.core.common import mcachew, get_files, LazyLogger, make_dict, Stats
logger = LazyLogger(__name__, level='debug')
@@ -59,7 +73,7 @@ def inputs() -> Sequence[Path]:
return get_files(config.export_path)
Sid = dal.Sid
Uid = dal.Sid # str
Save = dal.Save
Comment = dal.Comment
Submission = dal.Submission
@@ -69,7 +83,7 @@ Upvote = dal.Upvote
def _dal() -> dal.DAL:
inp = list(inputs())
return dal.DAL(inp)
cache = mcachew(hashf=inputs) # depends on inputs only
cache = mcachew(depends_on=inputs) # depends on inputs only
@cache
@@ -139,7 +153,7 @@ def _get_bdate(bfile: Path) -> datetime:
return bdt
def _get_state(bfile: Path) -> Dict[Sid, SaveWithDt]:
def _get_state(bfile: Path) -> Dict[Uid, SaveWithDt]:
logger.debug('handling %s', bfile)
bdt = _get_bdate(bfile)
@@ -156,11 +170,11 @@ def _get_state(bfile: Path) -> Dict[Sid, SaveWithDt]:
def _get_events(backups: Sequence[Path], parallel: bool=True) -> Iterator[Event]:
# todo cachew: let it transform return type? so you don't have to write a wrapper for lists?
prev_saves: Mapping[Sid, SaveWithDt] = {}
prev_saves: Mapping[Uid, SaveWithDt] = {}
# TODO suppress first batch??
# TODO for initial batch, treat event time as creation time
states: Iterable[Mapping[Sid, SaveWithDt]]
states: Iterable[Mapping[Uid, SaveWithDt]]
if parallel:
with Pool() as p:
states = p.map(_get_state, backups)
@@ -213,8 +227,8 @@ def events(*args, **kwargs) -> List[Event]:
return list(sorted(evit, key=lambda e: e.cmp_key)) # type: ignore[attr-defined,arg-type]
def stats():
from .core import stat
def stats() -> Stats:
from my.core import stat
return {
**stat(saved ),
**stat(comments ),
@@ -223,9 +237,6 @@ def stats():
}
##
def main() -> None:
for e in events(parallel=False):
print(e)
@@ -234,7 +245,3 @@ def main() -> None:
if __name__ == '__main__':
main()
# TODO deprecate...
get_sources = inputs
get_events = events