# HPI/my/pdfs.py
# (repository listing metadata: 205 lines, 6 KiB, Python, executable file)

'''
PDF documents and annotations on your filesystem
'''
# Third-party requirement of this module, given as a pip-style VCS URL;
# it is imported below as `pdfannots`.
# NOTE(review): presumably consumed by the framework's module installer -- confirm.
REQUIRES = [
    'git+https://github.com/0xabu/pdfannots',
]
from contextlib import redirect_stderr
from datetime import datetime
from dataclasses import dataclass
import io
from pathlib import Path
import re
import sys
import time
from typing import NamedTuple, List, Optional, Iterator, Sequence
from my.core import LazyLogger, get_files, Paths, PathIsh
from my.core.cfg import Attrs, make_config
from my.core.common import mcachew, group_by_key
from my.core.error import Res, split_errors
import pdfannots # type: ignore[import]
from my.config import pdfs as user_config
@dataclass
class pdfs(user_config):
    """
    Settings for this module, layered on top of the user's ``my.config.pdfs``.
    """

    # where to look for PDF files (see inputs())
    paths: Paths = ()  # allowed to be empty for 'filelist' logic

    def is_ignored(self, p: Path) -> bool:
        """
        Used to ignore some extremely heavy files.

        Delegates to ``is_ignored`` from the user config when it is defined;
        otherwise no file is ignored.
        """
        user_ignore = getattr(user_config, 'is_ignored', None)
        return False if user_ignore is None else user_ignore(p)

    @staticmethod
    def _migration(attrs: Attrs) -> Attrs:
        """
        Copy the legacy ``roots`` attribute into ``paths`` (emitting a
        deprecation warning) and return the same ``attrs`` mapping.
        """
        roots = 'roots'
        if roots not in attrs:
            # nothing to migrate
            return attrs
        # legacy name
        attrs['paths'] = attrs[roots]
        from my.core.warnings import high
        high(f'"{roots}" is deprecated! Use "paths" instead.')
        return attrs
# Effective module configuration: built from the `pdfs` dataclass via
# make_config, with `pdfs._migration` passed as the migration hook.
config = make_config(pdfs, migration=pdfs._migration)
# Module-level logger, named after this module.
logger = LazyLogger(__name__)
def inputs() -> Sequence[Path]:
    """
    Return the files found by ``get_files`` for ``config.paths`` with the glob
    ``**/*.pdf``, minus those rejected by ``config.is_ignored``.
    """
    # TODO ignoring could be handled on get_files/user config side as well?..
    kept = []
    for candidate in get_files(config.paths, glob='**/*.pdf'):
        if config.is_ignored(candidate):
            continue
        kept.append(candidate)
    return kept
# TODO canonical names/fingerprinting?
# TODO defensive if pdf was removed, also cachew key needs to be defensive
class Annotation(NamedTuple):
    """A single annotation extracted from a PDF file."""

    path: str                    # path of the PDF the annotation belongs to
    author: Optional[str]
    page: int
    highlight: Optional[str]     # the annotated (highlighted) text, if any
    comment: Optional[str]       # the annotation's own contents, if any
    created: Optional[datetime]  # note: can be tz unaware in some bad pdfs...

    @property
    def date(self) -> Optional[datetime]:
        """Legacy name for ``created``."""
        return self.created
def as_annotation(*, raw_ann, path: str) -> Annotation:
    """
    Convert a raw annotation object (as passed in by get_annots) into an
    ``Annotation``.

    ``raw_ann`` must expose ``author``, ``text`` and ``contents`` as instance
    attributes and a ``page`` object with a ``pageno`` attribute; ``created``
    is optional and defaults to None.

    The raw object is only read. ``vars()`` returns the object's live
    ``__dict__``, so writing to it (as an earlier version did to stash the page
    number and drop ``boxes``/``rect``) would alter the caller's object and
    make a second conversion of the same annotation fail.

    Raises KeyError if a required attribute is missing, AttributeError if
    ``page``/``pageno`` is missing.
    """
    fields = vars(raw_ann)  # read-only view; never assign into it
    pageno = raw_ann.page.pageno
    return Annotation(
        path      = path,
        author    = fields['author'],
        page      = pageno,
        highlight = fields['text'],
        comment   = fields['contents'],
        created   = fields.get('created'), # todo can be non-defensive once pr is merged
    )
def get_annots(p: Path) -> List[Annotation]:
    """
    Extract the annotations of the PDF at ``p``.

    The file is handed to ``pdfannots.process_file``; anything written to
    stderr during processing is captured instead of reaching the terminal and
    is reported through the debug log (also when processing fails). The time
    taken is logged as well, in upper case when it exceeds 5 seconds.

    Exceptions raised while opening or processing the file propagate to the
    caller.
    """
    # perf_counter is monotonic, so the measured duration can't be skewed by
    # wall-clock adjustments
    started = time.perf_counter()
    captured = io.StringIO()
    try:
        with p.open('rb') as fo, redirect_stderr(captured):
            # FIXME
            (annots, _outlines) = pdfannots.process_file(fo, emit_progress=False)
            # outlines are kinda like TOC, I don't really need them
    finally:
        # don't silently drop whatever was printed to stderr
        noise = captured.getvalue().strip()
        if noise:
            logger.debug('stderr output while processing %s:\n%s', p, noise)
    took = time.perf_counter() - started
    tooks = f'took {took:0.1f} seconds'
    if took > 5:
        tooks = tooks.upper()
    logger.debug('extracting %s %s: %d annotations', tooks, p, len(annots))
    return [as_annotation(raw_ann=raw, path=str(p)) for raw in annots]
# TODO stderr?
def _hash_files(pdfs: Sequence[Path]):
# if mtime hasn't changed then the file hasn't changed either
return [(pdf, pdf.stat().st_mtime) for pdf in pdfs]
# TODO might make more sense to be more fine grained here, e.g. cache annotations for individual files
@mcachew(depends_on=_hash_files)
def _iter_annotations(pdfs: Sequence[Path]) -> Iterator[Res[Annotation]]:
    """
    Yield the annotations of every file in ``pdfs``, in the order the files
    were given.

    Each file is processed by ``get_annots`` through an executor. If
    processing a file fails, the exception is logged and yielded in place of
    that file's annotations, and iteration carries on with the next file.
    """
    logger.info('processing %d pdfs', len(pdfs))

    # todo how to print to stdout synchronously?
    # todo global config option not to use pools? useful for debugging..
    from concurrent.futures import ProcessPoolExecutor
    from my.core.common import DummyExecutor

    workers = None  # use 0 for debugging
    Pool = DummyExecutor if workers == 0 else ProcessPoolExecutor

    with Pool(workers) as pool:
        # submit everything up front, then collect results in input order
        submitted = [(pdf, pool.submit(get_annots, pdf)) for pdf in pdfs]
        for pdf, future in submitted:
            try:
                yield from future.result()
            except Exception as e:
                logger.error('While processing %s:', pdf)
                logger.exception(e)
                # todo add a comment that it can be ignored... or something like that
                # TODO not sure if should attach pdf as well; it's a bit annoying to pass around?
                # also really have to think about interaction with cachew...
                yield e
def annotations() -> Iterator[Res[Annotation]]:
    """
    Yield annotations (or the exceptions hit while extracting them) for all
    files returned by ``inputs()``.
    """
    yield from _iter_annotations(pdfs=inputs())
class Pdf(NamedTuple):
    """A PDF file together with the annotations found in it."""

    path: Path
    annotations: Sequence[Annotation]

    @property
    def created(self) -> Optional[datetime]:
        """``created`` of the last annotation, or None when there are no annotations."""
        annots = self.annotations
        if len(annots) == 0:
            return None
        return annots[-1].created

    @property
    def date(self) -> Optional[datetime]:
        """Legacy name for ``created``."""
        return self.created
def annotated_pdfs(*, filelist: Optional[Sequence[PathIsh]]=None) -> Iterator[Res[Pdf]]:
    """
    Yield one ``Pdf`` per annotated file (annotations grouped by their path),
    followed by the errors encountered while extracting annotations.

    When ``filelist`` is given it replaces ``config.paths`` before anything is
    read; note that this changes the module-level config.
    """
    if filelist is not None:
        # hacky... keeping it backwards compatible
        # https://github.com/karlicoss/HPI/pull/74
        config.paths = filelist
    values, errors = split_errors(annotations(), ET=Exception)
    by_path = group_by_key(values, key=lambda ann: ann.path)
    for pdf_path, pdf_annots in by_path.items():
        yield Pdf(path=Path(pdf_path), annotations=pdf_annots)
    yield from errors
from my.core import stat, Stats
def stats() -> Stats:
    """Merge the ``stat`` results for ``annotations`` and ``annotated_pdfs`` into one mapping."""
    merged = dict(stat(annotations))
    # on a key clash the later result wins
    merged.update(stat(annotated_pdfs))
    return merged
### legacy/misc stuff
# todo retire later in favor of hpi query?
def main() -> None:
    """Log every annotated PDF (or the error hit while processing) and pretty-print its annotations."""
    from pprint import pprint

    for res in annotated_pdfs():
        if isinstance(res, Exception):
            logger.exception(res)
            continue
        logger.info('collected annotations in: %s', res.path)
        for annotation in res.annotations:
            pprint(annotation)
# legacy alias of annotations(), kept for backwards compatibility
iter_annotations = annotations
###