HPI/my/smscalls.py
purarue f1d23c5e96 smscalls: allow large XML files as input
once XML files increase past a certain size
(was about 220MB for me), the parser just throws
an error because the tree is too large (iirc for
security reasons)

could maybe look at using iterparse in the future
to parse it without loading the whole file, but this
seems to fix it fine for me
2024-12-28 21:46:28 +00:00

329 lines
11 KiB
Python

"""
Phone calls and SMS messages
Exported using https://play.google.com/store/apps/details?id=com.riteshsahu.SMSBackupRestore&hl=en_US
"""
from __future__ import annotations
# See: https://www.synctech.com.au/sms-backup-restore/fields-in-xml-backup-files/ for schema
REQUIRES = ['lxml']
from dataclasses import dataclass
from my.config import smscalls as user_config
from my.core import Paths, Stats, get_files, stat
@dataclass
class smscalls(user_config):
    # One or more locations that SMSBackupRestore syncs its XML backups to;
    # this value is handed to get_files() by calls(), messages() and mms().
    export_path: Paths
from my.core.cfg import make_config
# Module-level configuration object, built by passing the `smscalls` dataclass
# to make_config(); the public entry points read `config.export_path` from it.
config = make_config(smscalls)
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, NamedTuple
import lxml.etree as etree
from my.core.error import Res
class Call(NamedTuple):
    """A single phone call record."""

    # timestamp parsed from the 'date' attribute
    dt: datetime
    # the 'readable_date' attribute, kept verbatim
    dt_readable: str
    # the 'duration' attribute converted to int
    duration_s: int
    phone_number: str
    # contact name, or None when the export only had a placeholder name
    who: str | None
    # type - 1 = Incoming, 2 = Outgoing, 3 = Missed, 4 = Voicemail, 5 = Rejected, 6 = Refused List.
    call_type: int

    @property
    def summary(self) -> str:
        """Short human-readable description of the call."""
        contact, seconds = self.who, self.duration_s
        return f"talked with {contact} for {seconds} secs"

    @property
    def from_me(self) -> bool:
        """Whether this is an outgoing call (``call_type`` 2)."""
        outgoing = 2
        return self.call_type == outgoing
# From the schema docs linked at the top of this file:
# All the field values are read as-is from the underlying database and no conversion is done by the app in most cases.
#
# Contact-name placeholders; the extractors in this module replace a matching
# 'contact_name' with None.
# '(Unknown)' is just what my Android phone writes, not sure if there are others.
UNKNOWN: set[str] = {'(Unknown)'}
def _parse_xml(xml: Path) -> Any:
    """Parse the XML file at ``xml`` with lxml and return the parsed tree."""
    # huge_tree=True lifts lxml's built-in size restrictions, so that very
    # large backup files can still be parsed
    parser = etree.XMLParser(huge_tree=True)
    return etree.parse(str(xml), parser=parser)
def _extract_calls(path: Path) -> Iterator[Res[Call]]:
    """
    Parse one call-log XML file and yield a ``Call`` for every ``call`` element.

    Problems with an individual element (missing required attributes, or
    attributes that can't be converted to numbers) are yielded as
    ``RuntimeError`` values rather than raised, so a single bad record doesn't
    abort iteration over the rest of the file.
    """
    tr = _parse_xml(path)
    for cxml in tr.findall('call'):
        dt = cxml.get('date')
        dt_readable = cxml.get('readable_date')
        duration = cxml.get('duration')
        who = cxml.get('contact_name')
        call_type = cxml.get('type')
        number = cxml.get('number')
        # if name is missing, its not None (its some string), depends on the phone/message app
        if who is not None and who in UNKNOWN:
            who = None
        if dt is None or dt_readable is None or duration is None or call_type is None or number is None:
            call_str = etree.tostring(cxml).decode('utf-8')
            yield RuntimeError(f"Missing one or more required attributes [date, readable_date, duration, type, number] in {call_str}")
            continue
        # TODO we've got local tz here, not sure if useful..
        # ok, so readable date is local datetime, changing throughout the backup
        try:
            # values are stored as-is by the app, so the numeric conversions can
            # fail (ValueError) or the timestamp can be out of range
            # (OverflowError/OSError from datetime.fromtimestamp)
            call = Call(
                dt=_parse_dt_ms(dt),
                dt_readable=dt_readable,
                duration_s=int(duration),
                phone_number=number,
                who=who,
                call_type=int(call_type),
            )
        except (ValueError, OverflowError, OSError) as e:
            call_str = etree.tostring(cxml).decode('utf-8')
            yield RuntimeError(f"Failed to parse numeric attributes [date, duration, type] in {call_str}: {e}")
            continue
        yield call
def calls() -> Iterator[Res[Call]]:
    """
    Yield calls from every ``calls-*.xml`` file under the configured export path.

    Errors from the extractor are passed through untouched; calls are
    de-duplicated on their timestamp, keeping the first one encountered.
    """
    # TODO always replacing with the latter is good, we get better contact names??
    seen: set[datetime] = set()
    for xml_path in get_files(config.export_path, glob='calls-*.xml'):
        for item in _extract_calls(xml_path):
            if isinstance(item, Exception):
                yield item
            elif item.dt not in seen:
                seen.add(item.dt)
                yield item
class Message(NamedTuple):
    """A single SMS message record."""

    # timestamp parsed from the 'date' attribute
    dt: datetime
    # the 'readable_date' attribute, kept verbatim
    dt_readable: str
    # contact name, or None when the export only had a placeholder name
    who: str | None
    # the 'body' attribute
    message: str
    # the 'address' attribute
    phone_number: str
    # type - 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox, 5 = Failed, 6 = Queued
    message_type: int

    @property
    def from_me(self) -> bool:
        """Whether the message was sent by the user (``message_type`` 2)."""
        sent = 2
        return self.message_type == sent
def messages() -> Iterator[Res[Message]]:
    """
    Yield SMS messages from every ``sms-*.xml`` file under the configured export path.

    Errors from the extractor are passed through untouched; messages are
    de-duplicated on ``(dt, who, from_me)``, keeping the first one encountered.
    """
    seen: set[tuple[datetime, str | None, bool]] = set()
    for xml_path in get_files(config.export_path, glob='sms-*.xml'):
        for item in _extract_messages(xml_path):
            if isinstance(item, Exception):
                yield item
                continue
            identity = (item.dt, item.who, item.from_me)
            if identity not in seen:
                seen.add(identity)
                yield item
def _extract_messages(path: Path) -> Iterator[Res[Message]]:
    """
    Parse one SMS backup XML file and yield a ``Message`` for every ``sms`` element.

    Problems with an individual element (missing required attributes, or
    attributes that can't be converted to numbers) are yielded as
    ``RuntimeError`` values rather than raised, so a single bad record doesn't
    abort iteration over the rest of the file.
    """
    tr = _parse_xml(path)
    for mxml in tr.findall('sms'):
        dt = mxml.get('date')
        dt_readable = mxml.get('readable_date')
        who = mxml.get('contact_name')
        # placeholder contact names are treated as "no name"
        if who is not None and who in UNKNOWN:
            who = None
        message = mxml.get('body')
        phone_number = mxml.get('address')
        message_type = mxml.get('type')
        if dt is None or dt_readable is None or message is None or phone_number is None or message_type is None:
            msg_str = etree.tostring(mxml).decode('utf-8')
            yield RuntimeError(f"Missing one or more required attributes [date, readable_date, body, address, type] in {msg_str}")
            continue
        try:
            # values are stored as-is by the app, so the numeric conversions can
            # fail (ValueError) or the timestamp can be out of range
            # (OverflowError/OSError from datetime.fromtimestamp)
            parsed = Message(
                dt=_parse_dt_ms(dt),
                dt_readable=dt_readable,
                who=who,
                message=message,
                phone_number=phone_number,
                message_type=int(message_type),
            )
        except (ValueError, OverflowError, OSError) as e:
            msg_str = etree.tostring(mxml).decode('utf-8')
            yield RuntimeError(f"Failed to parse numeric attributes [date, type] in {msg_str}: {e}")
            continue
        yield parsed
class MMSContentPart(NamedTuple):
    """One content part (text or data attachment) of an MMS message."""

    # the part's 'seq' attribute converted to int
    sequence_index: int
    # the part's 'ct' attribute
    content_type: str
    # the part's 'name' attribute, falling back to 'cl' when 'name' is absent
    filename: str
    # at least one of text/data is set on parts produced by the extractor
    text: str | None
    data: str | None
class MMS(NamedTuple):
    """A single MMS message together with its content parts and addresses."""

    dt: datetime
    dt_readable: str
    parts: list[MMSContentPart]
    # NOTE: this is often something like 'Name 1, Name 2', but might be different depending on your client
    who: str | None
    # NOTE: This can be a single phone number, or multiple, split by '~' or ','. It's better to think
    # of this as a 'key' or 'conversation ID', phone numbers are also present in 'addresses'
    phone_number: str
    # (address, address type) pairs
    addresses: list[tuple[str, int]]
    # 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox
    message_type: int

    @property
    def from_user(self) -> str:
        """
        Address of the sender, i.e. the first entry in ``addresses`` of type 137.

        Raises RuntimeError when there is no such entry.
        """
        # since these can be group messages, we can't just check message_type,
        # we need to look through the addresses and find who sent it
        # who is CC/'To' is not obvious in many message clients
        #
        # 129 = BCC, 130 = CC, 151 = To, 137 = From
        senders = [address for address, kind in self.addresses if kind == 137]
        if senders:
            return senders[0]
        # hmm, maybe return instead? but this probably shouldn't happen, means
        # something is very broken
        raise RuntimeError(f'No from address matching 137 found in {self.addresses}')

    @property
    def from_me(self) -> bool:
        """Whether the message was sent by the user (``message_type`` 2)."""
        sent = 2
        return self.message_type == sent
def mms() -> Iterator[Res[MMS]]:
    """
    Yield MMS messages from every ``sms-*.xml`` file under the configured export path.

    Errors from the extractor are passed through untouched. Messages are
    de-duplicated on ``(dt, phone_number, from_user)``, keeping the first one
    encountered. A message for which no sender can be determined is reported
    as a yielded error instead of aborting the whole iteration.
    """
    files = get_files(config.export_path, glob='sms-*.xml')
    emitted: set[tuple[datetime, str | None, str]] = set()
    for p in files:
        for c in _extract_mms(p):
            if isinstance(c, Exception):
                yield c
                continue
            try:
                sender = c.from_user
            except RuntimeError as e:
                # from_user raises when the message has no 'From' (137) address,
                # which can happen e.g. after the extractor skipped an invalid
                # address; without it no dedup key can be built, so report the
                # problem and carry on with the remaining messages
                yield e
                continue
            key = (c.dt, c.phone_number, sender)
            if key in emitted:
                continue
            emitted.add(key)
            yield c
def _resolve_null_str(value: str | None) -> str | None:
if value is None:
return None
# hmm.. there's some risk of the text actually being 'null', but there's
# no way to distinguish that from XML values
if value == 'null':
return None
return value
def _extract_mms(path: Path) -> Iterator[Res[MMS]]:
    """
    Parse one SMS/MMS backup XML file and yield an ``MMS`` for every ``mms`` element.

    Problems are yielded as ``RuntimeError`` values rather than raised:

    - a message missing required attributes, or whose ``date``/``msg_box``
      can't be converted to numbers, is skipped entirely;
    - an invalid address or content part is skipped, but the message itself
      is still yielded with the remaining addresses/parts.
    """
    tr = _parse_xml(path)
    for mxml in tr.findall('mms'):
        dt = mxml.get('date')
        dt_readable = mxml.get('readable_date')
        message_type = mxml.get('msg_box')

        who = mxml.get('contact_name')
        # placeholder contact names are treated as "no name"
        if who is not None and who in UNKNOWN:
            who = None
        phone_number = mxml.get('address')

        if dt is None or dt_readable is None or message_type is None or phone_number is None:
            mxml_str = etree.tostring(mxml).decode('utf-8')
            yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}')
            continue

        addresses: list[tuple[str, int]] = []
        for addr_parent in mxml.findall('addrs'):
            for addr in addr_parent.findall('addr'):
                addr_data = addr.attrib
                user_address = addr_data.get('address')
                user_type = addr_data.get('type')
                if user_address is None or user_type is None:
                    addr_str = etree.tostring(addr_parent).decode()
                    yield RuntimeError(f'Missing one or more required attributes [address, type] in {addr_str}')
                    continue
                if not user_type.isdigit():
                    yield RuntimeError(f'Invalid address type {user_type} {type(user_type)}, cannot convert to number')
                    continue
                addresses.append((user_address, int(user_type)))

        content: list[MMSContentPart] = []
        for part_root in mxml.findall('parts'):
            for part in part_root.findall('part'):
                # the first item is an SMIL XML element encoded as a string which describes
                # how the rest of the parts are laid out
                # https://www.w3.org/TR/SMIL3/smil-timing.html#Timing-TimeContainerSyntax
                # An example:
                # <smil><head><layout><root-layout/><region id="Text" top="0" left="0" height="100%" width="100%"/></layout></head><body><par dur="5000ms"><text src="text.000000.txt" region="Text" /></par></body></smil>
                #
                # This seems pretty useless, so we should try and skip it, and just return the
                # text/images/data
                part_data: dict[str, Any] = part.attrib
                seq: str | None = part_data.get('seq')
                if seq == '-1':
                    continue
                if seq is None or not seq.isdigit():
                    yield RuntimeError(f'seq must be a number, was seq={seq} {type(seq)} in {part_data}')
                    continue

                charset_type: str | None = _resolve_null_str(part_data.get('ct'))
                filename: str | None = _resolve_null_str(part_data.get('name'))
                # in some cases (images, cards), the filename is set in 'cl' instead
                if filename is None:
                    filename = _resolve_null_str(part_data.get('cl'))
                text: str | None = _resolve_null_str(part_data.get('text'))
                data: str | None = _resolve_null_str(part_data.get('data'))

                if charset_type is None or filename is None or (text is None and data is None):
                    yield RuntimeError(f'Missing one or more required attributes [ct, name, (text, data)] must be present in {part_data}')
                    continue

                content.append(
                    MMSContentPart(
                        sequence_index=int(seq),
                        content_type=charset_type,
                        filename=filename,
                        text=text,
                        data=data,
                    )
                )

        try:
            # values are stored as-is by the app, so the numeric conversions can
            # fail (ValueError) or the timestamp can be out of range
            # (OverflowError/OSError from datetime.fromtimestamp)
            parsed_dt = _parse_dt_ms(dt)
            parsed_type = int(message_type)
        except (ValueError, OverflowError, OSError) as e:
            # only the offending values are reported here: the serialized element
            # would include every part's data
            yield RuntimeError(f'Failed to parse numeric attributes date={dt!r}, msg_box={message_type!r}: {e}')
            continue

        yield MMS(
            dt=parsed_dt,
            dt_readable=dt_readable,
            who=who,
            phone_number=phone_number,
            message_type=parsed_type,
            parts=content,
            addresses=addresses,
        )
# See https://github.com/karlicoss/HPI/pull/90#issuecomment-702422351
# for potentially parsing timezone from the readable_date
def _parse_dt_ms(d: str) -> datetime:
return datetime.fromtimestamp(int(d) / 1000, tz=timezone.utc)
def stats() -> Stats:
    """Merge the results of ``stat()`` for calls, messages and mms into one dict."""
    merged: Stats = {}
    for source in (calls, messages, mms):
        merged.update(stat(source))
    return merged