193 lines
5.9 KiB
Python
193 lines
5.9 KiB
Python
"""
|
|
Bumble data from Android app database (in =/data/data/com.instagram.android/databases/direct.db=)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Iterator, Sequence, Optional, Dict, Union
|
|
|
|
from more_itertools import unique_everseen
|
|
|
|
from my.core import (
|
|
get_files, Paths,
|
|
make_config,
|
|
LazyLogger,
|
|
datetime_naive,
|
|
Json,
|
|
Res, assert_never,
|
|
)
|
|
from my.core.sqlite import sqlite_connect_immutable, select
|
|
|
|
from my.config import instagram as user_config
|
|
|
|
|
|
logger = LazyLogger(__name__, level='debug')
|
|
|
|
@dataclass
|
|
class instagram_android_config(user_config.android):
|
|
# paths[s]/glob to the exported sqlite databases
|
|
export_path: Paths
|
|
|
|
# sadly doesn't seem easy to extract user's own handle/name from the db...
|
|
# todo maybe makes more sense to keep in parent class? not sure...
|
|
username: Optional[str] = None
|
|
full_name: Optional[str] = None
|
|
|
|
|
|
config = make_config(instagram_android_config)
|
|
|
|
|
|
def inputs() -> Sequence[Path]:
|
|
return get_files(config.export_path)
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class User:
|
|
id: str
|
|
username: str
|
|
full_name: str
|
|
|
|
|
|
# todo not sure about order of fields...
|
|
@dataclass
|
|
class _BaseMessage:
|
|
id: str
|
|
# NOTE: ffs, looks like they keep naive timestamps in the db (checked some random messages)
|
|
created: datetime_naive
|
|
text: str
|
|
thread_id: str
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class _Message(_BaseMessage):
|
|
user_id: str
|
|
# TODO ugh. can't figure out if dms have proper replies?
|
|
# reply_to_id: Optional[str]
|
|
|
|
|
|
@dataclass(unsafe_hash=True)
|
|
class Message(_BaseMessage):
|
|
user: User
|
|
# TODO could also extract Thread objec? not sure if useful
|
|
# reply_to: Optional[Message]
|
|
|
|
|
|
# this is kinda expecrimental
|
|
# basically just using RuntimeError(msg_id, *rest) has an unfortunate consequence:
|
|
# there are way too many 'similar' errors (on different msg_id)
|
|
# however passing msg_id is nice as a means of supplying extra context
|
|
# so this is a compromise, the 'duplicate' errors will be filtered out by unique_everseen
|
|
|
|
|
|
class MessageError(RuntimeError):
|
|
def __init__(self, msg_id: str, *rest: str) -> None:
|
|
super().__init__(msg_id, *rest)
|
|
self.rest = rest
|
|
|
|
def __hash__(self, other):
|
|
return hash(self.rest)
|
|
|
|
def __eq__(self, other) -> bool:
|
|
if not isinstance(other, MessageError):
|
|
return False
|
|
return self.rest == other.rest
|
|
|
|
|
|
def _parse_message(j: Json) -> Optional[_Message]:
|
|
id = j['item_id']
|
|
t = j['item_type']
|
|
tid = j['thread_key']['thread_id']
|
|
uid = j['user_id']
|
|
created = datetime.fromtimestamp(int(j['timestamp']) / 1_000_000)
|
|
text: str
|
|
if t == 'text':
|
|
text = j['text']
|
|
elif t == 'reel_share':
|
|
# TODO include reel_share -> media??
|
|
# the problem is that the links are deliberately expired by instagram..
|
|
text = j['reel_share']['text']
|
|
elif t == 'action_log':
|
|
# something like "X liked message" -- hardly useful?
|
|
return None
|
|
else:
|
|
raise MessageError(id, f"{t} isn't handled yet")
|
|
|
|
return _Message(
|
|
id=id,
|
|
created=created,
|
|
text=text,
|
|
thread_id=tid,
|
|
user_id=uid,
|
|
# reply_to_id='FIXME',
|
|
)
|
|
|
|
|
|
def _entities() -> Iterator[Res[Union[User, _Message]]]:
|
|
# NOTE: definitely need to merge multiple, app seems to recycle old messages
|
|
# TODO: hmm hard to guarantee timestamp ordering when we use synthetic input data...
|
|
# todo use TypedDict?
|
|
for f in inputs():
|
|
with sqlite_connect_immutable(f) as db:
|
|
# TODO ugh. seems like no way to extract username?
|
|
# sometimes messages (e.g. media_share) contain it in message field
|
|
# but generally it's not present. ugh
|
|
for (self_uid,) in select(('user_id',), 'FROM session', db=db):
|
|
yield User(
|
|
id=str(self_uid),
|
|
full_name=config.full_name or 'USERS_OWN_FULL_NAME',
|
|
username=config.full_name or 'USERS_OWN_USERNAME',
|
|
)
|
|
|
|
for (thread_json,) in select(('thread_info',), 'FROM threads', db=db):
|
|
j = json.loads(thread_json)
|
|
# todo in principle should leave the thread attached to the message?
|
|
# since thread is a group of users?
|
|
# inviter usually contains our own user
|
|
for r in [j['inviter'], *j['recipients']]:
|
|
# id disappeared and seems that pk_id is in use now (around december 2022)
|
|
uid = r.get('id') or r.get('pk_id')
|
|
assert uid is not None
|
|
yield User(
|
|
id=str(uid), # for some reason it's int in the db
|
|
full_name=r['full_name'],
|
|
username=r['username'],
|
|
)
|
|
|
|
for (msg_json,) in select(('message',), 'FROM messages ORDER BY timestamp', db=db):
|
|
# eh, seems to contain everything in json?
|
|
j = json.loads(msg_json)
|
|
try:
|
|
m = _parse_message(j)
|
|
if m is not None:
|
|
yield m
|
|
except Exception as e:
|
|
yield e
|
|
|
|
|
|
def messages() -> Iterator[Res[Message]]:
|
|
id2user: Dict[str, User] = {}
|
|
for x in unique_everseen(_entities()):
|
|
if isinstance(x, Exception):
|
|
yield x
|
|
continue
|
|
if isinstance(x, User):
|
|
id2user[x.id] = x
|
|
continue
|
|
if isinstance(x, _Message):
|
|
try:
|
|
user = id2user[x.user_id]
|
|
except Exception as e:
|
|
yield e
|
|
continue
|
|
yield Message(
|
|
id=x.id,
|
|
created=x.created,
|
|
text=x.text,
|
|
thread_id=x.thread_id,
|
|
user=user,
|
|
)
|
|
continue
|
|
assert_never(x)
|