Use model; only keep necessary methods
This commit is contained in:
parent
1a365762bc
commit
943c572e00
1 changed files with 37 additions and 107 deletions
|
@ -1,97 +1,26 @@
|
|||
#!/usr/bin/env python3
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, NamedTuple, Iterator, Optional, Sequence
|
||||
from functools import lru_cache
|
||||
from typing import NamedTuple
|
||||
from datetime import datetime
|
||||
import pytz
|
||||
|
||||
from lxml import etree as ET # type: ignore
|
||||
from .. import paths
|
||||
|
||||
BPATH = Path("/L/backups/goodreads")
|
||||
|
||||
# TODO might be useful to keep track of updates?...
|
||||
# then I need some sort of system to store diffs in generic way...
|
||||
# althogh... coud use same mechanism as for filtering
|
||||
def get_last() -> Path:
|
||||
return max(sorted(BPATH.glob('*.xmll')))
|
||||
|
||||
_SP = '</review>'
|
||||
|
||||
def get_reviews():
|
||||
fname = get_last()
|
||||
xmls = []
|
||||
with open(fname, 'r') as fo:
|
||||
data = fo.read()
|
||||
for xx in data.split(_SP):
|
||||
if len(xx.strip()) == 0:
|
||||
break
|
||||
xmls.append(ET.fromstring(xx + _SP))
|
||||
return xmls
|
||||
|
||||
class Book(NamedTuple):
|
||||
bid: str
|
||||
title: str
|
||||
authors: Sequence[str]
|
||||
shelves: Sequence[str]
|
||||
date_added: datetime
|
||||
date_started: Optional[datetime]
|
||||
date_read: Optional[datetime]
|
||||
|
||||
from kython import the
|
||||
@lru_cache()
|
||||
def goodrexport():
|
||||
from kython import import_file
|
||||
return import_file(paths.goodrexport.repo / 'model.py')
|
||||
|
||||
|
||||
def _parse_date(s: Optional[str]) -> Optional[datetime]:
|
||||
if s is None:
|
||||
return None
|
||||
res = datetime.strptime(s, "%a %b %d %H:%M:%S %z %Y")
|
||||
assert res.tzinfo is not None
|
||||
return res
|
||||
def get_model():
|
||||
sources = list(sorted(paths.goodrexport.export_dir.glob('*.xml')))
|
||||
model = goodrexport().Model(sources)
|
||||
return model
|
||||
|
||||
|
||||
def iter_books() -> Iterator[Book]:
|
||||
for r in get_reviews():
|
||||
# review_xml = the(review.childNodes)
|
||||
# rdict = {n.tagName: n for n in review_xml.childNodes if isinstance(n, Element)}
|
||||
# fuck xml...
|
||||
|
||||
be = the(r.xpath('book'))
|
||||
title = the(be.xpath('title/text()'))
|
||||
authors = be.xpath('authors/author/name/text()')
|
||||
|
||||
bid = the(r.xpath('id/text()'))
|
||||
# isbn_element = the(book_element.getElementsByTagName('isbn'))
|
||||
# isbn13_element = the(book_element.getElementsByTagName('isbn13'))
|
||||
date_added = the(r.xpath('date_added/text()'))
|
||||
sss = r.xpath('started_at/text()')
|
||||
rrr = r.xpath('read_at/text()')
|
||||
started_at = None if len(sss) == 0 else the(sss)
|
||||
read_at = None if len(rrr) == 0 else the(rrr)
|
||||
|
||||
shelves = r.xpath('shelves/shelf/name/text()')
|
||||
|
||||
# if isbn_element.getAttribute('nil') != 'true':
|
||||
# book['isbn'] = isbn_element.firstChild.data
|
||||
# else:
|
||||
# book['isbn'] = ''
|
||||
|
||||
# if isbn13_element.getAttribute('nil') != 'true':
|
||||
# book['isbn13'] = isbn13_element.firstChild.data
|
||||
# else:
|
||||
# book['isbn13'] = ''
|
||||
|
||||
da = _parse_date(date_added)
|
||||
assert da is not None
|
||||
yield Book(
|
||||
bid=bid,
|
||||
title=title,
|
||||
authors=authors,
|
||||
shelves=shelves,
|
||||
date_added=da,
|
||||
date_started=_parse_date(started_at),
|
||||
date_read=_parse_date(read_at),
|
||||
)
|
||||
|
||||
def get_books():
|
||||
return list(iter_books())
|
||||
model = get_model()
|
||||
return [r.book for r in model.reviews()]
|
||||
|
||||
|
||||
def test_books():
|
||||
|
@ -111,49 +40,50 @@ def get_events():
|
|||
events.append(Event(
|
||||
dt=b.date_added,
|
||||
summary=f'Added book "{b.title}"', # TODO shelf?
|
||||
eid=b.bid
|
||||
eid=b.id
|
||||
))
|
||||
# TODO finished? other updates?
|
||||
return sorted(events, key=lambda e: e.dt)
|
||||
|
||||
|
||||
def test():
|
||||
assert len(get_events()) > 20
|
||||
|
||||
|
||||
def print_read_history():
|
||||
def key(b):
|
||||
read = b.date_read
|
||||
if read is None:
|
||||
def ddate(x):
|
||||
if x is None:
|
||||
return datetime.fromtimestamp(0, pytz.utc)
|
||||
else:
|
||||
return read
|
||||
return x
|
||||
|
||||
def key(b):
|
||||
return ddate(b.date_started)
|
||||
|
||||
def fmtdt(dt):
|
||||
if dt is None:
|
||||
return dt
|
||||
tz = pytz.timezone('Europe/London')
|
||||
return dt.astimezone(tz)
|
||||
for b in sorted(iter_books(), key=key):
|
||||
for b in sorted(get_books(), key=key):
|
||||
print(f"""
|
||||
{b.title} by {', '.join(b.authors)}
|
||||
started : {fmtdt(b.date_started)}
|
||||
finished: {fmtdt(b.date_read)}
|
||||
""")
|
||||
|
||||
def test():
|
||||
assert len(get_events()) > 20
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
p = argparse.ArgumentParser()
|
||||
sp = p.add_argument('mode', nargs='?')
|
||||
args = p.parse_args()
|
||||
|
||||
if args.mode == 'history':
|
||||
print_read_history()
|
||||
else:
|
||||
assert args.mode is None
|
||||
for b in iter_books():
|
||||
print(b)
|
||||
# def main():
|
||||
# import argparse
|
||||
# p = argparse.ArgumentParser()
|
||||
# sp = p.add_argument('mode', nargs='?')
|
||||
# args = p.parse_args()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
# if args.mode == 'history':
|
||||
# print_read_history()
|
||||
# else:
|
||||
# assert args.mode is None
|
||||
# for b in iter_books():
|
||||
# print(b)
|
||||
|
||||
# if __name__ == '__main__':
|
||||
# main()
|
||||
|
|
Loading…
Add table
Reference in a new issue