commit
c07ea0a600
7 changed files with 229 additions and 61 deletions
|
@ -33,6 +33,7 @@ modules = [
|
|||
('twint' , 'my.twitter.twint' ),
|
||||
('twitter', 'my.twitter.archive' ),
|
||||
('lastfm' , 'my.lastfm' ),
|
||||
('polar' , 'my.reading.polar' ),
|
||||
]
|
||||
|
||||
def indent(s, spaces=4):
|
||||
|
@ -117,4 +118,15 @@ for cls, p in modules:
|
|||
"""
|
||||
export_path: Paths
|
||||
#+end_src
|
||||
- [[file:../my/reading/polar.py][my.reading.polar]]
|
||||
|
||||
[[https://github.com/burtonator/polar-books][Polar]] articles and highlights
|
||||
|
||||
#+begin_src python
|
||||
class polar:
|
||||
'''
|
||||
Polar config is optional, you only need it if you want to specify custom 'polar_dir'
|
||||
'''
|
||||
polar_dir: Path = Path('~/.polar').expanduser()
|
||||
#+end_src
|
||||
:end:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# this file only keeps the most common & critical types/utility functions
|
||||
from .common import PathIsh, Paths, Json
|
||||
from .common import get_files
|
||||
from .common import get_files, LazyLogger
|
||||
from .cfg import make_config
|
||||
|
|
|
@ -134,7 +134,8 @@ def get_files(pp: Paths, glob: str=DEFAULT_GLOB, sort: bool=True) -> Tuple[Path,
|
|||
warnings.warn(f"Treating {ss} as glob path. Explicit glob={glob} argument is ignored!")
|
||||
paths.extend(map(Path, do_glob(ss)))
|
||||
else:
|
||||
assert src.is_file(), src
|
||||
if not src.is_file():
|
||||
raise RuntimeError(f"Expected '{src}' to exist")
|
||||
# todo assert matches glob??
|
||||
paths.append(src)
|
||||
|
||||
|
@ -245,3 +246,10 @@ def isoparse(s: str) -> tzdatetime:
|
|||
assert s.endswith('Z'), s
|
||||
s = s[:-1] + '+00:00'
|
||||
return fromisoformat(s)
|
||||
|
||||
|
||||
import re
|
||||
# https://stackoverflow.com/a/295466/706389
|
||||
def get_valid_filename(s: str) -> str:
|
||||
s = str(s).strip().replace(' ', '_')
|
||||
return re.sub(r'(?u)[^-\w.]', '', s)
|
||||
|
|
|
@ -30,6 +30,7 @@ def setup_config() -> None:
|
|||
import os
|
||||
import warnings
|
||||
from typing import Optional
|
||||
import appdirs # type: ignore[import]
|
||||
|
||||
# not sure if that's necessary, i.e. could rely on PYTHONPATH instead
|
||||
# on the other hand, by using MY_CONFIG we are guaranteed to load it from the desired path?
|
||||
|
@ -37,9 +38,7 @@ def setup_config() -> None:
|
|||
if mvar is not None:
|
||||
mycfg_dir = Path(mvar)
|
||||
else:
|
||||
# TODO use appdir??
|
||||
cfg_dir = Path('~/.config').expanduser()
|
||||
mycfg_dir = cfg_dir / 'my'
|
||||
mycfg_dir = Path(appdirs.user_config_dir('my'))
|
||||
|
||||
if not mycfg_dir.exists():
|
||||
warnings.warn(f"my.config package isn't found! (expected at {mycfg_dir}). This is likely to result in issues.")
|
||||
|
|
|
@ -11,7 +11,7 @@ def zoom(w, *keys):
|
|||
|
||||
# TODO need to support lists
|
||||
class Zoomable:
|
||||
def __init__(self, parent, *args, **kwargs):
|
||||
def __init__(self, parent, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs) # type: ignore
|
||||
self.parent = parent
|
||||
|
||||
|
@ -21,19 +21,19 @@ class Zoomable:
|
|||
def dependants(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def ignore(self):
|
||||
def ignore(self) -> None:
|
||||
self.consume_all()
|
||||
|
||||
def consume_all(self):
|
||||
def consume_all(self) -> None:
|
||||
for d in self.dependants:
|
||||
d.consume_all()
|
||||
self.consume()
|
||||
|
||||
def consume(self):
|
||||
def consume(self) -> None:
|
||||
assert self.parent is not None
|
||||
self.parent._remove(self)
|
||||
|
||||
def zoom(self):
|
||||
def zoom(self) -> 'Zoomable':
|
||||
self.consume()
|
||||
return self
|
||||
|
||||
|
@ -56,6 +56,8 @@ class Wdict(Zoomable, OrderedDict):
|
|||
|
||||
def this_consumed(self):
|
||||
return len(self) == 0
|
||||
# TODO specify mypy type for the index special method?
|
||||
|
||||
|
||||
class Wlist(Zoomable, list):
|
||||
def _remove(self, xx):
|
||||
|
@ -83,7 +85,8 @@ class Wvalue(Zoomable):
|
|||
def __repr__(self):
|
||||
return 'WValue{' + repr(self.value) + '}'
|
||||
|
||||
def _wrap(j, parent=None):
|
||||
from typing import Tuple
|
||||
def _wrap(j, parent=None) -> Tuple[Zoomable, List[Zoomable]]:
|
||||
res: Zoomable
|
||||
cc: List[Zoomable]
|
||||
if isinstance(j, dict):
|
||||
|
@ -109,13 +112,14 @@ def _wrap(j, parent=None):
|
|||
raise RuntimeError(f'Unexpected type: {type(j)} {j}')
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Iterator
|
||||
|
||||
class UnconsumedError(Exception):
|
||||
pass
|
||||
|
||||
# TODO think about error policy later...
|
||||
@contextmanager
|
||||
def wrap(j, throw=True):
|
||||
def wrap(j, throw=True) -> Iterator[Zoomable]:
|
||||
w, children = _wrap(j)
|
||||
|
||||
yield w
|
||||
|
@ -123,33 +127,41 @@ def wrap(j, throw=True):
|
|||
for c in children:
|
||||
if not c.this_consumed(): # TODO hmm. how does it figure out if it's consumed???
|
||||
if throw:
|
||||
raise UnconsumedError(str(c))
|
||||
# TODO need to keep a full path or something...
|
||||
raise UnconsumedError(f'''
|
||||
Expected {c} to be fully consumed by the parser.
|
||||
'''.lstrip())
|
||||
else:
|
||||
# TODO log?
|
||||
pass
|
||||
|
||||
|
||||
from typing import cast
|
||||
def test_unconsumed():
|
||||
import pytest # type: ignore
|
||||
with pytest.raises(UnconsumedError):
|
||||
with wrap({'a': 1234}) as w:
|
||||
w = cast(Wdict, w)
|
||||
pass
|
||||
|
||||
with pytest.raises(UnconsumedError):
|
||||
with wrap({'c': {'d': 2222}}) as w:
|
||||
w = cast(Wdict, w)
|
||||
d = w['c']['d'].zoom()
|
||||
|
||||
def test_consumed():
|
||||
with wrap({'a': 1234}) as w:
|
||||
w = cast(Wdict, w)
|
||||
a = w['a'].zoom()
|
||||
|
||||
with wrap({'c': {'d': 2222}}) as w:
|
||||
w = cast(Wdict, w)
|
||||
c = w['c'].zoom()
|
||||
d = c['d'].zoom()
|
||||
|
||||
def test_types():
|
||||
# (string, number, object, array, boolean or nul
|
||||
with wrap({'string': 'string', 'number': 3.14, 'boolean': True, 'null': None, 'list': [1, 2, 3]}) as w:
|
||||
w = cast(Wdict, w)
|
||||
w['string'].zoom()
|
||||
w['number'].consume()
|
||||
w['boolean'].zoom()
|
||||
|
@ -159,5 +171,31 @@ def test_types():
|
|||
|
||||
def test_consume_all():
|
||||
with wrap({'aaa': {'bbb': {'hi': 123}}}) as w:
|
||||
w = cast(Wdict, w)
|
||||
aaa = w['aaa'].zoom()
|
||||
aaa['bbb'].consume_all()
|
||||
|
||||
|
||||
def test_consume_few():
|
||||
import pytest
|
||||
pytest.skip('Will think about it later..')
|
||||
with wrap({
|
||||
'important': 123,
|
||||
'unimportant': 'whatever'
|
||||
}) as w:
|
||||
w = cast(Wdict, w)
|
||||
w['important'].zoom()
|
||||
w.consume_all()
|
||||
# TODO hmm, we want smth like this to work..
|
||||
|
||||
|
||||
def test_zoom() -> None:
|
||||
import pytest # type: ignore
|
||||
with wrap({'aaa': 'whatever'}) as w:
|
||||
w = cast(Wdict, w)
|
||||
with pytest.raises(KeyError):
|
||||
w['nosuchkey'].zoom()
|
||||
w['aaa'].zoom()
|
||||
|
||||
|
||||
# TODO type check this...
|
||||
|
|
|
@ -1,33 +1,56 @@
|
|||
"""
|
||||
[[https://github.com/burtonator/polar-books][Polar]] articles and highlights
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Type, Any, cast, TYPE_CHECKING
|
||||
|
||||
|
||||
import my.config
|
||||
|
||||
if not TYPE_CHECKING:
|
||||
user_config = getattr(my.config, 'polar', None)
|
||||
else:
|
||||
# mypy can't handle dynamic base classes... https://github.com/python/mypy/issues/2477
|
||||
user_config = object
|
||||
|
||||
# by default, Polar doesn't need any config, so perhaps makes sense to make it defensive here
|
||||
if user_config is None:
|
||||
class user_config: # type: ignore[no-redef]
|
||||
pass
|
||||
|
||||
|
||||
from ..core import PathIsh
|
||||
from dataclasses import dataclass
|
||||
@dataclass
|
||||
class polar(user_config):
|
||||
'''
|
||||
Polar config is optional, you only need it if you want to specify custom 'polar_dir'
|
||||
'''
|
||||
polar_dir: PathIsh = Path('~/.polar').expanduser()
|
||||
defensive: bool = True # pass False if you want it to fail faster on errors (useful for debugging)
|
||||
|
||||
|
||||
from ..core import make_config
|
||||
config = make_config(polar)
|
||||
|
||||
# todo not sure where it keeps stuff on Windows?
|
||||
# https://github.com/burtonator/polar-bookshelf/issues/296
|
||||
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Iterator, NamedTuple, Sequence, Optional
|
||||
from typing import List, Dict, Iterable, NamedTuple, Sequence, Optional
|
||||
import json
|
||||
|
||||
import pytz
|
||||
|
||||
from ..common import LazyLogger, get_files
|
||||
|
||||
from ..error import Res, echain, unwrap, sort_res_by
|
||||
from ..kython.konsume import wrap, zoom, ignore
|
||||
|
||||
|
||||
_POLAR_DIR = Path('~').expanduser() / '.polar'
|
||||
from ..core import LazyLogger, Json
|
||||
from ..core.common import isoparse
|
||||
from ..error import Res, echain, sort_res_by
|
||||
from ..kython.konsume import wrap, zoom, ignore, Zoomable, Wdict
|
||||
|
||||
|
||||
logger = LazyLogger(__name__)
|
||||
|
||||
|
||||
# TODO use core.isoparse
|
||||
def parse_dt(s: str) -> datetime:
|
||||
return pytz.utc.localize(datetime.strptime(s, '%Y-%m-%dT%H:%M:%S.%fZ'))
|
||||
|
||||
Uid = str
|
||||
|
||||
|
||||
# Ok I guess handling comment-level errors is a bit too much..
|
||||
Cid = str
|
||||
class Comment(NamedTuple):
|
||||
|
@ -41,18 +64,26 @@ class Highlight(NamedTuple):
|
|||
created: datetime
|
||||
selection: str
|
||||
comments: Sequence[Comment]
|
||||
tags: Sequence[str]
|
||||
color: Optional[str] = None
|
||||
|
||||
|
||||
Uid = str
|
||||
class Book(NamedTuple):
|
||||
uid: Uid
|
||||
created: datetime
|
||||
filename: str
|
||||
uid: Uid
|
||||
path: Path
|
||||
title: Optional[str]
|
||||
# TODO hmmm. I think this needs to be defensive as well...
|
||||
# think about it later.
|
||||
items: Sequence[Highlight]
|
||||
|
||||
Error = Exception # for backwards compat with Orger; can remove later
|
||||
tags: Sequence[str]
|
||||
|
||||
@property
|
||||
def filename(self) -> str:
|
||||
# TODO deprecate
|
||||
return str(self.path)
|
||||
|
||||
Result = Res[Book]
|
||||
|
||||
|
@ -61,37 +92,45 @@ class Loader:
|
|||
self.path = p
|
||||
self.uid = self.path.parent.name
|
||||
|
||||
def error(self, cause, extra='') -> Exception:
|
||||
def error(self, cause: Exception, extra: str ='') -> Exception:
|
||||
if len(extra) > 0:
|
||||
extra = '\n' + extra
|
||||
return echain(Exception(f'while processing {self.path}{extra}'), cause)
|
||||
|
||||
def load_item(self, meta) -> Iterator[Highlight]:
|
||||
def load_item(self, meta: Zoomable) -> Iterable[Highlight]:
|
||||
meta = cast(Wdict, meta)
|
||||
# TODO this should be destructive zoom?
|
||||
meta['notes'].zoom()
|
||||
meta['pagemarks'].zoom()
|
||||
meta['notes'].zoom() # TODO ??? is it deliberate?
|
||||
|
||||
meta['pagemarks'].consume_all()
|
||||
|
||||
|
||||
if 'notes' in meta:
|
||||
# TODO something nicer?
|
||||
notes = meta['notes'].zoom()
|
||||
else:
|
||||
notes = [] # TODO FIXME dict?
|
||||
comments = meta['comments'].zoom()
|
||||
comments = list(meta['comments'].zoom().values()) if 'comments' in meta else []
|
||||
meta['questions'].zoom()
|
||||
meta['flashcards'].zoom()
|
||||
highlights = meta['textHighlights'].zoom()
|
||||
meta['areaHighlights'].zoom()
|
||||
|
||||
# TODO could be useful to at least add a meta bout area highlights/screens
|
||||
meta['areaHighlights'].consume_all()
|
||||
meta['screenshots'].zoom()
|
||||
meta['thumbnails'].zoom()
|
||||
if 'readingProgress' in meta:
|
||||
meta['readingProgress'].zoom()
|
||||
meta['readingProgress'].consume_all()
|
||||
|
||||
# TODO want to ignore the whold subtree..
|
||||
# TODO want to ignore the whole subtree..
|
||||
pi = meta['pageInfo'].zoom()
|
||||
pi['num'].zoom()
|
||||
if 'dimensions' in pi:
|
||||
pi['dimensions'].consume_all()
|
||||
|
||||
# TODO how to make it nicer?
|
||||
cmap: Dict[Hid, List[Comment]] = {}
|
||||
vals = list(comments.values())
|
||||
vals = list(comments)
|
||||
for v in vals:
|
||||
cid = v['id'].zoom()
|
||||
v['guid'].zoom()
|
||||
|
@ -106,7 +145,7 @@ class Loader:
|
|||
cmap[hlid] = ccs
|
||||
ccs.append(Comment(
|
||||
cid=cid.value,
|
||||
created=parse_dt(crt.value),
|
||||
created=isoparse(crt.value),
|
||||
text=html.value, # TODO perhaps coonvert from html to text or org?
|
||||
))
|
||||
v.consume()
|
||||
|
@ -123,20 +162,32 @@ class Loader:
|
|||
updated = h['lastUpdated'].zoom().value
|
||||
h['rects'].ignore()
|
||||
|
||||
# TODO make it more generic..
|
||||
htags: List[str] = []
|
||||
if 'tags' in h:
|
||||
ht = h['tags'].zoom()
|
||||
for k, v in list(ht.items()):
|
||||
ctag = v.zoom()
|
||||
ctag['id'].consume()
|
||||
ct = ctag['label'].zoom()
|
||||
htags.append(ct.value)
|
||||
|
||||
h['textSelections'].ignore()
|
||||
h['notes'].consume()
|
||||
h['questions'].consume()
|
||||
h['flashcards'].consume()
|
||||
h['color'].consume()
|
||||
color = h['color'].zoom().value
|
||||
h['images'].ignore()
|
||||
# TODO eh, quite excessive \ns...
|
||||
text = h['text'].zoom()['TEXT'].zoom().value
|
||||
|
||||
yield Highlight(
|
||||
hid=hid,
|
||||
created=parse_dt(crt),
|
||||
created=isoparse(crt),
|
||||
selection=text,
|
||||
comments=tuple(comments),
|
||||
tags=tuple(htags),
|
||||
color=color,
|
||||
)
|
||||
h.consume()
|
||||
|
||||
|
@ -146,34 +197,41 @@ class Loader:
|
|||
# TODO sort by date?
|
||||
|
||||
|
||||
def load_items(self, metas) -> Iterator[Highlight]:
|
||||
def load_items(self, metas: Json) -> Iterable[Highlight]:
|
||||
for p, meta in metas.items():
|
||||
with wrap(meta, throw=False) as meta:
|
||||
with wrap(meta, throw=not config.defensive) as meta:
|
||||
yield from self.load_item(meta)
|
||||
|
||||
def load(self) -> Iterator[Result]:
|
||||
def load(self) -> Iterable[Result]:
|
||||
logger.info('processing %s', self.path)
|
||||
j = json.loads(self.path.read_text())
|
||||
|
||||
# TODO konsume here as well?
|
||||
di = j['docInfo']
|
||||
added = di['added']
|
||||
filename = di['filename']
|
||||
filename = di['filename'] # TODO here
|
||||
title = di.get('title', None)
|
||||
tags = di['tags']
|
||||
pm = j['pageMetas']
|
||||
tags_dict = di['tags']
|
||||
pm = j['pageMetas'] # TODO FIXME handle this too
|
||||
|
||||
# todo defensive?
|
||||
tags = tuple(t['label'] for t in tags_dict.values())
|
||||
|
||||
path = Path(config.polar_dir) / 'stash' / filename
|
||||
|
||||
yield Book(
|
||||
created=isoparse(added),
|
||||
uid=self.uid,
|
||||
created=parse_dt(added),
|
||||
filename=filename,
|
||||
path=path,
|
||||
title=title,
|
||||
items=list(self.load_items(pm)),
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
|
||||
def iter_entries() -> Iterator[Result]:
|
||||
for d in get_files(_POLAR_DIR, glob='*/state.json'):
|
||||
def iter_entries() -> Iterable[Result]:
|
||||
from ..core import get_files
|
||||
for d in get_files(config.polar_dir, glob='*/state.json'):
|
||||
loader = Loader(d)
|
||||
try:
|
||||
yield from loader.load()
|
||||
|
@ -185,16 +243,18 @@ def iter_entries() -> Iterator[Result]:
|
|||
|
||||
def get_entries() -> List[Result]:
|
||||
# sorting by first annotation is reasonable I guess???
|
||||
# todo perhaps worth making it a pattern? X() returns iterable, get_X returns reasonably sorted list?
|
||||
return list(sort_res_by(iter_entries(), key=lambda e: e.created))
|
||||
|
||||
|
||||
def main():
|
||||
for entry in iter_entries():
|
||||
try:
|
||||
ee = unwrap(entry)
|
||||
except Error as e:
|
||||
for e in iter_entries():
|
||||
if isinstance(e, Exception):
|
||||
logger.exception(e)
|
||||
else:
|
||||
logger.info('processed %s', ee.uid)
|
||||
for i in ee.items:
|
||||
logger.info('processed %s', e.uid)
|
||||
for i in e.items:
|
||||
logger.info(i)
|
||||
|
||||
|
||||
Error = Exception # for backwards compat with Orger; can remove later
|
||||
|
|
51
tests/extra/polar.py
Normal file
51
tests/extra/polar.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
from pathlib import Path
|
||||
import sys
|
||||
from importlib import reload
|
||||
from my.core.common import get_valid_filename
|
||||
|
||||
ROOT = Path(__file__).parent.absolute()
|
||||
OUTPUTS = ROOT / 'outputs'
|
||||
|
||||
|
||||
import pytest # type: ignore
|
||||
|
||||
|
||||
def test_hpi(prepare: str) -> None:
|
||||
from my.reading.polar import get_entries
|
||||
assert len(list(get_entries())) > 1
|
||||
|
||||
def test_orger(prepare: str, tmp_path: Path) -> None:
|
||||
from my.core.common import import_from, import_file
|
||||
om = import_file(ROOT / 'orger/modules/polar.py')
|
||||
# reload(om)
|
||||
|
||||
pv = om.PolarView() # type: ignore
|
||||
# TODO hmm. worth making public?
|
||||
OUTPUTS.mkdir(exist_ok=True)
|
||||
out = OUTPUTS / (get_valid_filename(prepare) + '.org')
|
||||
pv._run(to=out)
|
||||
|
||||
|
||||
PARAMS = [
|
||||
# 'data/polar/BojanKV_polar/.polar',
|
||||
'',
|
||||
# 'data/polar/TheCedarPrince_KnowledgeRepository',
|
||||
# 'data/polar/coelias_polardocs',
|
||||
# 'data/polar/warkdarrior_polar-document-repository'
|
||||
]
|
||||
|
||||
@pytest.fixture(params=PARAMS)
|
||||
def prepare(request):
|
||||
dotpolar = request.param
|
||||
class user_config:
|
||||
if dotpolar != '': # defaul
|
||||
polar_dir = Path(ROOT / dotpolar)
|
||||
defensive = False
|
||||
|
||||
import my.config
|
||||
setattr(my.config, 'polar', user_config)
|
||||
|
||||
import my.reading.polar as polar
|
||||
reload(polar)
|
||||
# TODO hmm... ok, need to document reload()
|
||||
yield dotpolar
|
Loading…
Add table
Reference in a new issue