diff --git a/my/smscalls.py b/my/smscalls.py index 96ac45c..35e4d8c 100644 --- a/my/smscalls.py +++ b/my/smscalls.py @@ -154,37 +154,67 @@ class MMSContentPart(NamedTuple): sequence_index: int content_type: str filename: str - text: str + text: Optional[str] + data: Optional[str] -# https://www.synctech.com.au/sms-backup-restore/fields-in-xml-backup-files/ class MMS(NamedTuple): dt: datetime dt_readable: str - content: List[MMSContentPart] + parts: List[MMSContentPart] + # NOTE: these is often something like 'Name 1, Name 2', but might be different depending on your client who: Optional[str] + # NOTE: This can be a single phone number, or multiple, split by '~' or ','. Its better to think + # of this as a 'key' or 'conversation ID', phone numbers are also present in 'addresses' phone_number: str + addresses: List[Tuple[str, int]] # 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox message_type: int + @property + def from_user(self) -> str: + # since these can be group messages, we can't just check message_type, + # we need to iterate through and find who sent it + # who is CC/'To' is not obvious in many message clients + # + # 129 = BCC, 130 = CC, 151 = To, 137 = From + for (addr, _type) in self.addresses: + if _type == 137: + return addr + else: + # hmm, maybe return instead? but this probably shouldnt happen, means + # something is very broken + raise RuntimeError(f'No from address matching 137 found in {self.addresses}') + + @property + def from_me(self) -> bool: + return self.message_type == 2 + def mms() -> Iterator[Res[MMS]]: files = get_files(config.export_path, glob='sms-*.xml') - emitted: Set[Tuple[datetime, Optional[str]]] = set() + emitted: Set[Tuple[datetime, Optional[str], str]] = set() for p in files: - for c in _parse_mms(p): + for c in _extract_mms(p): if isinstance(c, Exception): yield c continue - key = (c.dt, c.who) + key = (c.dt, c.phone_number, c.from_user) if key in emitted: continue emitted.add(key) yield c +def _resolve_null_str(value: Optional[str]) -> Optional[str]: + if value is None: + return None + if value.lower() == 'null': + return None + return value -def _parse_mms(path: Path) -> Iterator[Res[MMS]]: + +def _extract_mms(path: Path) -> Iterator[Res[MMS]]: tr = etree.parse(str(path)) for mxml in tr.findall('mms'): @@ -192,7 +222,6 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]: dt_readable = mxml.get('readable_date') message_type = mxml.get('msg_box') - # TODO: split these by `~`? who = mxml.get('contact_name') if who is not None and who in UNKNOWN: who = None @@ -200,21 +229,23 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]: if dt is None or dt_readable is None or message_type is None or phone_number is None: mxml_str = etree.tostring(mxml).decode('utf-8') - breakpoint() yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}') continue - content: List[MMSContentPart] = [] + addresses: List[Tuple[str, int]] = [] + for addr_parent in mxml.findall('addrs'): + for addr in addr_parent.findall('addr'): + addr_data = addr.attrib + user_address = addr_data.get('address') + user_type = addr_data.get('type') + if user_address is None or user_type is None: + addr_str = etree.tostring(addr_parent).decode() + return RuntimeError(f'Missing one or more required attributes [address, type] in {addr_str}') + if not user_type.isdigit(): + return RuntimeError(f'Invalid type {user_type}') + addresses.append((user_address, int(user_type))) - # TODO: use this to extract out which person actually sent this - # message_type does not accurately encode that when there are multiple people - # - # addresses = [] - # for addr_parent in mxml.findall('addrs'): - # for addr in addr_parent.findall('addr'): - # if "address" in addr.attrib: - # # 129 = BCC, 130 = CC, 151 = To, 137 = From - # addresses.append(addr.attrib["address"]) + content: List[MMSContentPart] = [] for part_root in mxml.findall('parts'): @@ -233,19 +264,23 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]: # be typed by any sort of mappingproxy. maybe a protocol could work..? part_data: Dict[str, Any] = part.attrib # type: ignore seq: Optional[str] = part_data.get('seq') - if seq == "-1": + if seq == '-1': continue if seq is None or not seq.isdigit(): - yield RuntimeError(f"seq must be a number, was seq={seq} in {part_data}") + yield RuntimeError(f'seq must be a number, was seq={seq} in {part_data}') continue - charset_type: Optional[str] = part_data.get('ct') - filename: Optional[str] = part_data.get('name') - data: Optional[str] = part_data.get('text') + charset_type: Optional[str] = _resolve_null_str(part_data.get('ct')) + filename: Optional[str] = _resolve_null_str(part_data.get('name')) + # in some cases (images, cards), the filename is set in 'cl' instead + if filename is None: + filename = _resolve_null_str(part_data.get('cl')) + text: Optional[str] = _resolve_null_str(part_data.get('text')) + data: Optional[str] = _resolve_null_str(part_data.get('data')) - if charset_type is None or filename is None or data is None: - yield RuntimeError(f"charset_type, filename, data must be present in {part_data}") + if charset_type is None or filename is None or (text is None and data is None): + yield RuntimeError(f'Missing one or more required attributes [ct, name, (text, data)] must be present in {part_data}') continue content.append( @@ -253,7 +288,8 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]: sequence_index=int(seq), content_type=charset_type, filename=filename, - text=data + text=text, + data=data ) ) @@ -263,7 +299,8 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]: who=who, phone_number=phone_number, message_type=int(message_type), - content=content + parts=content, + addresses=addresses, )