From 1709b11ede7823b0a86e424cc98a78fb82e1215b Mon Sep 17 00:00:00 2001 From: karlicoss Date: Mon, 30 Oct 2023 02:28:12 +0000 Subject: [PATCH] fbmessenger.android: support processing msys database seems that threads_db2 stopped updating some time ago, and msys contains all new data now --- my/fbmessenger/android.py | 91 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 84 insertions(+), 7 deletions(-) diff --git a/my/fbmessenger/android.py b/my/fbmessenger/android.py index a5e6749..d14b653 100644 --- a/my/fbmessenger/android.py +++ b/my/fbmessenger/android.py @@ -49,6 +49,7 @@ class Thread: id: str name: Optional[str] # isn't set for groups or one to one messages + # todo not sure about order of fields... @dataclass class _BaseMessage: @@ -74,6 +75,8 @@ class Message(_BaseMessage): Entity = Union[Sender, Thread, _Message] + + def _entities() -> Iterator[Res[Entity]]: paths = inputs() total = len(paths) @@ -82,7 +85,11 @@ def _entities() -> Iterator[Res[Entity]]: logger.info(f'processing [{idx:>{width}}/{total:>{width}}] {path}') with sqlite_connection(path, immutable=True, row_factory='row') as db: try: - yield from _process_db(db) + use_msys = len(list(db.execute('SELECT * FROM sqlite_master WHERE name = "logging_events_v2"'))) > 0 + if use_msys: + yield from _process_db_msys(db) + else: + yield from _process_db_threads_db2(db) except Exception as e: yield echain(RuntimeError(f'While processing {path}'), cause=e) @@ -91,7 +98,7 @@ def _normalise_user_id(ukey: str) -> str: # trying to match messages.author from fbchat prefix = 'FACEBOOK:' assert ukey.startswith(prefix), ukey - return ukey[len(prefix):] + return ukey[len(prefix) :] def _normalise_thread_id(key) -> str: @@ -99,7 +106,74 @@ def _normalise_thread_id(key) -> str: return key.split(':')[1] -def _process_db(db: sqlite3.Connection) -> Iterator[Res[Entity]]: +# NOTE: this is sort of copy pasted from other _process_db method +# maybe later could unify them +def _process_db_msys(db: sqlite3.Connection) -> Iterator[Res[Entity]]: + senders: Dict[str, Sender] = {} + for r in db.execute('SELECT CAST(id AS TEXT) AS id, name FROM contacts'): + s = Sender( + id=r['id'], + name=r['name'], + ) + senders[s.id] = s + yield s + + # TODO can we get it from db? could infer as the most common id perhaps? + self_id = config.facebook_id + thread_users: Dict[str, List[Sender]] = {} + for r in db.execute('SELECT CAST(thread_key AS TEXT) AS thread_key, CAST(contact_id AS TEXT) AS contact_id FROM participants'): + thread_key = r['thread_key'] + user_key = r['contact_id'] + if self_id is not None and user_key == self_id: + # exclude yourself, otherwise it's just spammy to show up in all participants + continue + + ll = thread_users.get(thread_key) + if ll is None: + ll = [] + thread_users[thread_key] = ll + ll.append(senders[user_key]) + + # 15 is a weird thread that doesn't have any participants and messages + for r in db.execute('SELECT CAST(thread_key AS TEXT) AS thread_key, thread_name FROM threads WHERE thread_type != 15'): + thread_key = r['thread_key'] + name = r['thread_name'] + if name is None: + users = thread_users[thread_key] + name = ', '.join([u.name or u.id for u in users]) + yield Thread( + id=thread_key, + name=name, + ) + + # TODO should be quicker to explicitly specify columns rather than SELECT * + # should probably add it to module development tips? + for r in db.execute( + ''' + SELECT + message_id, + timestamp_ms, + text, + CAST(thread_key AS TEXT) AS thread_key, + CAST(sender_id AS TEXT) AS sender_id, + reply_source_id + FROM messages + ORDER BY timestamp_ms /* they aren't in order in the database, so need to sort */ + ''' + ): + yield _Message( + id=r['message_id'], + # TODO double check utc + dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000, tz=timezone.utc), + # is_incoming=False, TODO?? + text=r['text'], + thread_id=r['thread_key'], + sender_id=r['sender_id'], + reply_to_id=r['reply_source_id'], + ) + + +def _process_db_threads_db2(db: sqlite3.Connection) -> Iterator[Res[Entity]]: senders: Dict[str, Sender] = {} for r in db.execute('''SELECT * FROM thread_users'''): # for messaging_actor_type == 'REDUCED_MESSAGING_ACTOR', name is None @@ -142,22 +216,25 @@ def _process_db(db: sqlite3.Connection) -> Iterator[Res[Entity]]: name=name, ) - for r in db.execute(''' + for r in db.execute( + ''' SELECT *, json_extract(sender, "$.user_key") AS user_key FROM messages WHERE msg_type NOT IN ( -1, /* these don't have any data at all, likely immediately deleted or something? */ 2 /* these are 'left group' system messages, also a bit annoying since they might reference nonexistent users */ ) ORDER BY timestamp_ms /* they aren't in order in the database, so need to sort */ - '''): + ''' + ): yield _Message( id=r['msg_id'], - dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000, tz=timezone.utc), # double checked against some messages in different timezone + # double checked against some messages in different timezone + dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000, tz=timezone.utc), # is_incoming=False, TODO?? text=r['text'], thread_id=_normalise_thread_id(r['thread_key']), sender_id=_normalise_user_id(r['user_key']), - reply_to_id=r['message_replied_to_id'] + reply_to_id=r['message_replied_to_id'], )