parse content/filenames properly, parse addresses, remove null data

This commit is contained in:
Sean Breckenridge 2024-06-02 17:23:42 -07:00
parent 118b58ee68
commit 88474771e9

View file

@ -154,37 +154,67 @@ class MMSContentPart(NamedTuple):
sequence_index: int
content_type: str
filename: str
text: str
text: Optional[str]
data: Optional[str]
# https://www.synctech.com.au/sms-backup-restore/fields-in-xml-backup-files/
class MMS(NamedTuple):
dt: datetime
dt_readable: str
content: List[MMSContentPart]
parts: List[MMSContentPart]
# NOTE: these is often something like 'Name 1, Name 2', but might be different depending on your client
who: Optional[str]
# NOTE: This can be a single phone number, or multiple, split by '~' or ','. Its better to think
# of this as a 'key' or 'conversation ID', phone numbers are also present in 'addresses'
phone_number: str
addresses: List[Tuple[str, int]]
# 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox
message_type: int
@property
def from_user(self) -> str:
# since these can be group messages, we can't just check message_type,
# we need to iterate through and find who sent it
# who is CC/'To' is not obvious in many message clients
#
# 129 = BCC, 130 = CC, 151 = To, 137 = From
for (addr, _type) in self.addresses:
if _type == 137:
return addr
else:
# hmm, maybe return instead? but this probably shouldnt happen, means
# something is very broken
raise RuntimeError(f'No from address matching 137 found in {self.addresses}')
@property
def from_me(self) -> bool:
return self.message_type == 2
def mms() -> Iterator[Res[MMS]]:
files = get_files(config.export_path, glob='sms-*.xml')
emitted: Set[Tuple[datetime, Optional[str]]] = set()
emitted: Set[Tuple[datetime, Optional[str], str]] = set()
for p in files:
for c in _parse_mms(p):
for c in _extract_mms(p):
if isinstance(c, Exception):
yield c
continue
key = (c.dt, c.who)
key = (c.dt, c.phone_number, c.from_user)
if key in emitted:
continue
emitted.add(key)
yield c
def _resolve_null_str(value: Optional[str]) -> Optional[str]:
if value is None:
return None
if value.lower() == 'null':
return None
return value
def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
def _extract_mms(path: Path) -> Iterator[Res[MMS]]:
tr = etree.parse(str(path))
for mxml in tr.findall('mms'):
@ -192,7 +222,6 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
dt_readable = mxml.get('readable_date')
message_type = mxml.get('msg_box')
# TODO: split these by `~`?
who = mxml.get('contact_name')
if who is not None and who in UNKNOWN:
who = None
@ -200,21 +229,23 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
if dt is None or dt_readable is None or message_type is None or phone_number is None:
mxml_str = etree.tostring(mxml).decode('utf-8')
breakpoint()
yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}')
continue
content: List[MMSContentPart] = []
addresses: List[Tuple[str, int]] = []
for addr_parent in mxml.findall('addrs'):
for addr in addr_parent.findall('addr'):
addr_data = addr.attrib
user_address = addr_data.get('address')
user_type = addr_data.get('type')
if user_address is None or user_type is None:
addr_str = etree.tostring(addr_parent).decode()
return RuntimeError(f'Missing one or more required attributes [address, type] in {addr_str}')
if not user_type.isdigit():
return RuntimeError(f'Invalid type {user_type}')
addresses.append((user_address, int(user_type)))
# TODO: use this to extract out which person actually sent this
# message_type does not accurately encode that when there are multiple people
#
# addresses = []
# for addr_parent in mxml.findall('addrs'):
# for addr in addr_parent.findall('addr'):
# if "address" in addr.attrib:
# # 129 = BCC, 130 = CC, 151 = To, 137 = From
# addresses.append(addr.attrib["address"])
content: List[MMSContentPart] = []
for part_root in mxml.findall('parts'):
@ -233,19 +264,23 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
# be typed by any sort of mappingproxy. maybe a protocol could work..?
part_data: Dict[str, Any] = part.attrib # type: ignore
seq: Optional[str] = part_data.get('seq')
if seq == "-1":
if seq == '-1':
continue
if seq is None or not seq.isdigit():
yield RuntimeError(f"seq must be a number, was seq={seq} in {part_data}")
yield RuntimeError(f'seq must be a number, was seq={seq} in {part_data}')
continue
charset_type: Optional[str] = part_data.get('ct')
filename: Optional[str] = part_data.get('name')
data: Optional[str] = part_data.get('text')
charset_type: Optional[str] = _resolve_null_str(part_data.get('ct'))
filename: Optional[str] = _resolve_null_str(part_data.get('name'))
# in some cases (images, cards), the filename is set in 'cl' instead
if filename is None:
filename = _resolve_null_str(part_data.get('cl'))
text: Optional[str] = _resolve_null_str(part_data.get('text'))
data: Optional[str] = _resolve_null_str(part_data.get('data'))
if charset_type is None or filename is None or data is None:
yield RuntimeError(f"charset_type, filename, data must be present in {part_data}")
if charset_type is None or filename is None or (text is None and data is None):
yield RuntimeError(f'Missing one or more required attributes [ct, name, (text, data)] must be present in {part_data}')
continue
content.append(
@ -253,7 +288,8 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
sequence_index=int(seq),
content_type=charset_type,
filename=filename,
text=data
text=text,
data=data
)
)
@ -263,7 +299,8 @@ def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
who=who,
phone_number=phone_number,
message_type=int(message_type),
content=content
parts=content,
addresses=addresses,
)