193 lines
6.6 KiB
Python
193 lines
6.6 KiB
Python
"""
|
|
Instagram data (uses [[https://www.instagram.com/download/request][official GDPR export]])
|
|
"""
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Iterator, Sequence, Dict, Union
|
|
|
|
from more_itertools import bucket
|
|
|
|
from my.core import (
|
|
get_files,
|
|
Paths,
|
|
datetime_naive,
|
|
Res,
|
|
assert_never,
|
|
LazyLogger,
|
|
)
|
|
from my.core.kompress import ZipPath
|
|
|
|
from my.config import instagram as user_config
|
|
|
|
|
|
logger = LazyLogger(__name__, level='debug')
|
|
|
|
|
|
@dataclass
|
|
class config(user_config.gdpr):
|
|
# paths[s]/glob to the exported zip archives
|
|
export_path: Paths
|
|
# TODO later also support unpacked directories?
|
|
|
|
|
|
def inputs() -> Sequence[Path]:
|
|
return get_files(config.export_path)
|
|
|
|
|
|
# TODO think about unifying with stuff from android.py
|
|
@dataclass(unsafe_hash=True)
|
|
class User:
|
|
id: str
|
|
username: str
|
|
full_name: str
|
|
|
|
|
|
@dataclass
|
|
class _BaseMessage:
|
|
# ugh, this is insane, but does look like it's just keeping local device time???
|
|
# checked against a message sent on 3 June, which should be UTC+1, but timestamp seems local
|
|
created: datetime_naive
|
|
text: str
|
|
thread_id: str
|
|
# NOTE: doesn't look like there aren't any meaningful message ids in the export
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class _Message(_BaseMessage):
|
|
user_id: str
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class Message(_BaseMessage):
|
|
user: User
|
|
|
|
|
|
def _decode(s: str) -> str:
|
|
# yeah... idk why they do that
|
|
return s.encode('latin-1').decode('utf8')
|
|
|
|
|
|
def _entities() -> Iterator[Res[Union[User, _Message]]]:
|
|
last = ZipPath(max(inputs()))
|
|
# TODO make sure it works both with plan directory
|
|
# idelaly get_files should return the right thing, and we won't have to force ZipPath/match_structure here
|
|
# e.g. possible options are:
|
|
# - if packed things are detected, just return ZipPath
|
|
# - if packed things are detected, possibly return match_structure_wrapper
|
|
# it might be a bit tricky because it's a context manager -- who will recycle it?
|
|
# - if unpacked things are detected, just return the dir as it is
|
|
# (possibly detect them via match_structure? e.g. what if we have a bunch of unpacked dirs)
|
|
#
|
|
# I guess the goal for core.structure module was to pass it to other functions that expect unpacked structure
|
|
# https://github.com/karlicoss/HPI/pull/175
|
|
# whereas here I don't need it..
|
|
# so for now will just implement this adhoc thing and think about properly fixing later
|
|
|
|
personal_info = last / 'personal_information'
|
|
if not personal_info.exists():
|
|
# old path, used up to somewhere between feb-aug 2022
|
|
personal_info = last / 'account_information'
|
|
|
|
j = json.loads((personal_info / 'personal_information.json').read_text())
|
|
[profile] = j['profile_user']
|
|
pdata = profile['string_map_data']
|
|
username = pdata['Username']['value']
|
|
full_name = _decode(pdata['Name']['value'])
|
|
|
|
# just make up something :shrug:
|
|
self_id = username
|
|
self_user = User(
|
|
id=self_id,
|
|
username=username,
|
|
full_name=full_name,
|
|
)
|
|
yield self_user
|
|
|
|
files = list(last.rglob('messages/inbox/*/message_*.json'))
|
|
assert len(files) > 0, last
|
|
|
|
buckets = bucket(files, key=lambda p: p.parts[-2])
|
|
file_map = {k: list(buckets[k]) for k in buckets}
|
|
|
|
for fname, ffiles in file_map.items():
|
|
for ffile in sorted(ffiles, key=lambda p: int(p.stem.split('_')[-1])):
|
|
j = json.loads(ffile.read_text())
|
|
|
|
id_len = 10
|
|
# NOTE: I'm not actually sure it's other user's id.., since it corresponds to the whole converstation
|
|
# but I stared a bit at these ids vs database ids and can't see any way to find the correspondence :(
|
|
# so basically the only way to merge is to actually try some magic and correlate timestamps/message texts?
|
|
# another option is perhaps to query user id from username with some free API
|
|
# it's still fragile: e.g. if user deletes themselves there is no more username (it becomes "instagramuser")
|
|
# if we use older exports we might be able to figure it out though... so think about it?
|
|
# it also names grouped ones like instagramuserchrisfoodishblogand25others_einihreoog
|
|
# so I feel like there is just not guaranteed way to correlate :(
|
|
other_id = fname[-id_len:]
|
|
# NOTE: no match in android db?
|
|
other_username = fname[:-id_len - 1]
|
|
other_full_name = _decode(j['title'])
|
|
yield User(
|
|
id=other_id,
|
|
username=other_username,
|
|
full_name=other_full_name,
|
|
)
|
|
|
|
# todo "thread_type": "Regular" ?
|
|
for jm in j['messages']:
|
|
try:
|
|
content = None
|
|
if 'content' in jm:
|
|
content = _decode(jm['content'])
|
|
else:
|
|
share = jm.get('share')
|
|
photos = jm.get('photos')
|
|
videos = jm.get('videos')
|
|
cc = share or photos or videos
|
|
if cc is not None:
|
|
content = str(cc)
|
|
|
|
if content is None:
|
|
# not sure what it means.. perhaps likes or something?
|
|
logger.warning(f'content is None: {jm}')
|
|
continue
|
|
|
|
timestamp_ms = jm['timestamp_ms']
|
|
sender_name = _decode(jm['sender_name'])
|
|
|
|
user_id = other_id if sender_name == other_full_name else self_id
|
|
yield _Message(
|
|
created=datetime.fromtimestamp(timestamp_ms / 1000),
|
|
text=content,
|
|
user_id=user_id,
|
|
thread_id=fname, # meh.. but no better way?
|
|
)
|
|
except Exception as e:
|
|
yield e
|
|
|
|
|
|
# TODO basically copy pasted from android.py... hmm
|
|
def messages() -> Iterator[Res[Message]]:
|
|
id2user: Dict[str, User] = {}
|
|
for x in _entities():
|
|
if isinstance(x, Exception):
|
|
yield x
|
|
continue
|
|
if isinstance(x, User):
|
|
id2user[x.id] = x
|
|
continue
|
|
if isinstance(x, _Message):
|
|
try:
|
|
user = id2user[x.user_id]
|
|
except Exception as e:
|
|
yield e
|
|
continue
|
|
yield Message(
|
|
created=x.created,
|
|
text=x.text,
|
|
thread_id=x.thread_id,
|
|
user=user,
|
|
)
|
|
continue
|
|
assert_never(x)
|