my.github.gdpr/my.zulip.organization: use kompress support for tar.gz if it's available

otherwise fall back to unpacking into a tmp dir via my.core.structure
Dima Gerasimov 2024-09-18 23:03:03 +01:00 committed by karlicoss
parent 201ddd4d7c
commit 6a18f47c37
4 changed files with 135 additions and 82 deletions
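
For context on the change below: kompress's CPath wraps an archive in a pathlib.Path-compatible interface, so a .tar.gz export can be read in place instead of being unpacked first. A minimal usage sketch, assuming a kompress version with tar.gz support (the archive path is hypothetical):

from kompress import CPath  # https://github.com/karlicoss/kompress

# hypothetical location of an unextracted GDPR export
archive = CPath('/tmp/github-gdpr-export.tar.gz')

# the archive behaves like a directory of Path-like entries
for f in sorted(archive.glob('*.json')):
    print(f.name, len(f.read_text()))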

my/github/gdpr.py

@@ -1,36 +1,42 @@
 """
 Github data (uses [[https://github.com/settings/admin][official GDPR export]])
 """
-from dataclasses import dataclass
+from __future__ import annotations
+
 import json
+from abc import abstractmethod
 from pathlib import Path
-import tarfile
-from typing import Iterable, Any, Sequence, Dict, Optional
+from typing import Any, Iterator, Sequence
 
-from my.core import get_files, Res, PathIsh, stat, Stats, make_logger
-from my.core.cfg import make_config
-from my.core.error import notnone, echain
-
-from .common import Event, parse_dt, EventIds
-
-# TODO later, use a separate user config? (github_gdpr)
-from my.config import github as user_config
-
-
-@dataclass
-class github(user_config):
-    gdpr_dir: PathIsh  # path to unpacked GDPR archive
-
-
-config = make_config(github)
+from my.core import Paths, Res, Stats, get_files, make_logger, stat, warnings
+from my.core.error import echain
+
+from .common import Event, EventIds, parse_dt
 
 logger = make_logger(__name__)
 
 
+class config:
+    @property
+    @abstractmethod
+    def gdpr_dir(self) -> Paths:
+        raise NotImplementedError
+
+
+def make_config() -> config:
+    # TODO later, use a separate user config? (github_gdpr)
+    from my.config import github as user_config
+
+    class combined_config(user_config, config):
+        pass
+
+    return combined_config()
+
+
 def inputs() -> Sequence[Path]:
-    gdir = config.gdpr_dir
-    res = get_files(gdir)
+    gdpr_dir = make_config().gdpr_dir
+    res = get_files(gdpr_dir)
     schema_json = [f for f in res if f.name == 'schema.json']
     was_unpacked = len(schema_json) > 0
     if was_unpacked:
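
The hunk above replaces the @dataclass + make_config pattern with an abstract config base that the user's own config class gets mixed into. A standalone sketch of how the lookup resolves, with user_config standing in for my.config.github (the path value is hypothetical):

from abc import abstractmethod


class config:
    @property
    @abstractmethod
    def gdpr_dir(self) -> str:
        raise NotImplementedError


class user_config:  # stand-in for my.config.github
    gdpr_dir = '/path/to/gdpr/export'  # hypothetical value


class combined_config(user_config, config):
    pass


# user_config comes first in the MRO, so its plain attribute
# shadows the abstract property from config
print(combined_config().gdpr_dir)  # -> /path/to/gdpr/export

A side effect of moving the my.config import inside make_config() is that the module can now be imported even when no user config is present.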
@@ -43,22 +49,37 @@ def inputs() -> Sequence[Path]:
     return res
 
 
-def events() -> Iterable[Res[Event]]:
+def events() -> Iterator[Res[Event]]:
     last = max(inputs())
 
     logger.info(f'extracting data from {last}')
 
-    # a bit naughty and ad-hoc, but we will generify reading from tar.gz. once we have more examples
-    # another one is zulip archive
-    if last.is_dir():
-        files = sorted(last.glob('*.json'))  # looks like all files are in the root
-        open_file = lambda f: f.open()
+    root: Path | None = None
+
+    if last.is_dir():  # if it's already CPath, this will match it
+        root = last
     else:
-        # treat as .tar.gz
-        tfile = tarfile.open(last)
-        files = sorted(map(Path, tfile.getnames()))
-        files = [p for p in files if len(p.parts) == 1 and p.suffix == '.json']
-        open_file = lambda p: notnone(tfile.extractfile(f'./{p}'))  # NOTE odd, doesn't work without ./
+        try:
+            from kompress import CPath
+
+            root = CPath(last)
+            assert len(list(root.iterdir())) > 0  # trigger to check if we have the kompress version with targz support
+        except Exception as e:
+            logger.exception(e)
+            warnings.high("Upgrade 'kompress' to latest version with native .tar.gz support. Falling back to unpacking to tmp dir.")
+
+    if root is None:
+        from my.core.structure import match_structure
+
+        with match_structure(last, expected=()) as res:  # expected=() matches it regardless any patterns
+            [root] = res
+            yield from _process_one(root)
+    else:
+        yield from _process_one(root)
+
+
+def _process_one(root: Path) -> Iterator[Res[Event]]:
+    files = sorted(root.glob('*.json'))  # looks like all files are in the root
 
     # fmt: off
     handler_map = {
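
Two things happen in this hunk. The kompress branch probes tar.gz support by forcing an iterdir() and falls back with a warning if that raises; the fallback branch uses my.core.structure.match_structure, a context manager that unpacks the archive into a temporary directory and yields the matching roots. A rough sketch of the calling convention as used here (the archive path is hypothetical):

from pathlib import Path

from my.core.structure import match_structure

archive = Path('/tmp/github-gdpr-export.tar.gz')  # hypothetical

# expected=() matches the archive regardless of any structure patterns
with match_structure(archive, expected=()) as roots:
    [root] = roots  # a single archive yields a single root
    for f in sorted(root.glob('*.json')):
        print(f.name)
# the temporary extraction dir is cleaned up when the block exits

Note that the diff keeps yield from _process_one(root) inside the with block: the extracted files are gone once the context manager exits, so consumption can't be deferred past it.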
@@ -100,8 +121,7 @@ def events() -> Iterable[Res[Event]]:
             # ignored
             continue
 
-        with open_file(f) as fo:
-            j = json.load(fo)
+        j = json.loads(f.read_text())
         for r in j:
             try:
                 yield handler(r)
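
Dropping open_file in favour of json.loads(f.read_text()) is what lets _process_one stay agnostic about where f lives: both a plain pathlib.Path and an entry yielded from a CPath-wrapped archive expose read_text(). A minimal illustration of the shared interface (the path is hypothetical):

import json
from pathlib import Path


def parse_json_file(f: Path) -> object:
    # f may be a real file on disk or a member of a CPath archive;
    # read_text() behaves the same for both
    return json.loads(f.read_text())


# parse_json_file(Path('/tmp/github-gdpr-export/repositories.json'))  # hypothetical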
@@ -116,7 +136,7 @@ def stats() -> Stats:
 
 
 # TODO typing.TypedDict could be handy here..
-def _parse_common(d: Dict) -> Dict:
+def _parse_common(d: dict) -> dict:
     url = d['url']
     body = d.get('body')
     return {
@@ -126,7 +146,7 @@ def _parse_common(d: Dict) -> Dict:
     }
 
 
-def _parse_repository(d: Dict) -> Event:
+def _parse_repository(d: dict) -> Event:
     pref = 'https://github.com/'
     url = d['url']
     dts = d['created_at']
@@ -142,13 +162,13 @@ def _parse_repository(d: Dict) -> Event:
 
 
 # user may be None if the user was deleted
-def _is_bot(user: Optional[str]) -> bool:
+def _is_bot(user: str | None) -> bool:
     if user is None:
         return False
     return "[bot]" in user
 
 
-def _parse_issue_comment(d: Dict) -> Event:
+def _parse_issue_comment(d: dict) -> Event:
     url = d['url']
     return Event(
         **_parse_common(d),
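
The Dict/Optional to dict/str | None rewrites in this and the following hunks are enabled by the from __future__ import annotations added in the first hunk: with PEP 563 in effect, annotations are stored as strings rather than evaluated, so PEP 604 unions parse even on Python versions below 3.10. A small self-contained demonstration (the function name is hypothetical):

from __future__ import annotations


def _is_bot_demo(user: str | None) -> bool:
    # without the future import, `str | None` in an annotation would raise
    # TypeError at function definition time on Python < 3.10
    if user is None:
        return False
    return '[bot]' in user


print(_is_bot_demo('dependabot[bot]'))  # -> True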
@@ -158,7 +178,7 @@ def _parse_issue_comment(d: Dict) -> Event:
     )
 
 
-def _parse_issue(d: Dict) -> Event:
+def _parse_issue(d: dict) -> Event:
     url = d['url']
     title = d['title']
     return Event(
@@ -169,7 +189,7 @@ def _parse_issue(d: Dict) -> Event:
     )
 
 
-def _parse_pull_request(d: Dict) -> Event:
+def _parse_pull_request(d: dict) -> Event:
     dts = d['created_at']
     url = d['url']
     title = d['title']
@@ -183,7 +203,7 @@ def _parse_pull_request(d: Dict) -> Event:
     )
 
 
-def _parse_project(d: Dict) -> Event:
+def _parse_project(d: dict) -> Event:
     url = d['url']
     title = d['name']
     is_bot = "[bot]" in d["creator"]
@@ -198,7 +218,7 @@ def _parse_project(d: Dict) -> Event:
     )
 
 
-def _parse_release(d: Dict) -> Event:
+def _parse_release(d: dict) -> Event:
     tag = d['tag_name']
     return Event(
         **_parse_common(d),
@@ -207,7 +227,7 @@ def _parse_release(d: Dict) -> Event:
     )
 
 
-def _parse_commit_comment(d: Dict) -> Event:
+def _parse_commit_comment(d: dict) -> Event:
     url = d['url']
     return Event(
         **_parse_common(d),