pdfs: migrate config to Protocol with properties

allows removing a whole bunch of hacky crap from tests!
This commit is contained in:
Dima Gerasimov 2024-08-23 00:47:00 +01:00 committed by karlicoss
parent d154825591
commit 5a67f0bafe
3 changed files with 56 additions and 64 deletions

View file

@ -1,64 +1,64 @@
''' '''
PDF documents and annotations on your filesystem PDF documents and annotations on your filesystem
''' '''
REQUIRES = [ REQUIRES = [
'git+https://github.com/0xabu/pdfannots', 'git+https://github.com/0xabu/pdfannots',
# todo not sure if should use pypi version? # todo not sure if should use pypi version?
] ]
from datetime import datetime
from dataclasses import dataclass
import io
from pathlib import Path
import time import time
from typing import NamedTuple, List, Optional, Iterator, Sequence from datetime import datetime
from pathlib import Path
from typing import Iterator, List, NamedTuple, Optional, Protocol, Sequence
import pdfannots
from more_itertools import bucket
from my.core import LazyLogger, get_files, Paths, PathIsh from my.core import PathIsh, Paths, Stats, get_files, make_logger, stat
from my.core.cachew import mcachew from my.core.cachew import mcachew
from my.core.cfg import Attrs, make_config
from my.core.error import Res, split_errors from my.core.error import Res, split_errors
from more_itertools import bucket class config(Protocol):
import pdfannots @property
def paths(self) -> Paths:
return () # allowed to be empty for 'filelist' logic
from my.config import pdfs as user_config
@dataclass
class pdfs(user_config):
paths: Paths = () # allowed to be empty for 'filelist' logic
def is_ignored(self, p: Path) -> bool: def is_ignored(self, p: Path) -> bool:
""" """
Used to ignore some extremely heavy files You can override this in user config if you want to ignore some files that are too heavy
is_ignored function taken either from config,
or if not defined, it's a function that returns False
""" """
user_ignore = getattr(user_config, 'is_ignored', None)
if user_ignore is not None:
return user_ignore(p)
return False return False
@staticmethod
def _migration(attrs: Attrs) -> Attrs: def make_config() -> config:
roots = 'roots' from my.config import pdfs as user_config
if roots in attrs: # legacy name
attrs['paths'] = attrs[roots] class migration:
@property
def paths(self) -> Paths:
roots = getattr(user_config, 'roots', None)
if roots is not None:
from my.core.warnings import high from my.core.warnings import high
high(f'"{roots}" is deprecated! Use "paths" instead.')
return attrs high('"roots" is deprecated! Use "paths" instead.')
return roots
else:
return ()
class combined_config(user_config, migration, config): ...
return combined_config()
config = make_config(pdfs, migration=pdfs._migration) logger = make_logger(__name__)
logger = LazyLogger(__name__)
def inputs() -> Sequence[Path]: def inputs() -> Sequence[Path]:
all_files = get_files(config.paths, glob='**/*.pdf') cfg = make_config()
return [p for p in all_files if not config.is_ignored(p)] all_files = get_files(cfg.paths, glob='**/*.pdf')
return [p for p in all_files if not cfg.is_ignored(p)]
# TODO canonical names/fingerprinting? # TODO canonical names/fingerprinting?
@ -121,14 +121,13 @@ def _iter_annotations(pdfs: Sequence[Path]) -> Iterator[Res[Annotation]]:
# todo how to print to stdout synchronously? # todo how to print to stdout synchronously?
# todo global config option not to use pools? useful for debugging.. # todo global config option not to use pools? useful for debugging..
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from my.core.utils.concurrent import DummyExecutor from my.core.utils.concurrent import DummyExecutor
workers = None # use 0 for debugging workers = None # use 0 for debugging
Pool = DummyExecutor if workers == 0 else ProcessPoolExecutor Pool = DummyExecutor if workers == 0 else ProcessPoolExecutor
with Pool(workers) as pool: with Pool(workers) as pool:
futures = [ futures = [pool.submit(get_annots, pdf) for pdf in pdfs]
pool.submit(get_annots, pdf)
for pdf in pdfs
]
for f, pdf in zip(futures, pdfs): for f, pdf in zip(futures, pdfs):
try: try:
yield from f.result() yield from f.result()
@ -161,11 +160,13 @@ class Pdf(NamedTuple):
return self.created return self.created
def annotated_pdfs(*, filelist: Optional[Sequence[PathIsh]]=None) -> Iterator[Res[Pdf]]: def annotated_pdfs(*, filelist: Optional[Sequence[PathIsh]] = None) -> Iterator[Res[Pdf]]:
if filelist is not None: if filelist is not None:
# hacky... keeping it backwards compatible # hacky... keeping it backwards compatible
# https://github.com/karlicoss/HPI/pull/74 # https://github.com/karlicoss/HPI/pull/74
config.paths = filelist from my.config import pdfs as user_config
user_config.paths = filelist
ait = annotations() ait = annotations()
vit, eit = split_errors(ait, ET=Exception) vit, eit = split_errors(ait, ET=Exception)
@ -176,10 +177,9 @@ def annotated_pdfs(*, filelist: Optional[Sequence[PathIsh]]=None) -> Iterator[Re
yield from eit yield from eit
from my.core import stat, Stats
def stats() -> Stats: def stats() -> Stats:
return { return {
**stat(annotations) , **stat(annotations),
**stat(annotated_pdfs), **stat(annotated_pdfs),
} }

View file

@ -20,6 +20,10 @@ def reset_modules() -> None:
''' '''
to_unload = [m for m in sys.modules if re.match(r'my[.]?', m)] to_unload = [m for m in sys.modules if re.match(r'my[.]?', m)]
for m in to_unload: for m in to_unload:
if 'my.pdfs' in m:
# temporary hack -- since my.pdfs migrated to a 'lazy' config, this isn't necessary anymore
# but if we reset module anyway, it confuses the ProcessPool inside my.pdfs
continue
del sys.modules[m] del sys.modules[m]

View file

@ -1,17 +1,16 @@
import inspect
from pathlib import Path from pathlib import Path
import pytest
from more_itertools import ilen from more_itertools import ilen
import pytest from my.core.cfg import tmp_config
from my.tests.common import testdata from my.tests.common import testdata
from my.pdfs import annotated_pdfs, annotations, get_annots
def test_module(with_config) -> None: def test_module(with_config) -> None:
# TODO crap. if module is imported too early (on the top level, it makes it super hard to override config)
# need to at least detect it...
from my.pdfs import annotations, annotated_pdfs
# todo check types etc as well # todo check types etc as well
assert ilen(annotations()) >= 3 assert ilen(annotations()) >= 3
assert ilen(annotated_pdfs()) >= 1 assert ilen(annotated_pdfs()) >= 1
@ -22,12 +21,13 @@ def test_with_error(with_config, tmp_path: Path) -> None:
root = tmp_path root = tmp_path
g = root / 'garbage.pdf' g = root / 'garbage.pdf'
g.write_text('garbage') g.write_text('garbage')
from my.config import pdfs from my.config import pdfs
# meh. otherwise legacy config value 'wins' # meh. otherwise legacy config value 'wins'
del pdfs.roots # type: ignore[attr-defined] del pdfs.roots # type: ignore[attr-defined]
pdfs.paths = (root,) pdfs.paths = (root,)
from my.pdfs import annotations
annots = list(annotations()) annots = list(annotations())
[annot] = annots [annot] = annots
assert isinstance(annot, Exception) assert isinstance(annot, Exception)
@ -35,9 +35,6 @@ def test_with_error(with_config, tmp_path: Path) -> None:
@pytest.fixture @pytest.fixture
def with_config(): def with_config():
from my.tests.common import reset_modules
reset_modules() # todo ugh.. getting boilerplaty.. need to make it a bit more automatic..
# extra_data = Path(__file__).absolute().parent / 'extra/data/polar' # extra_data = Path(__file__).absolute().parent / 'extra/data/polar'
# assert extra_data.exists(), extra_data # assert extra_data.exists(), extra_data
# todo hmm, turned out no annotations in these ones.. whatever # todo hmm, turned out no annotations in these ones.. whatever
@ -47,13 +44,9 @@ def with_config():
testdata(), testdata(),
] ]
import my.core.cfg as C with tmp_config() as config:
with C.tmp_config() as config:
config.pdfs = user_config config.pdfs = user_config
try:
yield yield
finally:
reset_modules()
EXPECTED_HIGHLIGHTS = { EXPECTED_HIGHLIGHTS = {
@ -68,8 +61,6 @@ def test_get_annots() -> None:
Test get_annots, with a real PDF file Test get_annots, with a real PDF file
get_annots should return a list of three Annotation objects get_annots should return a list of three Annotation objects
""" """
from my.pdfs import get_annots
annotations = get_annots(testdata() / 'pdfs' / 'Information Architecture for the World Wide Web.pdf') annotations = get_annots(testdata() / 'pdfs' / 'Information Architecture for the World Wide Web.pdf')
assert len(annotations) == 3 assert len(annotations) == 3
assert set([a.highlight for a in annotations]) == EXPECTED_HIGHLIGHTS assert set([a.highlight for a in annotations]) == EXPECTED_HIGHLIGHTS
@ -80,12 +71,9 @@ def test_annotated_pdfs_with_filelist() -> None:
Test annotated_pdfs, with a real PDF file Test annotated_pdfs, with a real PDF file
annotated_pdfs should return a list of one Pdf object, with three Annotations annotated_pdfs should return a list of one Pdf object, with three Annotations
""" """
from my.pdfs import annotated_pdfs
filelist = [testdata() / 'pdfs' / 'Information Architecture for the World Wide Web.pdf'] filelist = [testdata() / 'pdfs' / 'Information Architecture for the World Wide Web.pdf']
annotations_generator = annotated_pdfs(filelist=filelist) annotations_generator = annotated_pdfs(filelist=filelist)
import inspect
assert inspect.isgeneratorfunction(annotated_pdfs) assert inspect.isgeneratorfunction(annotated_pdfs)
highlights_from_pdfs = [] highlights_from_pdfs = []