diff --git a/my/smscalls.py b/my/smscalls.py index dbcf8b2..73c9c96 100644 --- a/my/smscalls.py +++ b/my/smscalls.py @@ -20,7 +20,7 @@ config = make_config(smscalls) from datetime import datetime, timezone from pathlib import Path -from typing import NamedTuple, Iterator, Set, Tuple, Optional +from typing import NamedTuple, Iterator, Set, Tuple, Optional, Any, Dict, List from lxml import etree @@ -150,6 +150,126 @@ def _extract_messages(path: Path) -> Iterator[Res[Message]]: ) +class MMSContentPart(NamedTuple): + sequence_index: int + content_type: str + filename: str + text: str + + +# https://www.synctech.com.au/sms-backup-restore/fields-in-xml-backup-files/ +class MMS(NamedTuple): + dt: datetime + dt_readable: str + content: List[MMSContentPart] + who: Optional[str] + phone_number: str + # 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox + message_type: int + + +def mms() -> Iterator[Res[MMS]]: + files = get_files(config.export_path, glob='sms-*.xml') + + emitted: Set[Tuple[datetime, Optional[str]]] = set() + for p in files: + for c in _parse_mms(p): + if isinstance(c, Exception): + yield c + continue + key = (c.dt, c.who) + if key in emitted: + continue + emitted.add(key) + yield c + + +def _parse_mms(path: Path) -> Iterator[Res[MMS]]: + tr = etree.parse(str(path)) + + for mxml in tr.findall('mms'): + dt = mxml.get('date') + dt_readable = mxml.get('readable_date') + who = mxml.get('contact_name') + if who is not None and who in UNKNOWN: + who = None + phone_number = mxml.get('address') + message_type = mxml.get('msg_box') + + if dt is None or dt_readable is None or message_type is None or phone_number is None: + mxml_str = etree.tostring(mxml).decode('utf-8') + breakpoint() + yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}') + continue + + content: List[MMSContentPart] = [] + + # seems pointless, but will leave here as its how the spec describes it + # The addresses here are also included in 'addresses' (which also doesnt + # include your own address), as well as possibly contains a contact name + # + # the only difference is its a single field (split by '~' it seems?) when + # using the top-level attributes instead of 'addresses', but seems to be no + # reason to use this + # + # addresses = [] + # for addr_parent in mxml.findall('addrs'): + # for addr in addr_parent.findall('addr'): + # if "address" in addr.attrib: + # # 129 = BCC, 130 = CC, 151 = To, 137 = From + # addresses.append(addr.attrib["address"]) + + for part_root in mxml.findall('parts'): + + for part in part_root.findall('part'): + + # the first item is an SMIL XML element encoded as a string which describes + # how the rest of the parts are laid out + # https://www.w3.org/TR/SMIL3/smil-timing.html#Timing-TimeContainerSyntax + # An example: + # + # + # This seems pretty useless, so we should try and skip it, and just return the + # text/images/data + # + # man, attrib is some internal cpython ._Attrib type which can't + # be typed by any sort of mappingproxy. maybe a protocol could work..? + part_data: Dict[str, Any] = part.attrib # type: ignore + seq: Optional[str] = part_data.get('seq') + if seq == "-1": + continue + + if seq is None or not seq.isdigit(): + yield RuntimeError(f"seq must be a number, was seq={seq} in {part_data}") + continue + + charset_type: Optional[str] = part_data.get('ct') + filename: Optional[str] = part_data.get('name') + data: Optional[str] = part_data.get('text') + + if charset_type is None or filename is None or data is None: + yield RuntimeError(f"charset_type, filename, data must be present in {part_data}") + continue + + content.append( + MMSContentPart( + sequence_index=int(seq), + content_type=charset_type, + filename=filename, + text=data + ) + ) + + yield MMS( + dt=_parse_dt_ms(dt), + dt_readable=dt_readable, + who=who, + phone_number=phone_number, + message_type=int(message_type), + content=content + ) + + # See https://github.com/karlicoss/HPI/pull/90#issuecomment-702422351 # for potentially parsing timezone from the readable_date def _parse_dt_ms(d: str) -> datetime: @@ -162,4 +282,5 @@ def stats() -> Stats: return { **stat(calls), **stat(messages), + **stat(mms), }