initial mms exploration
This commit is contained in:
parent
8a8a1ebb0e
commit
06936aa560
1 changed files with 122 additions and 1 deletions
123
my/smscalls.py
123
my/smscalls.py
|
@ -20,7 +20,7 @@ config = make_config(smscalls)
|
|||
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple, Iterator, Set, Tuple, Optional
|
||||
from typing import NamedTuple, Iterator, Set, Tuple, Optional, Any, Dict, List
|
||||
|
||||
from lxml import etree
|
||||
|
||||
|
@ -150,6 +150,126 @@ def _extract_messages(path: Path) -> Iterator[Res[Message]]:
|
|||
)
|
||||
|
||||
|
||||
class MMSContentPart(NamedTuple):
    """
    One <part> of an MMS message, as extracted by _parse_mms.
    """

    # value of the part's 'seq' attribute, converted to int
    sequence_index: int
    # value of the part's 'ct' attribute
    content_type: str
    # value of the part's 'name' attribute
    filename: str
    # value of the part's 'text' attribute
    text: str
|
||||
|
||||
|
||||
# https://www.synctech.com.au/sms-backup-restore/fields-in-xml-backup-files/
|
||||
class MMS(NamedTuple):
    """
    A single <mms> element from a backup file, as extracted by _parse_mms.
    """

    # the 'date' attribute, converted by _parse_dt_ms
    dt: datetime
    # the 'readable_date' attribute, kept verbatim
    dt_readable: str
    # the parts of the message that were successfully parsed
    content: List[MMSContentPart]
    # the 'contact_name' attribute; None when absent or listed in UNKNOWN
    who: Optional[str]
    # the 'address' attribute, kept verbatim
    phone_number: str
    # the 'msg_box' attribute as an int:
    # 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox
    message_type: int
|
||||
|
||||
|
||||
def mms() -> Iterator[Res[MMS]]:
    """
    Yield the MMS messages found in every 'sms-*.xml' file under config.export_path.

    Errors produced by _parse_mms are passed through untouched. Messages are
    de-duplicated on (dt, who): only the first message seen for a given pair
    is yielded, later ones are dropped.
    """
    seen: Set[Tuple[datetime, Optional[str]]] = set()

    for export in get_files(config.export_path, glob='sms-*.xml'):
        for item in _parse_mms(export):
            if isinstance(item, Exception):
                # errors carry no key to de-duplicate on, always forward them
                yield item
                continue

            ident = (item.dt, item.who)
            if ident not in seen:
                seen.add(ident)
                yield item
|
||||
|
||||
|
||||
def _parse_mms(path: Path) -> Iterator[Res[MMS]]:
    """
    Parse the <mms> elements of one backup file.

    The file at ``path`` is parsed with lxml's etree. Every element returned by
    ``findall('mms')`` results in either an ``MMS`` or, when something is
    malformed, a ``RuntimeError`` that is *yielded* rather than raised, so the
    caller can keep consuming the remaining messages.

    Errors are yielded when:
      - one of the 'date', 'readable_date', 'msg_box' or 'address' attributes
        is missing (the whole message is skipped)
      - 'msg_box' is not an integer (the whole message is skipped)
      - a part has a non-numeric 'seq', or lacks one of 'ct', 'name', 'text'
        (only that part is skipped, the message is still yielded)
    """
    tr = etree.parse(str(path))

    for mxml in tr.findall('mms'):
        dt = mxml.get('date')
        dt_readable = mxml.get('readable_date')
        who = mxml.get('contact_name')
        # contact names listed in UNKNOWN are treated the same as no name at all
        if who is not None and who in UNKNOWN:
            who = None
        phone_number = mxml.get('address')
        message_type = mxml.get('msg_box')

        if dt is None or dt_readable is None or message_type is None or phone_number is None:
            mxml_str = etree.tostring(mxml).decode('utf-8')
            yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}')
            continue

        # report a malformed msg_box as an error value instead of letting
        # ValueError escape the generator and abort the whole file
        try:
            message_type_int = int(message_type)
        except ValueError:
            yield RuntimeError(f"msg_box must be a number, was msg_box={message_type}")
            continue

        content: List[MMSContentPart] = []

        # Note on the <addrs>/<addr> children, which are deliberately not parsed:
        # the spec describes them, but the addresses they hold are also included
        # in the top-level 'address' attribute (which also doesn't include your
        # own address), and they possibly contain a contact name as well.
        # The only difference is that the top-level attribute is a single field
        # (split by '~' it seems?), so there seems to be no reason to read them.
        # For reference, the addr type codes are:
        #   129 = BCC, 130 = CC, 151 = To, 137 = From

        for part in mxml.findall('parts/part'):
            # the first item is an SMIL XML element encoded as a string which describes
            # how the rest of the parts are laid out
            # https://www.w3.org/TR/SMIL3/smil-timing.html#Timing-TimeContainerSyntax
            # An example:
            # <smil><head><layout><root-layout/><region id="Text" top="0" left="0" height="100%" width="100%"/></layout></head><body><par dur="5000ms"><text src="text.000000.txt" region="Text" /></par></body></smil>
            #
            # This seems pretty useless, so it is skipped (seq == "-1") and only
            # the text/images/data parts are returned
            #
            # attrib is an internal lxml _Attrib type which can't be typed by
            # any sort of mappingproxy. maybe a protocol could work..?
            part_data: Dict[str, Any] = part.attrib  # type: ignore
            seq: Optional[str] = part_data.get('seq')
            if seq == "-1":
                continue

            # isdecimal() rather than isdigit(): isdigit() also accepts
            # characters such as superscript digits, which int() rejects
            if seq is None or not seq.isdecimal():
                yield RuntimeError(f"seq must be a number, was seq={seq} in {part_data}")
                continue

            content_type: Optional[str] = part_data.get('ct')
            filename: Optional[str] = part_data.get('name')
            data: Optional[str] = part_data.get('text')

            # NOTE(review): parts without a 'text' attribute (presumably binary
            # attachments) end up reported as errors here -- confirm that is intended
            if content_type is None or filename is None or data is None:
                yield RuntimeError(f"charset_type, filename, data must be present in {part_data}")
                continue

            content.append(
                MMSContentPart(
                    sequence_index=int(seq),
                    content_type=content_type,
                    filename=filename,
                    text=data,
                )
            )

        yield MMS(
            dt=_parse_dt_ms(dt),
            dt_readable=dt_readable,
            who=who,
            phone_number=phone_number,
            message_type=message_type_int,
            content=content,
        )
|
||||
|
||||
|
||||
# See https://github.com/karlicoss/HPI/pull/90#issuecomment-702422351
|
||||
# for potentially parsing timezone from the readable_date
|
||||
def _parse_dt_ms(d: str) -> datetime:
|
||||
|
@ -162,4 +282,5 @@ def stats() -> Stats:
|
|||
return {
|
||||
**stat(calls),
|
||||
**stat(messages),
|
||||
**stat(mms),
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue