core: add ZipPath encapsulating compressed zip files (#227)

* core: add ZipPath encapsulating compressed zip files

This way you don't have to unpack the archive first and can work with it as if it were a 'virtual' directory.

related: https://github.com/karlicoss/HPI/issues/20
This commit is contained in:
karlicoss 2022-04-14 10:06:13 +01:00 committed by GitHub
parent 444ec1c450
commit 7c0f304f94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 51 deletions

View file

@ -1,9 +1,12 @@
""" """
Various helpers for compression Various helpers for compression
""" """
from __future__ import annotations
import pathlib import pathlib
from pathlib import Path from pathlib import Path
from typing import Union, IO import sys
from typing import Union, IO, Sequence, Any
import io import io
PathIsh = Union[Path, str] PathIsh = Union[Path, str]
@ -107,9 +110,66 @@ open = kopen # TODO deprecate
# meh # meh
# TODO ideally switch to ZipPath or smth similar?
# nothing else supports subpath properly anyway
def kexists(path: PathIsh, subpath: str) -> bool:
    """
    Tell whether ``subpath`` can be opened within ``path`` by kopen.

    Any exception raised by kopen is interpreted as "does not exist".
    """
    try:
        kopen(path, subpath)
    except Exception:
        return False
    else:
        return True
import zipfile
if sys.version_info[:2] < (3, 8):
    # meh... zipfile.Path is not available on 3.7, so use a placeholder base class:
    # Any for the type checker, plain object at runtime
    if typing.TYPE_CHECKING:
        ZipPathBase = Any
    else:
        ZipPathBase = object
else:
    ZipPathBase = zipfile.Path
class ZipPath(ZipPathBase):
    """
    Path-like wrapper around a zip archive, so its contents can be browsed without unpacking.

    NOTE: is_dir/is_file might not behave as expected, the base class checks it only based on the slash in path
    """

    # seems that at/root are not exposed in the docs, so might be an implementation detail
    at: str
    root: zipfile.ZipFile

    @property
    def filename(self) -> str:
        """Filename of the underlying archive, as reported by the ZipFile object."""
        res = self.root.filename
        assert res is not None  # make mypy happy
        return res

    def absolute(self) -> ZipPath:
        """Return a ZipPath for the same location inside the archive, with the archive path made absolute."""
        return ZipPath(Path(self.filename).absolute(), self.at)

    def exists(self) -> bool:
        """Check existence; the archive root ('at' is empty) exists iff the archive file itself exists."""
        if self.at == '':
            # special case, the base class returns False in this case for some reason
            return Path(self.filename).exists()
        return super().exists()

    def rglob(self, glob: str) -> Sequence[ZipPath]:
        """Return the archive entries located under this path whose names match ``glob``."""
        # note: not 100% sure about the correctness, but seem fine?
        # Path.match() matches from the right, so need to narrow the candidates
        # down to the entries under self.at before matching
        rpaths = [p for p in self.root.namelist() if p.startswith(self.at)]
        rpaths = [p for p in rpaths if Path(p).match(glob)]
        return [ZipPath(self.root, p) for p in rpaths]

    def relative_to(self, other: ZipPath) -> Path:
        """Return the location of this entry relative to ``other``; both must share the same ZipFile object."""
        assert self.root == other.root, (self.root, other.root)
        return Path(self.at).relative_to(Path(other.at))

    @property  # type: ignore[misc]
    def __class__(self):
        # report pathlib.Path as the class, so isinstance(zp, Path) holds
        return Path

    def __eq__(self, other) -> bool:
        # hmm, super class doesn't seem to treat as equals unless they are the same object
        if not isinstance(other, ZipPath):
            return False
        return self.filename == other.filename and Path(self.at) == Path(other.at)

    def __hash__(self) -> int:
        # a class that defines __eq__ without __hash__ becomes unhashable;
        # hash exactly the fields __eq__ compares so that equal objects hash equally
        return hash((self.filename, Path(self.at)))

108
tests/core/test_kompress.py Normal file
View file

@ -0,0 +1,108 @@
import lzma
from pathlib import Path
import sys
import zipfile
from my.core.kompress import kopen, kexists, CPath
import pytest # type: ignore
# directory next to this file holding fixture archives (e.g. gdpr_export.zip used by test_zippath)
structure_data: Path = Path(__file__).parent / "structure_data"
def test_kopen(tmp_path: Path) -> None:
    """Plaintext handled transparently; zips additionally take a path inside the archive."""
    plain = kopen(tmp_path / 'file').read()
    assert plain == 'just plaintext'

    unpacked = kopen(tmp_path / 'file.xz').read()
    assert unpacked == 'compressed text'

    # For zips behaviour is a bit different (not sure about all this, tbh...)
    from_zip = kopen(tmp_path / 'file.zip', 'path/in/archive').read()
    assert from_zip == 'data in zip'
# TODO here?
def test_kexists(tmp_path: Path) -> None:
    """kexists reports whether a member can be opened inside a zip archive."""
    archive = str(tmp_path / 'file.zip')
    # TODO also test top level?
    assert kexists(archive, 'path/in/archive')
    assert not kexists(archive, 'path/notin/archive')

    # TODO not sure about this?
    missing_archive = tmp_path / 'nosuchzip.zip'
    assert not kexists(missing_archive, 'path/in/archive')
def test_cpath(tmp_path: Path) -> None:
    """CPath accepts both str and Path arguments and reads the content as text."""
    # NOTE: the comparisons must be asserted; a bare `a == b` expression checks nothing
    assert CPath(str(tmp_path / 'file')).read_text() == 'just plaintext'
    assert CPath(tmp_path / 'file.xz').read_text() == 'compressed text'
    # TODO not sure about zip files??
@pytest.fixture(autouse=True)
def prepare(tmp_path: Path):
    """Autouse fixture: fill tmp_path with a plaintext file, an xz file and a zip archive."""
    plain = tmp_path / 'file'
    plain.write_text('just plaintext')

    with (tmp_path / 'file.xz').open('wb') as raw, lzma.open(raw, 'w') as packed:
        packed.write(b'compressed text')

    with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as archive:
        archive.writestr('path/in/archive', 'data in zip')

    # no teardown steps are performed after the test
    yield None
@pytest.mark.skipif(
    sys.version_info[:2] < (3, 8),
    reason="ZipFile.Path is only available since 3.8",
)
def test_zippath() -> None:
    """Exercise ZipPath (construction, equality, exists, rglob, relative_to) on the gdpr_export.zip fixture."""
    from my.core.kompress import ZipPath

    target = structure_data / 'gdpr_export.zip'
    assert target.exists(), target  # precondition

    zp = ZipPath(target)

    # magic! convenient to make third party libraries agnostic of ZipPath
    assert isinstance(zp, Path)

    # TODO maybe change __str__/__repr__? since it's a bit misleading:
    # Path('/code/hpi/tests/core/structure_data/gdpr_export.zip', 'gdpr_export/')

    assert ZipPath(target) == ZipPath(target)
    assert zp.absolute() == zp

    assert zp.exists()
    comments_dir = 'gdpr_export/comments'
    assert (zp / comments_dir).exists()
    # check str constructor just in case
    assert (ZipPath(str(target)) / comments_dir).exists()
    assert not (ZipPath(str(target)) / 'whatever').exists()

    matched = list(zp.rglob('*'))
    assert len(matched) > 0
    assert all(p.filename == str(target) for p in matched), matched

    expected_entries = [
        'gdpr_export',
        'gdpr_export/comments',
        'gdpr_export/comments/comments.json',
        'gdpr_export/profile',
        'gdpr_export/profile/settings.json',
        'gdpr_export/messages',
        'gdpr_export/messages/index.csv',
    ]
    rpaths = [str(p.relative_to(zp)) for p in matched]
    assert rpaths == expected_entries, rpaths

    # TODO hmm this doesn't work atm, whereas Path does
    # not sure if it should be defensive or something...
    # ZipPath('doesnotexist')
    # same for this one
    # assert ZipPath(Path('test'), 'whatever').absolute() == ZipPath(Path('test').absolute(), 'whatever')

    assert (ZipPath(target) / comments_dir).exists()

    export_root = zp / 'gdpr_export'
    jsons = [str(p.relative_to(export_root)) for p in zp.rglob('*.json')]
    assert jsons == [
        'comments/comments.json',
        'profile/settings.json',
    ]

    messages = list(zp.rglob('mes*'))
    assert messages == [ZipPath(target, 'gdpr_export/messages')]

View file

@ -1,56 +1,8 @@
from pathlib import Path
from subprocess import check_call
import gzip
import lzma
import io
import zipfile
from typing import List
from my.core.kompress import kopen, kexists, CPath
def test_kopen(tmp_path: Path) -> None:
    """Plaintext handled transparently; zips additionally take a path inside the archive."""
    plain = kopen(tmp_path / 'file').read()
    assert plain == 'just plaintext'

    unpacked = kopen(tmp_path / 'file.xz').read()
    assert unpacked == 'compressed text'

    # For zips behaviour is a bit different (not sure about all this, tbh...)
    from_zip = kopen(tmp_path / 'file.zip', 'path/in/archive').read()
    assert from_zip == 'data in zip'
def test_kexists(tmp_path: Path) -> None:
    """kexists reports whether a member can be opened inside a zip archive."""
    archive = str(tmp_path / 'file.zip')
    assert kexists(archive, 'path/in/archive')
    assert not kexists(archive, 'path/notin/archive')

    # TODO not sure about this?
    missing_archive = tmp_path / 'nosuchzip.zip'
    assert not kexists(missing_archive, 'path/in/archive')
def test_cpath(tmp_path: Path) -> None:
    """CPath accepts both str and Path arguments and reads the content as text."""
    # NOTE: the comparisons must be asserted; a bare `a == b` expression checks nothing
    assert CPath(str(tmp_path / 'file')).read_text() == 'just plaintext'
    assert CPath(tmp_path / 'file.xz').read_text() == 'compressed text'
    # TODO not sure about zip files??
import pytest # type: ignore
@pytest.fixture(autouse=True)
def prepare(tmp_path: Path):
    """Autouse fixture: fill tmp_path with a plaintext file, an xz file and a zip archive."""
    plain = tmp_path / 'file'
    plain.write_text('just plaintext')

    with (tmp_path / 'file.xz').open('wb') as raw, lzma.open(raw, 'w') as packed:
        packed.write(b'compressed text')

    with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as archive:
        archive.writestr('path/in/archive', 'data in zip')

    # no teardown steps are performed after the test
    yield None
from typing import Iterable, List from typing import Iterable, List
import warnings import warnings
from my.core import warn_if_empty from my.core import warn_if_empty
def test_warn_if_empty() -> None: def test_warn_if_empty() -> None:
@warn_if_empty @warn_if_empty
def nonempty() -> Iterable[str]: def nonempty() -> Iterable[str]: