diff --git a/my/core/kompress.py b/my/core/kompress.py index a9a8576..2cf1db5 100644 --- a/my/core/kompress.py +++ b/my/core/kompress.py @@ -1,9 +1,12 @@ """ Various helpers for compression """ +from __future__ import annotations + import pathlib from pathlib import Path -from typing import Union, IO +import sys +from typing import Union, IO, Sequence, Any import io PathIsh = Union[Path, str] @@ -107,9 +110,66 @@ open = kopen # TODO deprecate # meh +# TODO ideally switch to ZipPath or smth similar? +# nothing else supports subpath properly anyway def kexists(path: PathIsh, subpath: str) -> bool: try: kopen(path, subpath) return True except Exception: return False + + +import zipfile +if sys.version_info[:2] >= (3, 8): + # meh... zipfile.Path is not available on 3.7 + ZipPathBase = zipfile.Path +else: + if typing.TYPE_CHECKING: + ZipPathBase = Any + else: + ZipPathBase = object + + +class ZipPath(ZipPathBase): + # NOTE: is_dir/is_file might not behave as expected, the base class checks it only based on the slash in path + + # seems that at/root are not exposed in the docs, so might be an implementation detail + at: str + root: zipfile.ZipFile + + @property + def filename(self) -> str: + res = self.root.filename + assert res is not None # make mypy happy + return res + + def absolute(self) -> ZipPath: + return ZipPath(Path(self.filename).absolute(), self.at) + + def exists(self) -> bool: + if self.at == '': + # special case, the base class returns False in this case for some reason + return Path(self.filename).exists() + return super().exists() + + def rglob(self, glob: str) -> Sequence[ZipPath]: + # note: not 100% sure about the correctness, but seem fine? 
+ # Path.match() matches from the right, so need to restrict candidates to entries under self.at first + rpaths = [p for p in self.root.namelist() if p.startswith(self.at)] + rpaths = [p for p in rpaths if Path(p).match(glob)] + return [ZipPath(self.root, p) for p in rpaths] + + def relative_to(self, other: ZipPath) -> Path: + assert self.root == other.root, (self.root, other.root) + return Path(self.at).relative_to(Path(other.at)) + + @property # type: ignore[misc] + def __class__(self): + return Path + + def __eq__(self, other) -> bool: + # hmm, super class doesn't seem to treat as equals unless they are the same object + if not isinstance(other, ZipPath): + return False + return self.filename == other.filename and Path(self.at) == Path(other.at) diff --git a/tests/core/test_kompress.py b/tests/core/test_kompress.py new file mode 100644 index 0000000..3561444 --- /dev/null +++ b/tests/core/test_kompress.py @@ -0,0 +1,108 @@ +import lzma +from pathlib import Path +import sys +import zipfile + +from my.core.kompress import kopen, kexists, CPath + +import pytest # type: ignore + + +structure_data: Path = Path(__file__).parent / "structure_data" + + +def test_kopen(tmp_path: Path) -> None: + "Plaintext handled transparently" + assert kopen(tmp_path / 'file' ).read() == 'just plaintext' + assert kopen(tmp_path / 'file.xz').read() == 'compressed text' + + "For zips behaviour is a bit different (not sure about all this, tbh...)" + assert kopen(tmp_path / 'file.zip', 'path/in/archive').read() == 'data in zip' + + +# TODO here? +def test_kexists(tmp_path: Path) -> None: + # TODO also test top level? + assert kexists(str(tmp_path / 'file.zip'), 'path/in/archive') + assert not kexists(str(tmp_path / 'file.zip'), 'path/notin/archive') + + # TODO not sure about this? 
+ assert not kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive') + + +def test_cpath(tmp_path: Path) -> None: + assert CPath(str(tmp_path / 'file' )).read_text() == 'just plaintext' + assert CPath( tmp_path / 'file.xz').read_text() == 'compressed text' + # TODO not sure about zip files?? + + +@pytest.fixture(autouse=True) +def prepare(tmp_path: Path): + (tmp_path / 'file').write_text('just plaintext') + with (tmp_path / 'file.xz').open('wb') as f: + with lzma.open(f, 'w') as lzf: + lzf.write(b'compressed text') + with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as zf: + zf.writestr('path/in/archive', 'data in zip') + try: + yield None + finally: + pass + + +@pytest.mark.skipif( + sys.version_info[:2] < (3, 8), + reason="ZipFile.Path is only available since 3.8", +) +def test_zippath() -> None: + from my.core.kompress import ZipPath + target = structure_data / 'gdpr_export.zip' + assert target.exists(), target # precondition + + zp = ZipPath(target) + + # magic! convenient to make third party libraries agnostic of ZipPath + assert isinstance(zp, Path) + # TODO maybe change __str__/__repr__? 
since it's a bit misleading: + # Path('/code/hpi/tests/core/structure_data/gdpr_export.zip', 'gdpr_export/') + + assert ZipPath(target) == ZipPath(target) + assert zp.absolute() == zp + + assert zp.exists() + assert (zp / 'gdpr_export/comments').exists() + # check str constructor just in case + assert (ZipPath(str(target)) / 'gdpr_export/comments').exists() + assert not (ZipPath(str(target)) / 'whatever').exists() + + matched = list(zp.rglob('*')) + assert len(matched) > 0 + assert all(p.filename == str(target) for p in matched), matched + + rpaths = [str(p.relative_to(zp)) for p in matched] + assert rpaths == [ + 'gdpr_export', + 'gdpr_export/comments', + 'gdpr_export/comments/comments.json', + 'gdpr_export/profile', + 'gdpr_export/profile/settings.json', + 'gdpr_export/messages', + 'gdpr_export/messages/index.csv', + ], rpaths + + + # TODO hmm this doesn't work atm, whereas Path does + # not sure if it should be defensive or something... + # ZipPath('doesnotexist') + # same for this one + # assert ZipPath(Path('test'), 'whatever').absolute() == ZipPath(Path('test').absolute(), 'whatever') + + assert (ZipPath(target) / 'gdpr_export/comments').exists() + + jsons = [str(p.relative_to(zp / 'gdpr_export')) for p in zp.rglob('*.json')] + assert jsons == [ + 'comments/comments.json', + 'profile/settings.json', + ] + + assert list(zp.rglob('mes*')) == [ZipPath(target, 'gdpr_export/messages')] diff --git a/tests/misc.py b/tests/misc.py index ea41835..7e666d7 100644 --- a/tests/misc.py +++ b/tests/misc.py @@ -1,56 +1,8 @@ -from pathlib import Path -from subprocess import check_call -import gzip -import lzma -import io -import zipfile -from typing import List - -from my.core.kompress import kopen, kexists, CPath - - -def test_kopen(tmp_path: Path) -> None: - "Plaintext handled transparently" - assert kopen(tmp_path / 'file' ).read() == 'just plaintext' - assert kopen(tmp_path / 'file.xz').read() == 'compressed text' - - "For zips behaviour is a bit different (not sure about 
all this, tbh...)" - assert kopen(tmp_path / 'file.zip', 'path/in/archive').read() == 'data in zip' - - -def test_kexists(tmp_path: Path) -> None: - assert kexists(str(tmp_path / 'file.zip'), 'path/in/archive') - assert not kexists(str(tmp_path / 'file.zip'), 'path/notin/archive') - - # TODO not sure about this? - assert not kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive') - - -def test_cpath(tmp_path: Path) -> None: - CPath(str(tmp_path / 'file' )).read_text() == 'just plaintext' - CPath( tmp_path / 'file.xz').read_text() == 'compressed text' - # TODO not sure about zip files?? - - -import pytest # type: ignore - -@pytest.fixture(autouse=True) -def prepare(tmp_path: Path): - (tmp_path / 'file').write_text('just plaintext') - with (tmp_path / 'file.xz').open('wb') as f: - with lzma.open(f, 'w') as lzf: - lzf.write(b'compressed text') - with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as zf: - zf.writestr('path/in/archive', 'data in zip') - try: - yield None - finally: - pass - - from typing import Iterable, List import warnings from my.core import warn_if_empty + + def test_warn_if_empty() -> None: @warn_if_empty def nonempty() -> Iterable[str]: