Merge pull request #61 from karlicoss/updates

github module: cleanup and proper modular layout

commit a94b64c273
15 changed files with 453 additions and 284 deletions
@@ -25,6 +25,8 @@ If you have some issues with the setup, see [[file:SETUP.org::#troubleshooting][
- [[#mylastfm][my.lastfm]]
- [[#myreadingpolar][my.reading.polar]]
- [[#myinstapaper][my.instapaper]]
- [[#mygithubgdpr][my.github.gdpr]]
- [[#mygithubghexport][my.github.ghexport]]
:END:

* Intro
@@ -44,7 +46,9 @@ Some explanations:
- =/a/path/to/directory/=, so the module will consume all files from this directory
- a list of files/directories (it will be flattened)
- a [[https://docs.python.org/3/library/glob.html?highlight=glob#glob.glob][glob]] string, so you can be flexible about the format of your data on disk (e.g. if you want to keep it compressed)
- empty sequence (e.g. ~export_path = ()~), which is useful for modules that merge multiple data sources (for example, =my.twitter=)
- empty string (e.g. ~export_path = ''~), which will prevent the module from consuming any data

This can be useful for modules that merge multiple data sources (for example, =my.twitter= or =my.github=)

Typically, such a variable will be passed to =get_files= to actually extract the list of real files to use. You can see usage examples [[https://github.com/karlicoss/HPI/blob/master/tests/get_files.py][here]].
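For illustration, a minimal sketch of how a glob-style value gets resolved (the module name and paths below are made up, and it assumes =get_files= is importable from =my.core=, as the new modules in this diff do):

#+begin_src python
from my.core import get_files

class hypothetical_module:
    # glob over yearly exports, compressed or not
    export_path = '/backups/hypothetical/export-*.json*'

# inside the module, the config value is resolved to a sorted tuple of Paths:
files = get_files(hypothetical_module.export_path)
#+end_src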
@@ -74,6 +78,8 @@ modules = [
    ('lastfm'     , 'my.lastfm'          ),
    ('polar'      , 'my.reading.polar'   ),
    ('instapaper' , 'my.instapaper'      ),
    ('github'     , 'my.github.gdpr'     ),
    ('github'     , 'my.github.ghexport' ),
]

def indent(s, spaces=4):
@@ -227,3 +233,31 @@ for cls, p in modules:
    # alternatively, you can put the repository (or a symlink) in $MY_CONFIG/my/config/repos/instapexport
    instapexport: Optional[PathIsh] = None
#+end_src

** [[file:../my/github/gdpr.py][my.github.gdpr]]

Github data (uses [[https://github.com/settings/admin][official GDPR export]])

#+begin_src python
class github:
    gdpr_dir: PathIsh  # path to unpacked GDPR archive
#+end_src

** [[file:../my/github/ghexport.py][my.github.ghexport]]

Github data: events, comments, etc. (API data)

#+begin_src python
class github:
    '''
    Uses [[https://github.com/karlicoss/ghexport][ghexport]] outputs.
    '''
    # path[s]/glob to the exported JSON data
    export_path: Paths

    # path to a local clone of ghexport
    # alternatively, you can put the repository (or a symlink) in $MY_CONFIG/my/config/repos/ghexport
    ghexport : Optional[PathIsh] = None

    # path to a cache directory
    # if omitted, will use /tmp
    cache_dir: Optional[PathIsh] = None
#+end_src
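For reference, a user config satisfying both modules might look roughly like this (the paths are made up; both modules read the =github= class from =my.config=):

#+begin_src python
class github:
    gdpr_dir    = '/backups/github/gdpr-unpacked/'   # hypothetical: unpacked GDPR archive
    export_path = '/backups/github/ghexport/*.json'  # hypothetical: glob over ghexport outputs
    cache_dir   = None                               # fall back to /tmp for the cache
#+end_src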
@@ -474,8 +474,7 @@ Since you have two different sources of raw data, you need to specify two bits o
: class twitter_archive:
:     export_path = '/backups/twitter-archives/*.zip'

Note that you can also just use =my.twitter.archive= or =my.twitter.twint= directly, or set either of paths to 'empty path': =()=
# TODO empty string?
Note that you can also just use =my.twitter.archive= or =my.twitter.twint= directly, or set either of paths to empty string: =''=
# (TODO mypy-safe?)

# #addingmodifying-modules
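To illustrate the empty-string option mentioned above, a hypothetical =my/config.py= snippet could disable one source while keeping the other (the class names mirror the configs used elsewhere in this diff; the archive path is the example from the docs):

#+begin_src python
class twint:
    export_path = ''  # empty string: the twint source is not consumed at all

class twitter_archive:
    export_path = '/backups/twitter-archives/*.zip'
#+end_src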
@@ -1,272 +1,8 @@
"""
Github events and their metadata: comments/issues/pull requests
"""
from typing import Dict, Any, NamedTuple, Tuple, Optional, Iterator, TypeVar, Set
from datetime import datetime
import json
import warnings

import pytz
warnings.warn('my.coding.github is deprecated! Please use my.github.all instead!', DeprecationWarning)

from ..kython.klogging import LazyLogger
from ..kython.kompress import CPath
from ..common import get_files, mcachew
from ..error import Res
from ..github.all import events, get_events

from my.config import github as config
import my.config.repos.ghexport.dal as ghexport


logger = LazyLogger(__name__)


class Event(NamedTuple):
    dt: datetime
    summary: str
    eid: str
    link: Optional[str]
    body: Optional[str]=None
    is_bot: bool = False


# TODO hmm. need some sort of abstract syntax for this...
# TODO split further, title too
def _get_summary(e) -> Tuple[str, Optional[str], Optional[str]]:
    # TODO would be nice to give access to raw event withing timeline
    eid = e['id']
    tp = e['type']
    pl = e['payload']
    rname = e['repo']['name']

    mapping = {
        'CreateEvent': 'created',
        'DeleteEvent': 'deleted',
    }

    if tp == 'ForkEvent':
        url = e['payload']['forkee']['html_url']
        return f"{rname}: forked", url, None
    elif tp == 'PushEvent':
        commits = pl['commits']
        messages = [c['message'] for c in commits]
        body = '\n'.join(messages)
        return f"{rname}: pushed\n{body}", None, None
    elif tp == 'WatchEvent':
        return f"{rname}: watching", None, None
    elif tp in mapping:
        what = mapping[tp]
        rt = pl['ref_type']
        ref = pl['ref']
        # TODO link to branch? only contains weird API link though
        # TODO hmm. include timestamp instead?
        # breakpoint()
        # TODO combine automatically instead
        return f"{rname}: {what} {rt} {ref}", None, f'{rname}_{what}_{rt}_{ref}_{eid}'
    elif tp == 'PullRequestEvent':
        pr = pl['pull_request']
        action = pl['action']
        link = pr['html_url']
        title = pr['title']
        return f"{rname}: {action} PR {title}", link, f'{rname}_{action}_pr_{link}'
    elif tp == "IssuesEvent":
        action = pl['action']
        iss = pl['issue']
        link = iss['html_url']
        title = iss['title']
        return f"{rname}: {action} issue {title}", link, None
    elif tp == "IssueCommentEvent":
        com = pl['comment']
        link = com['html_url']
        iss = pl['issue']
        title = iss['title']
        return f"{rname}: commented on issue {title}", link, f'issue_comment_' + link
    elif tp == "ReleaseEvent":
        action = pl['action']
        rel = pl['release']
        tag = rel['tag_name']
        link = rel['html_url']
        return f"{rname}: {action} [{tag}]", link, None
    elif tp in 'PublicEvent':
        return f'{tp} {e}', None, None  # TODO ???
    else:
        return tp, None, None


def inputs():
    return get_files(config.export_dir)


def _dal():
    sources = inputs()
    sources = list(map(CPath, sources))  # TODO maybe move it to get_files? e.g. compressed=True arg?
    return ghexport.DAL(sources)


def _parse_dt(s: str) -> datetime:
    # TODO isoformat?
    return pytz.utc.localize(datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ'))


# TODO extract to separate gdpr module?
# TODO typing.TypedDict could be handy here..
def _parse_common(d: Dict) -> Dict:
    url = d['url']
    body = d.get('body')
    return {
        'dt'  : _parse_dt(d['created_at']),
        'link': url,
        'body': body,
    }


def _parse_repository(d: Dict) -> Event:
    pref = 'https://github.com/'
    url = d['url']
    assert url.startswith(pref); name = url[len(pref):]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary='created ' + name,
        eid='created_' + name,  # TODO ??
    )


def _parse_issue_comment(d: Dict) -> Event:
    url = d['url']
    is_bot = "[bot]" in d["user"]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'commented on issue {url}',
        eid='issue_comment_' + url,
        is_bot=is_bot,
    )


def _parse_issue(d: Dict) -> Event:
    url = d['url']
    title = d['title']
    is_bot = "[bot]" in d["user"]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'opened issue {title}',
        eid='issue_comment_' + url,
        is_bot=is_bot,
    )


def _parse_pull_request(d: Dict) -> Event:
    url = d['url']
    title = d['title']
    is_bot = "[bot]" in d["user"]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        # TODO distinguish incoming/outgoing?
        # TODO action? opened/closed??
        summary=f'opened PR {title}',
        eid='pull_request_' + url,
        is_bot=is_bot,
    )


def _parse_release(d: Dict) -> Event:
    tag = d['tag_name']
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'released {tag}',
        eid='release_' + tag,
    )


def _parse_commit_comment(d: Dict) -> Event:
    url = d['url']
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'commented on {url}',
        eid='commoit_comment_' + url,
    )


def _parse_event(d: Dict) -> Event:
    summary, link, eid = _get_summary(d)
    if eid is None:
        eid = d['id']
    body = d.get('payload', {}).get('comment', {}).get('body')
    return Event(
        dt=_parse_dt(d['created_at']),
        summary=summary,
        link=link,
        eid=eid,
        body=body,
    )


def iter_gdpr_events() -> Iterator[Res[Event]]:
    """
    Parses events from GDPR export (https://github.com/settings/admin)
    """
    # TODO allow using archive here?
    files = get_files(config.gdpr_dir, glob='*.json')
    handler_map = {
        'schema'       : None,
        'issue_events_': None,  # eh, doesn't seem to have any useful bodies
        'attachments_' : None,  # not sure if useful
        'users'        : None,  # just contains random users
        'repositories_'  : _parse_repository,
        'issue_comments_': _parse_issue_comment,
        'issues_'        : _parse_issue,
        'pull_requests_' : _parse_pull_request,
        'releases_'      : _parse_release,
        'commit_comments': _parse_commit_comment,
    }
    for f in files:
        handler: Any
        for prefix, h in handler_map.items():
            if not f.name.startswith(prefix):
                continue
            handler = h
            break
        else:
            yield RuntimeError(f'Unhandled file: {f}')
            continue

        if handler is None:
            # ignored
            continue

        j = json.loads(f.read_text())
        for r in j:
            try:
                yield handler(r)
            except Exception as e:
                yield e


# TODO hmm. not good, need to be lazier?...
@mcachew(config.cache_dir, hashf=lambda dal: dal.sources)
def iter_backup_events(dal=_dal()) -> Iterator[Event]:
    for d in dal.events():
        yield _parse_event(d)


def iter_events() -> Iterator[Res[Event]]:
    from itertools import chain
    emitted: Set[Tuple[datetime, str]] = set()
    for e in chain(iter_gdpr_events(), iter_backup_events()):
        if isinstance(e, Exception):
            yield e
            continue
        if e.is_bot:
            continue
        key = (e.dt, e.eid)  # use both just in case
        # TODO wtf?? some minor (e.g. 1 sec) discrepancies (e.g. create repository events)
        if key in emitted:
            logger.debug('ignoring %s: %s', key, e)
            continue
        yield e
        emitted.add(key)


def get_events():
    return sorted(iter_events(), key=lambda e: e.dt)

# TODO mm. ok, not much point in deserializing as github.Event as it's basically a fancy dict wrapper?
# from github.Event import Event as GEvent  # type: ignore
# # see https://github.com/PyGithub/PyGithub/blob/master/github/GithubObject.py::GithubObject.__init__
# e = GEvent(None, None, raw_event, True)
# todo deprecate properly
iter_events = events
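As a hedged aside (editorial, not part of the diff): with the slimmed-down shim above, the old entry point keeps working but announces its deprecation, assuming a working user config.

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    import my.coding.github as legacy  # first import emits DeprecationWarning, re-exports events/get_events

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
legacy.get_events  # same object as my.github.all.get_events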
@@ -116,6 +116,7 @@ from ..kython.klogging import setup_logger, LazyLogger

Paths = Union[Sequence[PathIsh], PathIsh]

# TODO support '' for emtpy path
DEFAULT_GLOB = '*'
def get_files(pp: Paths, glob: str=DEFAULT_GLOB, sort: bool=True) -> Tuple[Path, ...]:
    """

@@ -124,11 +125,16 @@ def get_files(pp: Paths, glob: str=DEFAULT_GLOB, sort: bool=True) -> Tuple[Path,
    Tuple as return type is a bit friendlier for hashing/caching, so hopefully makes sense
    """
    # TODO FIXME mm, some wrapper to assert iterator isn't empty?
    sources: List[Path] = []
    if isinstance(pp, (str, Path)):
        sources.append(Path(pp))
    sources: List[Path]
    if isinstance(pp, Path):
        sources = [pp]
    elif isinstance(pp, str):
        if pp == '':
            # special case -- makes sense for optional data sources, etc
            return ()  # early return to prevent warnings etc
        sources = [Path(pp)]
    else:
        sources.extend(map(Path, pp))
        sources = [Path(p) for p in pp]

    def caller() -> str:
        import traceback
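A quick sketch of the behaviour this hunk introduces (editorial, not part of the diff; it assumes =get_files= is importable from =my.common=, which is where the deprecated module above imports it from):

from my.common import get_files

assert get_files('') == ()                 # empty string: optional source, nothing is consumed
assert get_files([]) == ()                 # an empty sequence also resolves to an empty tuple
files = get_files('/backups/foo/*.json')   # hypothetical glob, resolved to a sorted Tuple[Path, ...]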
my/github/all.py (new file, 21 lines)
@@ -0,0 +1,21 @@
"""
Unified Github data (merged from GDPR export and periodic API updates)
"""

from . import gdpr, ghexport

from .common import merge_events, Results


def events() -> Results:
    yield from merge_events(
        gdpr.events(),
        ghexport.events(),
    )


# todo hmm. not sure, maybe should be named sorted_events or something..
# also, not great that it's in all.py... think of a better way...
def get_events() -> Results:
    from ..core.error import sort_res_by
    return sort_res_by(events(), key=lambda e: e.dt)
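A hedged usage sketch for the new unified module (it assumes the =github= user config is set up); errors travel as values, so callers can filter them out:

import my.github.all as gh

for e in gh.get_events():            # merged GDPR + ghexport events, sorted by timestamp
    if isinstance(e, Exception):     # Res[Event]: failures are yielded, not raised
        continue
    print(e.dt, e.summary)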
my/github/common.py (new file, 52 lines)
@@ -0,0 +1,52 @@
"""
Github events and their metadata: comments/issues/pull requests
"""
from datetime import datetime
from typing import Optional, NamedTuple, Iterable, Set, Tuple

import pytz

from ..core import warn_if_empty
from ..core.error import Res


class Event(NamedTuple):
    dt: datetime
    summary: str
    eid: str
    link: Optional[str]
    body: Optional[str]=None
    is_bot: bool = False


Results = Iterable[Res[Event]]

@warn_if_empty
def merge_events(*sources: Results) -> Results:
    from ..kython.klogging import LazyLogger
    logger = LazyLogger(__name__)
    from itertools import chain
    emitted: Set[Tuple[datetime, str]] = set()
    for e in chain(*sources):
        if isinstance(e, Exception):
            yield e
            continue
        if e.is_bot:
            continue
        key = (e.dt, e.eid)  # use both just in case
        # TODO wtf?? some minor (e.g. 1 sec) discrepancies (e.g. create repository events)
        if key in emitted:
            logger.debug('ignoring %s: %s', key, e)
            continue
        yield e
        emitted.add(key)
    # todo use unique_everseen? Might be tricky with Exception etc..


def parse_dt(s: str) -> datetime:
    # TODO isoformat?
    return pytz.utc.localize(datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ'))

# TODO not sure
# def get_events() -> Iterable[Res[Event]]:
#     return sort_res_by(events(), key=lambda e: e.dt)
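A small sketch of how merge_events deduplicates across sources (the event below is made up):

from datetime import datetime, timezone
from my.github.common import Event, merge_events

e = Event(dt=datetime(2020, 5, 1, tzinfo=timezone.utc),
          summary='opened issue example', eid='issue_42', link=None)
merged = list(merge_events([e], [e]))   # the same (dt, eid) arriving from two sources
assert merged == [e]                    # the duplicate is emitted only once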
my/github/gdpr.py (new file, 143 lines)
@@ -0,0 +1,143 @@
"""
Github data (uses [[https://github.com/settings/admin][official GDPR export]])
"""

from datetime import datetime
import json
from typing import Iterable, Dict, Any

from ..core.error import Res
from ..core import get_files

from .common import Event, parse_dt

# TODO later, use a separate user config? (github_gdpr)
from my.config import github as user_config

from dataclasses import dataclass
from ..core import PathIsh

@dataclass
class github(user_config):
    gdpr_dir: PathIsh  # path to unpacked GDPR archive

###


from ..core.cfg import make_config
config = make_config(github)


def events() -> Iterable[Res[Event]]:
    # TODO FIXME allow using archive here?
    files = get_files(config.gdpr_dir, glob='*.json')
    handler_map = {
        'schema'       : None,
        'issue_events_': None,  # eh, doesn't seem to have any useful bodies
        'attachments_' : None,  # not sure if useful
        'users'        : None,  # just contains random users
        'repositories_'  : _parse_repository,
        'issue_comments_': _parse_issue_comment,
        'issues_'        : _parse_issue,
        'pull_requests_' : _parse_pull_request,
        'releases_'      : _parse_release,
        'commit_comments': _parse_commit_comment,
    }
    for f in files:
        handler: Any
        for prefix, h in handler_map.items():
            if not f.name.startswith(prefix):
                continue
            handler = h
            break
        else:
            yield RuntimeError(f'Unhandled file: {f}')
            continue

        if handler is None:
            # ignored
            continue

        j = json.loads(f.read_text())
        for r in j:
            try:
                yield handler(r)
            except Exception as e:
                yield e


# TODO typing.TypedDict could be handy here..
def _parse_common(d: Dict) -> Dict:
    url = d['url']
    body = d.get('body')
    return {
        'dt'  : parse_dt(d['created_at']),
        'link': url,
        'body': body,
    }


def _parse_repository(d: Dict) -> Event:
    pref = 'https://github.com/'
    url = d['url']
    assert url.startswith(pref); name = url[len(pref):]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary='created ' + name,
        eid='created_' + name,  # TODO ??
    )


def _parse_issue_comment(d: Dict) -> Event:
    url = d['url']
    is_bot = "[bot]" in d["user"]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'commented on issue {url}',
        eid='issue_comment_' + url,
        is_bot=is_bot,
    )


def _parse_issue(d: Dict) -> Event:
    url = d['url']
    title = d['title']
    is_bot = "[bot]" in d["user"]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'opened issue {title}',
        eid='issue_comment_' + url,
        is_bot=is_bot,
    )


def _parse_pull_request(d: Dict) -> Event:
    url = d['url']
    title = d['title']
    is_bot = "[bot]" in d["user"]
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        # TODO distinguish incoming/outgoing?
        # TODO action? opened/closed??
        summary=f'opened PR {title}',
        eid='pull_request_' + url,
        is_bot=is_bot,
    )


def _parse_release(d: Dict) -> Event:
    tag = d['tag_name']
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'released {tag}',
        eid='release_' + tag,
    )


def _parse_commit_comment(d: Dict) -> Event:
    url = d['url']
    return Event(  # type: ignore[misc]
        **_parse_common(d),
        summary=f'commented on {url}',
        eid='commoit_comment_' + url,
    )
my/github/ghexport.py (new file, 164 lines)
@@ -0,0 +1,164 @@
"""
Github data: events, comments, etc. (API data)
"""
from dataclasses import dataclass
from typing import Optional

from ..core import Paths, PathIsh

from my.config import github as user_config


@dataclass
class github(user_config):
    '''
    Uses [[https://github.com/karlicoss/ghexport][ghexport]] outputs.
    '''
    # path[s]/glob to the exported JSON data
    export_path: Paths

    # path to a local clone of ghexport
    # alternatively, you can put the repository (or a symlink) in $MY_CONFIG/my/config/repos/ghexport
    ghexport : Optional[PathIsh] = None

    # path to a cache directory
    # if omitted, will use /tmp
    cache_dir: Optional[PathIsh] = None

    @property
    def dal_module(self):
        rpath = self.ghexport
        if rpath is not None:
            from .core.common import import_dir
            return import_dir(rpath, '.dal')
        else:
            import my.config.repos.ghexport.dal as dal
            return dal
###

# TODO perhaps using /tmp in case of None isn't ideal... maybe it should be treated as if cache is off

from ..core.cfg import make_config, Attrs
def migration(attrs: Attrs) -> Attrs:
    if 'export_dir' in attrs:  # legacy name
        attrs['export_path'] = attrs['export_dir']
    return attrs
config = make_config(github, migration=migration)


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    import my.config.repos.ghexport.dal as dal
else:
    dal = config.dal_module

############################

from pathlib import Path
from typing import Tuple, Iterable, Dict, Sequence

from ..core import get_files
from ..core.common import mcachew
from ..kython.kompress import CPath

from .common import Event, parse_dt, Results


def inputs() -> Sequence[Path]:
    return get_files(config.export_path)


def _dal() -> dal.DAL:
    sources = inputs()
    sources = list(map(CPath, sources))  # TODO maybe move it to get_files? e.g. compressed=True arg?
    return dal.DAL(sources)


# TODO hmm. not good, need to be lazier?...
@mcachew(config.cache_dir, hashf=lambda dal: dal.sources)
def events(dal=_dal()) -> Results:
    for d in dal.events():
        yield _parse_event(d)


# TODO hmm. need some sort of abstract syntax for this...
# TODO split further, title too
def _get_summary(e) -> Tuple[str, Optional[str], Optional[str]]:
    # TODO would be nice to give access to raw event withing timeline
    eid = e['id']
    tp = e['type']
    pl = e['payload']
    rname = e['repo']['name']

    mapping = {
        'CreateEvent': 'created',
        'DeleteEvent': 'deleted',
    }

    if tp == 'ForkEvent':
        url = e['payload']['forkee']['html_url']
        return f"{rname}: forked", url, None
    elif tp == 'PushEvent':
        commits = pl['commits']
        messages = [c['message'] for c in commits]
        body = '\n'.join(messages)
        return f"{rname}: pushed\n{body}", None, None
    elif tp == 'WatchEvent':
        return f"{rname}: watching", None, None
    elif tp in mapping:
        what = mapping[tp]
        rt = pl['ref_type']
        ref = pl['ref']
        # TODO link to branch? only contains weird API link though
        # TODO hmm. include timestamp instead?
        # breakpoint()
        # TODO combine automatically instead
        return f"{rname}: {what} {rt} {ref}", None, f'{rname}_{what}_{rt}_{ref}_{eid}'
    elif tp == 'PullRequestEvent':
        pr = pl['pull_request']
        action = pl['action']
        link = pr['html_url']
        title = pr['title']
        return f"{rname}: {action} PR {title}", link, f'{rname}_{action}_pr_{link}'
    elif tp == "IssuesEvent":
        action = pl['action']
        iss = pl['issue']
        link = iss['html_url']
        title = iss['title']
        return f"{rname}: {action} issue {title}", link, None
    elif tp == "IssueCommentEvent":
        com = pl['comment']
        link = com['html_url']
        iss = pl['issue']
        title = iss['title']
        return f"{rname}: commented on issue {title}", link, f'issue_comment_' + link
    elif tp == "ReleaseEvent":
        action = pl['action']
        rel = pl['release']
        tag = rel['tag_name']
        link = rel['html_url']
        return f"{rname}: {action} [{tag}]", link, None
    elif tp in 'PublicEvent':
        return f'{tp} {e}', None, None  # TODO ???
    else:
        return tp, None, None


def _parse_event(d: Dict) -> Event:
    summary, link, eid = _get_summary(d)
    if eid is None:
        eid = d['id']
    body = d.get('payload', {}).get('comment', {}).get('body')
    return Event(
        dt=parse_dt(d['created_at']),
        summary=summary,
        link=link,
        eid=eid,
        body=body,
    )


# TODO mm. ok, not much point in deserializing as github.Event as it's basically a fancy dict wrapper?
# from github.Event import Event as GEvent  # type: ignore
# # see https://github.com/PyGithub/PyGithub/blob/master/github/GithubObject.py::GithubObject.__init__
# e = GEvent(None, None, raw_event, True)
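One detail worth a hedged illustration (editorial, not part of the diff): thanks to the migration hook above, a user config that still uses the legacy export_dir attribute keeps working.

# hypothetical user config in my/config.py
class github:
    export_dir = '/backups/ghexport/'  # legacy name; migration() copies it into export_path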
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from datetime import datetime
from typing import NamedTuple, List
from typing import NamedTuple, List, Iterable

from ..google.takeout.html import read_html
from ..google.takeout.paths import get_last_takeout

@@ -16,7 +16,7 @@ class Watched(NamedTuple):
        return f'{self.url}-{self.when.isoformat()}'


def get_watched():
def watched() -> Iterable[Watched]:
    # TODO need to use a glob? to make up for old takouts that didn't start with Takeout/
    path = 'Takeout/My Activity/YouTube/MyActivity.html'  # looks like this one doesn't have retention? so enough to use the last
    # TODO YouTube/history/watch-history.html, also YouTube/history/watch-history.json

@@ -30,6 +30,10 @@ def get_watched():
    return list(sorted(watches, key=lambda e: e.when))


# todo deprecate
get_watched = watched


def main():
    # TODO shit. a LOT of watches...
    for w in get_watched():
@@ -7,13 +7,13 @@ from . import twint, archive

from .common import merge_tweets


def tweets():
    yield from merge_tweets(
        twint  .tweets(),
        archive.tweets(),
    )

from .common import merge_tweets

def likes():
    yield from merge_tweets(
@@ -18,9 +18,8 @@ except ImportError as e:


from dataclasses import dataclass
from ..core.common import Paths
from ..core import Paths

# TODO perhaps rename to twitter_archive? dunno
@dataclass
class twitter_archive(user_config):
    export_path: Paths  # path[s]/glob to the twitter archive takeout
@@ -14,6 +14,7 @@ from my.config import twint as user_config
class twint(user_config):
    export_path: Paths  # path[s]/glob to the twint Sqlite database

####

from ..core.cfg import make_config
config = make_config(twint)
@@ -102,6 +102,9 @@ def test_no_files():
    '''
    Test for empty matches. They work, but should result in warning
    '''
    assert get_files('') == ()

    # todo test these for warnings?
    assert get_files([]) == ()
    assert get_files('bad*glob') == ()
@@ -1,8 +1,16 @@
#!/usr/bin/env python3
from more_itertools import ilen

from my.coding.github import get_events


def test_gdpr():
    import my.github.gdpr as gdpr
    assert ilen(gdpr.events()) > 100


def test():
    events = get_events()
    assert len(events) > 100
    assert ilen(events) > 100
    for e in events:
        print(e)
@@ -1,5 +1,4 @@
# TODO move elsewhere?

# these tests would only make sense with some existing data? although some of them would work for everyone..
# not sure what's a good way of handling this..


@@ -7,7 +6,7 @@ from my.media.youtube import get_watched, Watched


def test():
    watched = get_watched()
    watched = list(get_watched())
    assert len(watched) > 1000

    from datetime import datetime