smscalls: parse mms from smscalls export (#370)

* initial mms exploration
This commit is contained in:
seanbreckenridge 2024-06-05 14:03:03 -07:00 committed by GitHub
parent 8a8a1ebb0e
commit 35dd5d82a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 163 additions and 2 deletions

View file

@ -20,7 +20,7 @@ config = make_config(smscalls)
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import NamedTuple, Iterator, Set, Tuple, Optional from typing import NamedTuple, Iterator, Set, Tuple, Optional, Any, Dict, List
from lxml import etree from lxml import etree
@ -150,6 +150,165 @@ def _extract_messages(path: Path) -> Iterator[Res[Message]]:
) )
class MMSContentPart(NamedTuple):
sequence_index: int
content_type: str
filename: str
text: Optional[str]
data: Optional[str]
class MMS(NamedTuple):
dt: datetime
dt_readable: str
parts: List[MMSContentPart]
# NOTE: these is often something like 'Name 1, Name 2', but might be different depending on your client
who: Optional[str]
# NOTE: This can be a single phone number, or multiple, split by '~' or ','. Its better to think
# of this as a 'key' or 'conversation ID', phone numbers are also present in 'addresses'
phone_number: str
addresses: List[Tuple[str, int]]
# 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox
message_type: int
@property
def from_user(self) -> str:
# since these can be group messages, we can't just check message_type,
# we need to iterate through and find who sent it
# who is CC/'To' is not obvious in many message clients
#
# 129 = BCC, 130 = CC, 151 = To, 137 = From
for (addr, _type) in self.addresses:
if _type == 137:
return addr
else:
# hmm, maybe return instead? but this probably shouldnt happen, means
# something is very broken
raise RuntimeError(f'No from address matching 137 found in {self.addresses}')
@property
def from_me(self) -> bool:
return self.message_type == 2
def mms() -> Iterator[Res[MMS]]:
files = get_files(config.export_path, glob='sms-*.xml')
emitted: Set[Tuple[datetime, Optional[str], str]] = set()
for p in files:
for c in _extract_mms(p):
if isinstance(c, Exception):
yield c
continue
key = (c.dt, c.phone_number, c.from_user)
if key in emitted:
continue
emitted.add(key)
yield c
def _resolve_null_str(value: Optional[str]) -> Optional[str]:
if value is None:
return None
# hmm.. theres some risk of the text actually being 'null', but theres
# no way to distinguish that from XML values
if value == 'null':
return None
return value
def _extract_mms(path: Path) -> Iterator[Res[MMS]]:
tr = etree.parse(str(path))
for mxml in tr.findall('mms'):
dt = mxml.get('date')
dt_readable = mxml.get('readable_date')
message_type = mxml.get('msg_box')
who = mxml.get('contact_name')
if who is not None and who in UNKNOWN:
who = None
phone_number = mxml.get('address')
if dt is None or dt_readable is None or message_type is None or phone_number is None:
mxml_str = etree.tostring(mxml).decode('utf-8')
yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}')
continue
addresses: List[Tuple[str, int]] = []
for addr_parent in mxml.findall('addrs'):
for addr in addr_parent.findall('addr'):
addr_data = addr.attrib
user_address = addr_data.get('address')
user_type = addr_data.get('type')
if user_address is None or user_type is None:
addr_str = etree.tostring(addr_parent).decode()
yield RuntimeError(f'Missing one or more required attributes [address, type] in {addr_str}')
continue
if not user_type.isdigit():
yield RuntimeError(f'Invalid address type {user_type} {type(user_type)}, cannot convert to number')
continue
addresses.append((user_address, int(user_type)))
content: List[MMSContentPart] = []
for part_root in mxml.findall('parts'):
for part in part_root.findall('part'):
# the first item is an SMIL XML element encoded as a string which describes
# how the rest of the parts are laid out
# https://www.w3.org/TR/SMIL3/smil-timing.html#Timing-TimeContainerSyntax
# An example:
# <smil><head><layout><root-layout/><region id="Text" top="0" left="0" height="100%" width="100%"/></layout></head><body><par dur="5000ms"><text src="text.000000.txt" region="Text" /></par></body></smil>
#
# This seems pretty useless, so we should try and skip it, and just return the
# text/images/data
#
# man, attrib is some internal cpython ._Attrib type which can't
# be typed by any sort of mappingproxy. maybe a protocol could work..?
part_data: Dict[str, Any] = part.attrib # type: ignore
seq: Optional[str] = part_data.get('seq')
if seq == '-1':
continue
if seq is None or not seq.isdigit():
yield RuntimeError(f'seq must be a number, was seq={seq} {type(seq)} in {part_data}')
continue
charset_type: Optional[str] = _resolve_null_str(part_data.get('ct'))
filename: Optional[str] = _resolve_null_str(part_data.get('name'))
# in some cases (images, cards), the filename is set in 'cl' instead
if filename is None:
filename = _resolve_null_str(part_data.get('cl'))
text: Optional[str] = _resolve_null_str(part_data.get('text'))
data: Optional[str] = _resolve_null_str(part_data.get('data'))
if charset_type is None or filename is None or (text is None and data is None):
yield RuntimeError(f'Missing one or more required attributes [ct, name, (text, data)] must be present in {part_data}')
continue
content.append(
MMSContentPart(
sequence_index=int(seq),
content_type=charset_type,
filename=filename,
text=text,
data=data
)
)
yield MMS(
dt=_parse_dt_ms(dt),
dt_readable=dt_readable,
who=who,
phone_number=phone_number,
message_type=int(message_type),
parts=content,
addresses=addresses,
)
# See https://github.com/karlicoss/HPI/pull/90#issuecomment-702422351 # See https://github.com/karlicoss/HPI/pull/90#issuecomment-702422351
# for potentially parsing timezone from the readable_date # for potentially parsing timezone from the readable_date
def _parse_dt_ms(d: str) -> datetime: def _parse_dt_ms(d: str) -> datetime:
@ -162,4 +321,5 @@ def stats() -> Stats:
return { return {
**stat(calls), **stat(calls),
**stat(messages), **stat(messages),
**stat(mms),
} }

View file

@ -4,6 +4,7 @@ from my.tests.common import skip_if_not_karlicoss as pytestmark
# TODO implement via stat? # TODO implement via stat?
def test() -> None: def test() -> None:
from my.smscalls import calls, messages from my.smscalls import calls, messages, mms
assert len(list(calls())) > 10 assert len(list(calls())) > 10
assert len(list(messages())) > 10 assert len(list(messages())) > 10
assert len(list(mms())) > 10