HPI/my/twitter/android.py
karlicoss 51209c547e my.twitter.android: refactor into a proper module
for now only extracting bookmarks, will use it for some time and see how it goes
2023-12-24 00:49:07 +00:00


"""
Twitter data from offficial app for Android
"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import re
from struct import unpack_from
from typing import Iterator, Sequence

from my.core import datetime_aware, get_files, LazyLogger, Paths, Res
from my.core.common import unique_everseen
from my.core.sqlite import sqlite_connect_immutable

import my.config

from .common import permalink

logger = LazyLogger(__name__)


@dataclass
class config(my.config.twitter.android):
    # path[s]/glob to the exported sqlite databases
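    # an example (hypothetical path!) of how this might look in your my.config:
    #
    #     class twitter:
    #         class android:
    #             export_path = '/backups/twitter-android/*.db'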
    export_path: Paths


def inputs() -> Sequence[Path]:
    return get_files(config.export_path)


@dataclass(unsafe_hash=True)
class Tweet:
id_str: str
created_at: datetime_aware
screen_name: str
    text: str

    @property
def permalink(self) -> str:
        return permalink(screen_name=self.screen_name, id=self.id_str)


def _parse_content(data: bytes) -> str:
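    """
    Tries to extract tweet text from the undocumented binary blob stored in statuses_content.

    The layout below was reverse engineered by staring at the data, so it's all best effort.
    """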
    pos = 0

    def skip(count: int) -> None:
nonlocal pos
        pos += count

    def getstring(slen: int) -> str:
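        # strings are prefixed with their length, which itself occupies slen bytes (big endian)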
if slen == 1:
lfmt = '>B'
elif slen == 2:
lfmt = '>H'
else:
raise RuntimeError
(sz,) = unpack_from(lfmt, data, offset=pos)
skip(slen)
assert sz > 0
assert sz <= 10000 # sanity check?
        # ideally this would be as simple as:
        #   (ss,) = unpack_from(f'{sz}s', data, offset=pos)
        #   skip(sz)
        # however sometimes the length in the header doesn't match the actual string length
        # (e.g. tweet 1725868458246570412), so instead we scan for the next field separator
seps = [
b'I\x08',
b'I\x09',
]
sep_idxs = [data[pos:].find(sep) for sep in seps]
sep_idxs = [i for i in sep_idxs if i != -1]
assert len(sep_idxs) > 0
sep_idx = min(sep_idxs)
# print("EXPECTED LEN", sz, "GOT", sep_idx, "DIFF", sep_idx - sz)
zz = data[pos : pos + sep_idx]
skip(sep_idx)
        return zz.decode('utf8')

    skip(2)  # always starts with 4a03?
(xx,) = unpack_from('B', data, offset=pos)
skip(1)
# print("TYPE:", xx)
    # not sure what this type byte means... maybe it's a bitmask?
    # in any case it determines how wide the following length field is:
    slen = {
        66 : 1,
        67 : 2,
        106: 1,
        107: 2,
    }[xx]
text = getstring(slen=slen)
    # after the main tweet text, the blob contains entities (e.g. shortened urls)
    # couldn't reverse engineer the schema properly though; the links are kinda all over the place
    # TODO this might also contain image alt descriptions?
    # see 1665029077034565633
extracted = []
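    # each link seems to be preceded by a 0x6a separator byte and a one-byte length
    # (best-effort interpretation, same caveats as above)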
linksep = 0x6a
while True:
m = re.search(b'\x6a.http', data[pos:])
if m is None:
break
qq = m.start()
pos += qq
while True:
if data[pos] != linksep:
break
pos += 1
(sz,) = unpack_from('B', data, offset=pos)
pos += 1
(ss,) = unpack_from(f'{sz}s', data, offset=pos)
pos += sz
extracted.append(ss)
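    # 'extracted' interleaves shortened t.co links with their expanded versions,
    # so pair them up and substitute the full urls back into the tweet text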
replacements = {}
i = 0
while i < len(extracted):
if b'https://t.co/' in extracted[i]:
key = extracted[i].decode('utf8')
value = extracted[i + 1].decode('utf8')
i += 2
replacements[key] = value
else:
i += 1
for k, v in replacements.items():
text = text.replace(k, v)
assert 'https://t.co/' not in text # make sure we detected all links
    return text


def _process_one(f: Path) -> Iterator[Res[Tweet]]:
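    """
    Yields bookmarked tweets from a single exported sqlite database
    """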
with sqlite_connect_immutable(f) as db:
        # NOTE:
        # - there is also statuses_r_ent_content, which has entities' links replaced,
        #   but they are still ellipsized (e.g. check 1692905005479580039),
        #   so let's just use statuses_content
        # - there is also timeline_created_at, but these look like made up timestamps;
        #   don't think they represent bookmarking time
        # - not sure what timeline_type is?
        #   seems like 30 means bookmarks?
        #   there is one tweet with timeline_type 18, but it has timeline_is_preview=1
for (
tweet_id,
user_name,
user_username,
created_ms,
blob,
) in db.execute(
'''
SELECT
statuses_status_id,
users_name,
users_username,
statuses_created,
CAST(statuses_content AS BLOB)
FROM timeline_view
WHERE statuses_bookmarked = 1
ORDER BY timeline_sort_index DESC
''',
):
if blob is None: # TODO exclude in sql query?
continue
yield Tweet(
id_str=tweet_id,
# TODO double check it's utc?
created_at=datetime.fromtimestamp(created_ms / 1000, tz=timezone.utc),
screen_name=user_username,
text=_parse_content(blob),
            )


def bookmarks() -> Iterator[Res[Tweet]]:
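    """
    Yields bookmarked tweets across all databases, deduplicated via unique_everseen
    """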
# TODO might need to sort by timeline_sort_index again?
# not sure if each database contains full history of bookmarks (likely not!)
def it() -> Iterator[Res[Tweet]]:
paths = inputs()
total = len(paths)
width = len(str(total))
for idx, path in enumerate(paths):
logger.info(f'processing [{idx:>{width}}/{total:>{width}}] {path}')
            yield from _process_one(path)

    # TODO hmm maybe unique_everseen should be a decorator?
    return unique_everseen(it)
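
# A minimal usage sketch (assuming config.export_path is set up as above):
#
#     from my.twitter.android import bookmarks
#     for tweet in bookmarks():
#         if isinstance(tweet, Exception):  # Res yields errors inline
#             continue
#         print(tweet.created_at, tweet.permalink, tweet.text)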