initial pushshift/rexport merge implementation

This commit is contained in:
Sean Breckenridge 2021-10-28 11:28:00 -07:00
parent b54ec0d7f1
commit 5933711888
8 changed files with 259 additions and 20 deletions

14
my/reddit/__init__.py Normal file
View file

@ -0,0 +1,14 @@
"""
This is here temporarily, for backwards compatability purposes
It should be removed in the future, and you should replace any imports
like:
from my.reddit import ...
to:
from my.reddit.all import ...
since that allows for easier overriding using namespace packages
https://github.com/karlicoss/HPI/issues/102
"""
# TODO: add warning here
from .rexport import *

51
my/reddit/all.py Normal file
View file

@ -0,0 +1,51 @@
from typing import Iterator, Any, Callable, TypeVar
from my.core.source import import_source_iter as imp
from .common import Save, Upvote, Comment, Submission, _merge_comments
# Man... ideally an all.py file isn't this verbose, but
# reddit just feels like that much of a complicated source and
# data acquired by different methods isn't the same
### import helpers
# this import error is caught in import_source_iter, if rexport isn't installed
def _rexport_import() -> Any:
from . import rexport as source
return source
def _rexport_comments() -> Iterator[Comment]:
yield from imp(_rexport_import().comments)
def _pushshift_import() -> Any:
from . import pushshift as source
return source
def _pushshift_comments() -> Iterator[Comment]:
yield from imp(_pushshift_import().comments)
# Merged functions
def comments() -> Iterator[Comment]:
# TODO: merge gdpr here
yield from _merge_comments(_rexport_comments(), _pushshift_comments())
def submissions() -> Iterator[Submission]:
# TODO: merge gdpr here
yield from imp(lambda: _rexport_import().submissions())
def saved() -> Iterator[Save]:
yield from imp(lambda: _rexport_import().saved())
def upvoted() -> Iterator[Upvote]:
yield from imp(lambda: _rexport_import().upvoted())
def stats() -> Stats:
from my.core import stat
return {
**stat(saved),
**stat(comments),
**stat(submissions),
**stat(upvoted),
}

83
my/reddit/common.py Normal file
View file

@ -0,0 +1,83 @@
"""
This defines Protocol classes, which make sure that each different
type of Comment/Save have a standard interface
"""
from typing import Dict, Any, Set, Iterator
from itertools import chain
from datetime import datetime
Json = Dict[str, Any]
try:
from typing import Protocol
except ImportError:
# hmm -- does this need to be installed on 3.6 or is it already here?
from typing_extensions import Protocol # type: ignore[misc]
# Note: doesn't include GDPR Save's since they don't have the same metadata
class Save(Protocol):
created: datetime
title: str
raw: Json
@property
def sid(self) -> str: ...
@property
def url(self) -> str: ...
@property
def text(self) -> str: ...
@property
def subreddit(self) -> str: ...
# Note: doesn't include GDPR Upvote's since they don't have the same metadata
class Upvote(Protocol):
raw: Json
@property
def created(self) -> datetime: ...
@property
def url(self) -> str: ...
@property
def text(self) -> str: ...
@property
def title(self) -> str: ...
# From rexport, pushshift and the reddit gdpr export
class Comment(Protocol):
raw: Json
@property
def created(self) -> datetime: ...
@property
def url(self) -> str: ...
@property
def text(self) -> str: ...
# From rexport and the gdpr export
class Submission(Protocol):
raw: Json
@property
def created(self) -> datetime: ...
@property
def url(self) -> str: ...
@property
def text(self) -> str: ...
@property
def title(self) -> str: ...
def _merge_comments(*sources: Iterator[Comment]) -> Iterator[Comment]:
#from .rexport import logger
#ignored = 0
emitted: Set[int] = set()
for e in chain(*sources):
key = int(e.raw["created_utc"])
if key in emitted:
#ignored += 1
#logger.info('ignoring %s: %s', key, e)
continue
yield e
emitted.add(key)
#logger.info(f"Ignored {ignored} comments...")

45
my/reddit/pushshift.py Normal file
View file

@ -0,0 +1,45 @@
"""
Gives you access to older comments possibly not accessible with rexport
using pushshift
See https://github.com/seanbreckenridge/pushshift_comment_export
"""
REQUIRES = [
"git+https://github.com/seanbreckenridge/pushshift_comment_export",
]
from my.core.common import Paths, Stats
from dataclasses import dataclass
from my.core.cfg import make_config
from my.config import reddit as uconfig
@dataclass
class pushshift_config(uconfig.pushshift):
export_path: Paths
config = make_config(pushshift_config)
from my.core import get_files
from typing import Sequence, Iterator
from pathlib import Path
from .common import Comment
from pushshift_comment_export.dal import read_file, PComment
def inputs() -> Sequence[Path]:
return get_files(config.export_path)
def comments() -> Iterator[PComment]:
for f in inputs():
yield from read_file(f)
def stats() -> Stats:
from my.core import stat
return {
**stat(comments)
}

255
my/reddit/rexport.py Executable file
View file

@ -0,0 +1,255 @@
"""
Reddit data: saved items/comments/upvotes/etc.
"""
REQUIRES = [
'git+https://github.com/karlicoss/rexport',
]
from my.core.common import Paths
from dataclasses import dataclass
from my.config import reddit as uconfig
# hmm -- since this was previously just using
# uconfig, we can't have this inherit from
# uconfig.rexport in the dataclass definition here
# since then theres no way to get old attributes
# in the migration
# need to check before we subclass
if hasattr(uconfig, "rexport"):
# sigh... backwards compatability
uconfig = uconfig.rexport # type: ignore[attr-defined,misc,assignment]
else:
from my.core.warnings import high
high(f"""DEPRECATED! Please modify your reddit config to look like:
class reddit:
class rexport:
export_path: Paths = '/path/to/rexport/data'
""")
@dataclass
class reddit(uconfig):
'''
Uses [[https://github.com/karlicoss/rexport][rexport]] output.
'''
# path[s]/glob to the exported JSON data
export_path: Paths
from my.core.cfg import make_config, Attrs
# hmm, also nice thing about this is that migration is possible to test without the rest of the config?
def migration(attrs: Attrs) -> Attrs:
export_dir = 'export_dir'
if export_dir in attrs: # legacy name
attrs['export_path'] = attrs[export_dir]
from my.core.warnings import high
high(f'"{export_dir}" is deprecated! Please use "export_path" instead."')
return attrs
config = make_config(reddit, migration=migration)
###
# TODO not sure about the laziness...
try:
from rexport import dal
except ModuleNotFoundError as e:
from my.core.compat import pre_pip_dal_handler
dal = pre_pip_dal_handler('rexport', e, config, requires=REQUIRES)
# TODO ugh. this would import too early
# but on the other hand we do want to bring the objects into the scope for easier imports, etc. ugh!
# ok, fair enough I suppose. It makes sense to configure something before using it. can always figure it out later..
# maybe, the config could dynamically detect change and reimport itself? dunno.
###
############################
from typing import List, Sequence, Mapping, Iterator, Any
from my.core.common import mcachew, get_files, LazyLogger, make_dict, Stats
logger = LazyLogger(__name__, level='debug')
from pathlib import Path
def inputs() -> Sequence[Path]:
return get_files(config.export_path)
Sid = dal.Sid # str
Save = dal.Save
Comment = dal.Comment
Submission = dal.Submission
Upvote = dal.Upvote
def _dal() -> dal.DAL:
inp = list(inputs())
return dal.DAL(inp)
cache = mcachew(depends_on=inputs) # depends on inputs only
@cache
def saved() -> Iterator[Save]:
return _dal().saved()
@cache
def comments() -> Iterator[Comment]:
return _dal().comments()
@cache
def submissions() -> Iterator[Submission]:
return _dal().submissions()
@cache
def upvoted() -> Iterator[Upvote]:
return _dal().upvoted()
### the rest of the file is some elaborate attempt of restoring favorite/unfavorite times
from typing import Dict, Iterable, Iterator, NamedTuple
from functools import lru_cache
import pytz
import re
from datetime import datetime
from multiprocessing import Pool
# TODO hmm. apparently decompressing takes quite a bit of time...
class SaveWithDt(NamedTuple):
save: Save
backup_dt: datetime
def __getattr__(self, x):
return getattr(self.save, x)
# TODO for future events?
EventKind = SaveWithDt
class Event(NamedTuple):
dt: datetime
text: str
kind: EventKind
eid: str
title: str
url: str
@property
def cmp_key(self):
return (self.dt, (1 if 'unfavorited' in self.text else 0))
Url = str
def _get_bdate(bfile: Path) -> datetime:
RE = re.compile(r'reddit.(\d{14})')
stem = bfile.stem
stem = stem.replace('T', '').replace('Z', '') # adapt for arctee
match = RE.search(stem)
assert match is not None
bdt = pytz.utc.localize(datetime.strptime(match.group(1), "%Y%m%d%H%M%S"))
return bdt
def _get_state(bfile: Path) -> Dict[Sid, SaveWithDt]:
logger.debug('handling %s', bfile)
bdt = _get_bdate(bfile)
saves = [SaveWithDt(save, bdt) for save in dal.DAL([bfile]).saved()]
return make_dict(
sorted(saves, key=lambda p: p.save.created),
key=lambda s: s.save.sid,
)
# TODO hmm. think about it.. if we set default backups=inputs()
# it's called early so it ends up as a global variable that we can't monkey patch easily
@mcachew
def _get_events(backups: Sequence[Path], parallel: bool=True) -> Iterator[Event]:
# todo cachew: let it transform return type? so you don't have to write a wrapper for lists?
prev_saves: Mapping[Sid, SaveWithDt] = {}
# TODO suppress first batch??
# TODO for initial batch, treat event time as creation time
states: Iterable[Mapping[Sid, SaveWithDt]]
if parallel:
with Pool() as p:
states = p.map(_get_state, backups)
else:
# also make it lazy...
states = map(_get_state, backups)
# TODO mm, need to make that iterative too?
for i, (bfile, saves) in enumerate(zip(backups, states)):
bdt = _get_bdate(bfile)
first = i == 0
for key in set(prev_saves.keys()).symmetric_difference(set(saves.keys())):
ps = prev_saves.get(key, None)
if ps is not None:
# TODO use backup date, that is more precise...
# eh. I guess just take max and it will always be correct?
assert not first
yield Event(
dt=bdt, # TODO average wit ps.save_dt?
text="unfavorited",
kind=ps,
eid=f'unf-{ps.sid}',
url=ps.url,
title=ps.title,
)
else: # already in saves
s = saves[key]
last_saved = s.backup_dt
yield Event(
dt=s.created if first else last_saved,
text=f"favorited{' [initial]' if first else ''}",
kind=s,
eid=f'fav-{s.sid}',
url=s.url,
title=s.title,
)
prev_saves = saves
# TODO a bit awkward, favorited should compare lower than unfavorited?
@lru_cache(1)
def events(*args, **kwargs) -> List[Event]:
inp = inputs()
# 2.2s for 300 files without cachew
# 0.2s for 300 files with cachew
evit = _get_events(inp, *args, **kwargs) # type: ignore[call-arg]
# todo mypy is confused here and thinks it's iterable of Path? perhaps something to do with mcachew?
return list(sorted(evit, key=lambda e: e.cmp_key)) # type: ignore[attr-defined,arg-type]
def stats() -> Stats:
from my.core import stat
return {
**stat(saved ),
**stat(comments ),
**stat(submissions),
**stat(upvoted ),
}
def main() -> None:
for e in events(parallel=False):
print(e)
if __name__ == '__main__':
main()