diff --git a/my/github/ghexport.py b/my/github/ghexport.py index faf4de4..cfa7b99 100644 --- a/my/github/ghexport.py +++ b/my/github/ghexport.py @@ -5,10 +5,8 @@ REQUIRES = [ 'git+https://github.com/karlicoss/ghexport', ] from dataclasses import dataclass -from typing import Optional - -from ..core import Paths +from my.core import Paths from my.config import github as user_config @@ -23,12 +21,12 @@ class github(user_config): ### -from ..core.cfg import make_config, Attrs +from my.core.cfg import make_config, Attrs def migration(attrs: Attrs) -> Attrs: export_dir = 'export_dir' if export_dir in attrs: # legacy name attrs['export_path'] = attrs[export_dir] - from ..core.warnings import high + from my.core.warnings import high high(f'"{export_dir}" is deprecated! Please use "export_path" instead."') return attrs config = make_config(github, migration=migration) @@ -37,19 +35,23 @@ config = make_config(github, migration=migration) try: from ghexport import dal except ModuleNotFoundError as e: - from ..core.compat import pre_pip_dal_handler + from my.core.compat import pre_pip_dal_handler dal = pre_pip_dal_handler('ghexport', e, config, requires=REQUIRES) ############################ -from typing import Tuple, Dict, Sequence +from functools import lru_cache +from typing import Tuple, Dict, Sequence, Optional -from ..core import get_files, Path -from ..core.common import mcachew +from my.core import get_files, Path, LazyLogger +from my.core.common import mcachew from .common import Event, parse_dt, Results +logger = LazyLogger(__name__) + + def inputs() -> Sequence[Path]: return get_files(config.export_path) @@ -66,20 +68,30 @@ def events() -> Results: for d in dal.events(): if isinstance(d, Exception): yield d - else: + try: yield _parse_event(d) + except Exception as e: + yield e -from ..core import stat, Stats +from my.core import stat, Stats def stats() -> Stats: return { **stat(events), } +@lru_cache(None) +def _log_if_unhandled(e) -> None: + logger.warning('unhandled event type: %s', e) + + # TODO hmm. need some sort of abstract syntax for this... # TODO split further, title too -def _get_summary(e) -> Tuple[str, Optional[str], Optional[str]]: +Link = str +EventId = str +Body = str +def _get_summary(e) -> Tuple[str, Optional[Link], Optional[EventId], Optional[Body]]: # TODO would be nice to give access to raw event within timeline eid = e['id'] tp = e['type'] @@ -93,58 +105,87 @@ def _get_summary(e) -> Tuple[str, Optional[str], Optional[str]]: if tp == 'ForkEvent': url = e['payload']['forkee']['html_url'] - return f"{rname}: forked", url, None + return f"{rname}: forked", url, None, None elif tp == 'PushEvent': commits = pl['commits'] messages = [c['message'] for c in commits] + ref = pl['ref'] body = '\n'.join(messages) - return f"{rname}: pushed\n{body}", None, None + eid = f'{tp}_{e["id"]}' + return f'{rname}: pushed {len(commits)} commits to {ref}', None, eid, body elif tp == 'WatchEvent': - return f"{rname}: watching", None, None + return f"{rname}: watching", None, None, None elif tp in mapping: what = mapping[tp] rt = pl['ref_type'] ref = pl['ref'] # TODO link to branch? only contains weird API link though # TODO hmm. include timestamp instead? - # breakpoint() # TODO combine automatically instead - return f"{rname}: {what} {rt} {ref}", None, f'{rname}_{what}_{rt}_{ref}_{eid}' + return f"{rname}: {what} {rt} {ref}", None, f'{rname}_{what}_{rt}_{ref}_{eid}', None elif tp == 'PullRequestEvent': pr = pl['pull_request'] - action = pl['action'] - link = pr['html_url'] title = pr['title'] - return f"{rname}: {action} PR {title}", link, f'{rname}_{action}_pr_{link}' - elif tp == "IssuesEvent": + + link = pr['html_url'] + body = pr['body'] + action = pl['action'] + return f"{rname}: {action} PR: {title}", link, f'{rname}_{action}_pr_{link}', body + elif tp == 'PullRequestReviewEvent': + pr = pl['pull_request'] + title = pr['title'] + + rev = pl['review'] + link = rev['html_url'] + body = rev['body'] + eid = f'{tp}_{rev["id"]}' + return f'{rname}: reviewed PR: {title}', link, eid, body + elif tp == 'PullRequestReviewCommentEvent': + pr = pl['pull_request'] + title = pr['title'] + + comment = pl['comment'] + link = comment['html_url'] + body = comment['body'] + eid = f'{tp}_{comment["id"]}' + return f'{rname}: commented on PR: {title}', link, eid, body + elif tp == 'CommitCommentEvent': + comment = pl['comment'] + link = comment['html_url'] + body = comment['body'] + eid = f'{tp}_{comment["id"]}' + cid = comment['commit_id'] + return f'{rname} commented on commit: {cid}', link, eid, body + elif tp == 'IssuesEvent': action = pl['action'] iss = pl['issue'] - link = iss['html_url'] + link = iss['html_url'] + body = iss['body'] title = iss['title'] - return f"{rname}: {action} issue {title}", link, None - elif tp == "IssueCommentEvent": - com = pl['comment'] - link = com['html_url'] + return f'{rname}: {action} issue: {title}', link, None, body + elif tp == 'IssueCommentEvent': + comment = pl['comment'] + link = comment['html_url'] + body = comment['body'] iss = pl['issue'] title = iss['title'] - return f"{rname}: commented on issue {title}", link, 'issue_comment_' + link - elif tp == "ReleaseEvent": + return f'{rname}: commented on issue: {title}', link, 'issue_comment_' + link, body + elif tp == 'ReleaseEvent': action = pl['action'] rel = pl['release'] tag = rel['tag_name'] link = rel['html_url'] - return f"{rname}: {action} [{tag}]", link, None - elif tp in 'PublicEvent': - return f'{tp} {e}', None, None # TODO ??? + body = rel['body'] + return f"{rname}: {action} [{tag}]", link, None, body else: - return tp, None, None + _log_if_unhandled(tp) + return tp, None, None, None def _parse_event(d: Dict) -> Event: - summary, link, eid = _get_summary(d) + summary, link, eid, body = _get_summary(d) if eid is None: - eid = d['id'] - body = d.get('payload', {}).get('comment', {}).get('body') + eid = d['id'] # meh return Event( dt=parse_dt(d['created_at']), summary=summary,