core: add ZipPath encapsulating compressed zip files
This way you don't have to unpack the archive first and can work with it as if it were a 'virtual' directory. Related: https://github.com/karlicoss/HPI/issues/20
This commit is contained in:
parent
444ec1c450
commit
95cd3b9289
3 changed files with 145 additions and 50 deletions
|
@ -1,6 +1,8 @@
|
||||||
"""
|
"""
|
||||||
Various helpers for compression
|
Various helpers for compression
|
||||||
"""
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import pathlib
|
import pathlib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Union, IO
|
from typing import Union, IO
|
||||||
|
@ -107,9 +109,44 @@ open = kopen # TODO deprecate
|
||||||
|
|
||||||
|
|
||||||
# meh
|
# meh
|
||||||
|
# TODO: ideally switch to ZipPath or something similar?
|
||||||
|
# nothing else supports subpath properly anyway
|
||||||
def kexists(path: PathIsh, subpath: str) -> bool:
    """Return True if ``kopen(path, subpath)`` succeeds, False if it raises.

    Any ``Exception`` raised while opening (missing archive, missing member, ...)
    is treated as "does not exist".
    """
    try:
        handle = kopen(path, subpath)
    except Exception:
        return False
    # the handle was opened purely as a probe -- release it instead of leaking it
    close = getattr(handle, 'close', None)
    if callable(close):
        close()
    return True
|
||||||
|
|
||||||
|
|
||||||
|
import zipfile
|
||||||
|
class ZipPath(zipfile.Path):
    """A ``zipfile.Path`` that behaves a bit more like ``pathlib.Path``.

    Lets callers treat a zip archive as a 'virtual' directory without unpacking it.
    ``self.root`` is the underlying zip file object and ``self.at`` is the member
    name inside the archive ('' for the archive root).
    """

    def _next(self, at: str) -> ZipPath:
        # The base class builds derived paths (``/``, ``joinpath``, ``iterdir``, ``parent``)
        # through this hook. Depending on the Python version it instantiates either plain
        # ``zipfile.Path`` or ``self.__class__`` -- and the latter is pathlib.Path here
        # because of the ``__class__`` override below. Always return a ZipPath instead.
        return ZipPath(self.root, at)

    def absolute(self) -> ZipPath:
        """Return a ZipPath for the same member, with the archive path made absolute."""
        return ZipPath(Path(self.root.filename).absolute(), self.at)

    def exists(self) -> bool:
        """Return whether this member (or, for the root, the archive file) exists."""
        if self.at == '':
            # special case, the base class returns False in this case for some reason
            return Path(self.root.filename).exists()
        return super().exists()

    def rglob(self, glob: str) -> Sequence[ZipPath]:
        """Return archive entries at/under this path whose name matches ``glob``.

        Directory entries are included in the result.
        """
        # note: not 100% sure about the correctness, but seem fine?
        # Path.match() matches from the right, so need to restrict the candidates
        # to the entries located under self.at first
        prefix = self.at
        if prefix and not prefix.endswith('/'):
            # compare on a directory boundary, so 'foo' doesn't pick up 'foobar/...'
            prefix += '/'
        candidates = [
            name for name in self.root.namelist()
            if name == self.at or name.startswith(prefix)
        ]
        return [ZipPath(self.root, name) for name in candidates if Path(name).match(glob)]

    def relative_to(self, other: ZipPath) -> Path:
        """Return the in-archive path of ``self`` relative to ``other``.

        Both paths must refer to the same archive; raises ValueError (from pathlib)
        if ``self`` is not located under ``other``.
        """
        # compare by filename (same notion of identity as __eq__), so two ZipPath
        # objects opened separately from the same archive are accepted
        assert self.root.filename == other.root.filename, (self.root, other.root)
        return Path(self.at).relative_to(Path(other.at))

    @property
    def __class__(self):
        # makes isinstance(zp, pathlib.Path) succeed, so third party code can stay
        # agnostic of ZipPath
        return Path

    def __eq__(self, other) -> bool:
        # hmm, super class doesn't seem to treat as equals unless they are the same object
        if not isinstance(other, ZipPath):
            return False
        return self.root.filename == other.root.filename and self.at == other.at

    def __hash__(self) -> int:
        # defining __eq__ alone would make instances unhashable; keep it consistent with __eq__
        return hash((self.root.filename, self.at))
|
||||||
|
|
106
tests/core/test_kompress.py
Normal file
106
tests/core/test_kompress.py
Normal file
|
@ -0,0 +1,106 @@
|
||||||
|
from pathlib import Path
|
||||||
|
import lzma
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
from my.core.kompress import kopen, kexists, CPath
|
||||||
|
|
||||||
|
import pytest # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
structure_data: Path = Path(__file__).parent / "structure_data"
|
||||||
|
|
||||||
|
|
||||||
|
def test_kopen(tmp_path: Path) -> None:
    """Plaintext handled transparently"""
    # close every handle explicitly so the test does not leak open files
    with kopen(tmp_path / 'file') as fo:
        assert fo.read() == 'just plaintext'
    with kopen(tmp_path / 'file.xz') as fo:
        assert fo.read() == 'compressed text'

    # For zips behaviour is a bit different (not sure about all this, tbh...)
    with kopen(tmp_path / 'file.zip', 'path/in/archive') as fo:
        assert fo.read() == 'data in zip'
|
||||||
|
|
||||||
|
|
||||||
|
# TODO here?
|
||||||
|
def test_kexists(tmp_path: Path) -> None:
    """kexists is true only for members that can be opened inside the zip."""
    # TODO also test top level?
    found = kexists(str(tmp_path / 'file.zip'), 'path/in/archive')
    assert found
    missing_member = kexists(str(tmp_path / 'file.zip'), 'path/notin/archive')
    assert not missing_member

    # TODO not sure about this?
    missing_archive = kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive')
    assert not missing_archive
|
||||||
|
|
||||||
|
|
||||||
|
def test_cpath(tmp_path: Path) -> None:
    """CPath.read_text() yields the decoded contents for plain and xz files."""
    # assert explicitly: a bare `==` expression is evaluated and discarded,
    # so it would never fail the test
    assert CPath(str(tmp_path / 'file')).read_text() == 'just plaintext'
    assert CPath(tmp_path / 'file.xz').read_text() == 'compressed text'
    # TODO not sure about zip files??
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
def prepare(tmp_path: Path):
    """Populate tmp_path with a plain file, an xz file and a zip archive."""
    plain = tmp_path / 'file'
    plain.write_text('just plaintext')

    # lzma.open accepts the path directly and handles opening/closing the file
    with lzma.open(tmp_path / 'file.xz', 'w') as xz_out:
        xz_out.write(b'compressed text')

    with zipfile.ZipFile(tmp_path / 'file.zip', mode='w') as archive:
        archive.writestr('path/in/archive', 'data in zip')

    # nothing to tear down afterwards
    yield
|
||||||
|
|
||||||
|
|
||||||
|
def test_zippath() -> None:
    """Exercise ZipPath against the bundled ``gdpr_export.zip`` archive."""
    from my.core.kompress import ZipPath

    archive = structure_data / 'gdpr_export.zip'
    assert archive.exists(), archive  # precondition

    archive_root = ZipPath(archive)

    # magic! convenient to make third party libraries agnostic of ZipPath
    assert isinstance(archive_root, Path)
    # FIXME maybe change str? since it's a bit misleading...
    # Path('/code/hpi/tests/core/structure_data/gdpr_export.zip', 'gdpr_export/')

    assert ZipPath(archive) == ZipPath(archive)
    assert archive_root.absolute() == archive_root

    assert archive_root.exists()
    comments_dir = archive_root / 'gdpr_export/comments'
    assert comments_dir.exists()
    # check str constructor just in case
    comments_dir_from_str = ZipPath(str(archive)) / 'gdpr_export/comments'
    assert comments_dir_from_str.exists()

    everything = list(archive_root.rglob('*'))
    assert everything
    assert all(entry.root.filename == str(archive) for entry in everything), everything

    relative = []
    for entry in everything:
        relative.append(str(entry.relative_to(archive_root)))
    expected_entries = [
        'gdpr_export',
        'gdpr_export/comments',
        'gdpr_export/comments/comments.json',
        'gdpr_export/profile',
        'gdpr_export/profile/settings.json',
        'gdpr_export/messages',
        'gdpr_export/messages/index.csv',
    ]
    assert relative == expected_entries, relative

    # TODO hmm this doesn't work atm, although Path does
    # not sure if it should be defensive or something...
    # ZipPath('doesnotexist')
    # same for this one
    # assert ZipPath(Path('test'), 'whatever').absolute() == ZipPath(Path('test').absolute(), 'whatever')
    #
    # FIXME vvv this should really work...
    # assert (ZipPath(target) / 'gdpr_export/comments').exists()
    # assert ZipPath(target, 'gdpr_export/comments').exists()

    json_members = []
    for entry in archive_root.rglob('*.json'):
        json_members.append(str(entry.relative_to(archive_root / 'gdpr_export')))
    assert json_members == [
        'comments/comments.json',
        'profile/settings.json',
    ]

    # FIXME uhh.. this doesn't work? without slash probably should...
    # assert list(zp.rglob('mes*')) == [ZipPath(target, 'gdpr_export/messages')]
    messages_only = list(archive_root.rglob('mes*'))
    assert messages_only == [ZipPath(archive, 'gdpr_export/messages/')]
|
|
@ -1,56 +1,8 @@
|
||||||
from pathlib import Path
|
|
||||||
from subprocess import check_call
|
|
||||||
import gzip
|
|
||||||
import lzma
|
|
||||||
import io
|
|
||||||
import zipfile
|
|
||||||
from typing import List
|
|
||||||
|
|
||||||
from my.core.kompress import kopen, kexists, CPath
|
|
||||||
|
|
||||||
|
|
||||||
def test_kopen(tmp_path: Path) -> None:
    """Plaintext handled transparently"""
    # close every handle explicitly so the test does not leak open files
    with kopen(tmp_path / 'file') as fo:
        assert fo.read() == 'just plaintext'
    with kopen(tmp_path / 'file.xz') as fo:
        assert fo.read() == 'compressed text'

    # For zips behaviour is a bit different (not sure about all this, tbh...)
    with kopen(tmp_path / 'file.zip', 'path/in/archive') as fo:
        assert fo.read() == 'data in zip'
|
|
||||||
|
|
||||||
|
|
||||||
def test_kexists(tmp_path: Path) -> None:
    """kexists is true only for members that can be opened inside the zip."""
    found = kexists(str(tmp_path / 'file.zip'), 'path/in/archive')
    assert found
    missing_member = kexists(str(tmp_path / 'file.zip'), 'path/notin/archive')
    assert not missing_member

    # TODO not sure about this?
    missing_archive = kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive')
    assert not missing_archive
|
|
||||||
|
|
||||||
|
|
||||||
def test_cpath(tmp_path: Path) -> None:
    """CPath.read_text() yields the decoded contents for plain and xz files."""
    # assert explicitly: a bare `==` expression is evaluated and discarded,
    # so it would never fail the test
    assert CPath(str(tmp_path / 'file')).read_text() == 'just plaintext'
    assert CPath(tmp_path / 'file.xz').read_text() == 'compressed text'
    # TODO not sure about zip files??
|
|
||||||
|
|
||||||
|
|
||||||
import pytest # type: ignore
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
def prepare(tmp_path: Path):
    """Populate tmp_path with a plain file, an xz file and a zip archive."""
    plain = tmp_path / 'file'
    plain.write_text('just plaintext')

    # lzma.open accepts the path directly and handles opening/closing the file
    with lzma.open(tmp_path / 'file.xz', 'w') as xz_out:
        xz_out.write(b'compressed text')

    with zipfile.ZipFile(tmp_path / 'file.zip', mode='w') as archive:
        archive.writestr('path/in/archive', 'data in zip')

    # nothing to tear down afterwards
    yield
|
|
||||||
|
|
||||||
|
|
||||||
from typing import Iterable, List
|
from typing import Iterable, List
|
||||||
import warnings
|
import warnings
|
||||||
from my.core import warn_if_empty
|
from my.core import warn_if_empty
|
||||||
|
|
||||||
|
|
||||||
def test_warn_if_empty() -> None:
|
def test_warn_if_empty() -> None:
|
||||||
@warn_if_empty
|
@warn_if_empty
|
||||||
def nonempty() -> Iterable[str]:
|
def nonempty() -> Iterable[str]:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue