From af874d2d759a91ecec8e6f0a53541a18a169385e Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Sun, 19 Feb 2023 02:45:08 +0000 Subject: [PATCH] my.fbmessenger.android: minor refactoring, comments & error handling --- my/fbmessenger/android.py | 119 +++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 61 deletions(-) diff --git a/my/fbmessenger/android.py b/my/fbmessenger/android.py index ef3711a..99afc15 100644 --- a/my/fbmessenger/android.py +++ b/my/fbmessenger/android.py @@ -5,19 +5,22 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime -import json from pathlib import Path import sqlite3 from typing import Iterator, Sequence, Optional, Dict, Union from more_itertools import unique_everseen -from my.core import get_files, Paths, datetime_naive, Res, assert_never +from my.core import get_files, Paths, datetime_naive, Res, assert_never, LazyLogger +from my.core.error import echain from my.core.sqlite import sqlite_connection from my.config import fbmessenger as user_config +logger = LazyLogger(__name__) + + @dataclass class config(user_config.android): # paths[s]/glob to the exported sqlite databases @@ -66,68 +69,62 @@ class Message(_BaseMessage): Entity = Union[Sender, Thread, _Message] def _entities() -> Iterator[Res[Entity]]: - for f in inputs(): + dbs = inputs() + for i, f in enumerate(dbs): + logger.debug(f'processing {f} {i}/{len(dbs)}') with sqlite_connection(f, immutable=True, row_factory='row') as db: - yield from _process_db(db) + try: + yield from _process_db(db) + except Exception as e: + yield echain(RuntimeError(f'While processing {f}'), cause=e) + + +def _normalise_user_id(ukey: str) -> str: + # trying to match messages.author from fbchat + prefix = 'FACEBOOK:' + assert ukey.startswith(prefix), ukey + return ukey[len(prefix):] + + +def _normalise_thread_id(key) -> str: + # works both for GROUP:group_id and ONE_TO_ONE:other_user:your_user + return key.split(':')[1] def 
_process_db(db: sqlite3.Connection) -> Iterator[Res[Entity]]: - # works both for GROUP:group_id and ONE_TO_ONE:other_user:your_user - threadkey2id = lambda key: key.split(':')[1] for r in db.execute('SELECT * FROM threads'): - try: - yield Thread( - id=threadkey2id(r['thread_key']), - name=r['name'], - ) - except Exception as e: - yield e - continue + yield Thread( + id=_normalise_thread_id(r['thread_key']), + name=r['name'], + ) + + for r in db.execute('''SELECT * FROM thread_users'''): + # for messaging_actor_type == 'REDUCED_MESSAGING_ACTOR', name is None + # but they are still referenced, so need to keep + name = r['name'] or '' + yield Sender( + id=_normalise_user_id(r['user_key']), + name=name, + ) - for r in db.execute('SELECT * FROM messages ORDER BY timestamp_ms'): - mtype: int = r['msg_type'] - if mtype == -1: - # likely immediately deleted or something? doesn't have any data at all - continue - - user_id = None - try: - # todo could use thread_users? - sj = json.loads(r['sender']) - ukey: str = sj['user_key'] - prefix = 'FACEBOOK:' - assert ukey.startswith(prefix), ukey - user_id = ukey[len(prefix):] - yield Sender( - id=user_id, - name=sj['name'], - ) - except Exception as e: - yield e - continue - - thread_id = None - try: - thread_id = threadkey2id(r['thread_key']) - except Exception as e: - yield e - continue - - try: - assert user_id is not None - assert thread_id is not None - yield _Message( - id=r['msg_id'], - dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000), - # is_incoming=False, TODO?? - text=r['text'], - thread_id=thread_id, - sender_id=user_id, - reply_to_id=r['message_replied_to_id'] - ) - except Exception as e: - yield e + for r in db.execute(''' + SELECT *, json_extract(sender, "$.user_key") AS user_key FROM messages + WHERE msg_type NOT IN ( + -1, /* these don't have any data at all, likely immediately deleted or something? 
*/ + 2 /* these are 'left group' system messages, also a bit annoying since they might reference nonexistent users */ + ) + ORDER BY timestamp_ms /* they aren't in order in the database, so need to sort */ + '''): + yield _Message( + id=r['msg_id'], + dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000), + # is_incoming=False, TODO?? + text=r['text'], + thread_id=_normalise_thread_id(r['thread_key']), + sender_id=_normalise_user_id(r['user_key']), + reply_to_id=r['message_replied_to_id'] + ) def messages() -> Iterator[Res[Message]]: @@ -146,12 +143,12 @@ def messages() -> Iterator[Res[Message]]: continue if isinstance(x, _Message): reply_to_id = x.reply_to_id + # hmm, reply_to may be missing due to the synthetic nature of export, so have to be defensive + reply_to = None if reply_to_id is None else msgs.get(reply_to_id) + # also would be interesting to merge together entities rather than resulting messages from different sources.. + # then the merging thing could be moved to common? try: sender = senders[x.sender_id] - # hmm, reply_to be missing due to the synthetic nature of export - # also would be interesting to merge together entities rather than resuling messages from different sources.. - # then the merging thing could be moved to common? - reply_to = None if reply_to_id is None else msgs[reply_to_id] thread = threads[x.thread_id] except Exception as e: yield e