core/kompress: move vendorized to _deprecated, use kompress library directly
This commit is contained in:
parent
bb478f369d
commit
fe26efaea8
6 changed files with 283 additions and 385 deletions
260
my/core/_deprecated/kompress.py
Normal file
260
my/core/_deprecated/kompress.py
Normal file
|
@ -0,0 +1,260 @@
|
||||||
|
"""
|
||||||
|
Various helpers for compression
|
||||||
|
"""
|
||||||
|
# fmt: off
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from functools import total_ordering
|
||||||
|
import io
|
||||||
|
import pathlib
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
from typing import Union, IO, Sequence, Any, Iterator
|
||||||
|
|
||||||
|
PathIsh = Union[Path, str]
|
||||||
|
|
||||||
|
|
||||||
|
class Ext:
    """File-name suffixes of the compressed/archive formats handled by this module."""

    # single-stream compression formats and archives, keyed by short name
    xz    = '.xz'
    zip   = '.zip'
    lz4   = '.lz4'
    # zstandard files show up under both spellings
    zstd  = '.zstd'
    zst   = '.zst'
    targz = '.tar.gz'
|
||||||
|
|
||||||
|
|
||||||
|
def is_compressed(p: Path) -> bool:
    """
    Tell whether ``p`` looks like a compressed file or archive.

    The decision is based purely on the file name: it is True when the name
    ends with one of the suffixes listed in ``Ext``. The file is not opened.
    """
    # todo kinda lame way for now.. use mime ideally?
    # should cooperate with kompress.kopen?
    known_suffixes = (Ext.xz, Ext.zip, Ext.lz4, Ext.zstd, Ext.zst, Ext.targz)
    # str.endswith accepts a tuple and succeeds if any of the suffixes matches
    return p.name.endswith(known_suffixes)
|
||||||
|
|
||||||
|
|
||||||
|
def _zstd_open(path: Path, *args, **kwargs) -> IO:
    """
    Open a zstandard-compressed file for reading.

    :param path: file to open; it is always opened in binary mode underneath
    :param args: accepted for signature compatibility with kopen, not used
    :param kwargs: may contain ``mode`` (defaults to ``'rt'``); everything else
        is forwarded to ``io.TextIOWrapper`` when a text stream is requested
    :return: the raw decompressing reader for ``mode == 'rb'``, otherwise a
        text wrapper around it
    :raises ModuleNotFoundError: if the ``zstandard`` package is not installed
    """
    import zstandard as zstd # type: ignore

    # 'mode' is not a TextIOWrapper argument, so take it out of kwargs up front.
    # Popping with a default (rather than get() followed by a bare pop()) means a
    # call that doesn't pass mode explicitly no longer fails with KeyError.
    mode = kwargs.pop('mode', 'rt')

    fh = path.open('rb')
    dctx = zstd.ZstdDecompressor()
    reader = dctx.stream_reader(fh)

    if mode == 'rb':
        return reader
    # must be text mode
    return io.TextIOWrapper(reader, **kwargs) # meh
|
||||||
|
|
||||||
|
|
||||||
|
# TODO use the 'dependent type' trick for return type?
|
||||||
|
def kopen(path: PathIsh, *args, mode: str='rt', **kwargs) -> IO:
    """
    Open ``path`` for reading, choosing a decompressor from the file name suffix.

    - ``.xz``: ``lzma.open``
    - ``.zip``: exactly one extra positional argument (the member path inside the
      archive) is required; the member is always returned wrapped as text
    - ``.lz4``: ``lz4.frame.open``
    - ``.zstd`` / ``.zst``: ``_zstd_open``
    - ``.tar.gz``: positional arguments go to ``TarFile.extractfile``; the member
      is returned as is, ``mode`` and ``encoding`` are not applied
    - anything else: plain ``Path.open``

    For the text modes ``'r'`` / ``'rt'`` the encoding defaults to utf8; for any
    other mode the encoding is forced to None.
    """
    # just in case, but I think this shouldn't be necessary anymore
    # since when we call .read_text, encoding is passed already
    text_mode = mode in {'r', 'rt'}
    encoding = kwargs.get('encoding', 'utf8') if text_mode else None
    kwargs['encoding'] = encoding

    pp = Path(path)
    fname = pp.name

    if fname.endswith(Ext.xz):
        import lzma

        # ugh. for lzma, 'r' means 'rb'
        # https://github.com/python/cpython/blob/d01cf5072be5511595b6d0c35ace6c1b07716f8d/Lib/lzma.py#L97
        # whereas for regular open, 'r' means 'rt'
        # https://docs.python.org/3/library/functions.html#open
        kwargs['mode'] = 'rt' if mode == 'r' else mode
        return lzma.open(pp, *args, **kwargs)

    if fname.endswith(Ext.zip):
        # eh. this behaviour is a bit dodgy...
        from zipfile import ZipFile
        archive = ZipFile(pp)

        [subpath] = args # meh?

        ## oh god... https://stackoverflow.com/a/5639960/706389
        member = archive.open(subpath, mode='r')
        # patch in the bits of the file interface that TextIOWrapper asks for
        patches = (
            ('readable', lambda: True),
            ('writable', lambda: False),
            ('seekable', lambda: False),
            ('read1'   , member.read),
        )
        for attr, impl in patches:
            setattr(member, attr, impl)
        # TODO pass all kwargs here??
        # todo 'expected "BinaryIO"'??
        return io.TextIOWrapper(member, encoding=encoding)

    if fname.endswith(Ext.lz4):
        import lz4.frame # type: ignore
        return lz4.frame.open(str(pp), mode, *args, **kwargs)

    if fname.endswith((Ext.zstd, Ext.zst)):
        kwargs['mode'] = mode
        return _zstd_open(pp, *args, **kwargs)

    if fname.endswith(Ext.targz):
        import tarfile
        # FIXME pass mode?
        tf = tarfile.open(pp)
        # TODO pass encoding?
        member = tf.extractfile(*args)
        assert member is not None
        return member

    # no known suffix: treat as a regular file
    return pp.open(mode, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
import typing
|
||||||
|
import os
|
||||||
|
|
||||||
|
if not typing.TYPE_CHECKING:
    # at runtime, subclass the concrete flavour that matches the current OS
    if os.name == 'nt':
        BasePath = pathlib.WindowsPath
    else:
        BasePath = pathlib.PosixPath
else:
    # otherwise mypy can't figure out that BasePath is a type alias..
    BasePath = pathlib.Path
|
||||||
|
|
||||||
|
|
||||||
|
class CPath(BasePath):
    """
    Hacky way to support compressed files.
    If you can think of a better way to do this, please let me know! https://github.com/karlicoss/HPI/issues/20

    Ugh. So, can't override Path because of some _flavour thing.
    Path only has _accessor and _closed slots, so can't directly set .open method
    _accessor.open has to return file descriptor, doesn't work for compressed stuff.
    """
    def open(self, *args, **kwargs):
        """
        Open the file through ``kopen`` so that it is decompressed transparently.

        Only ``mode`` and ``encoding`` are forwarded. They may be given by keyword
        or positionally, in the order ``pathlib.Path.open`` declares its parameters
        (mode, buffering, encoding, errors, newline). The remaining arguments are
        accepted and ignored.
        """
        # map positional arguments onto their names; without this a call like
        # `.open('rb')` would lose the mode and hand back a text stream
        positional = dict(zip(('mode', 'buffering', 'encoding', 'errors', 'newline'), args))
        requested = {**kwargs, **positional}
        kopen_kwargs = {
            key: requested[key]
            for key in ('mode', 'encoding')
            if requested.get(key) is not None  # leave kopen's own defaults in place
        }
        # TODO assert read only?
        return kopen(str(self), **kopen_kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level alias so that `open` imported from this module resolves to kopen.
# NOTE: from this line on, the bare name `open` inside this module refers to kopen
# rather than the builtin.
open = kopen # TODO deprecate
|
||||||
|
|
||||||
|
|
||||||
|
# meh
|
||||||
|
# TODO ideally switch to ZipPath or smth similar?
|
||||||
|
# nothing else supports subpath properly anyway
|
||||||
|
def kexists(path: PathIsh, subpath: str) -> bool:
    """
    Check whether ``subpath`` can be opened inside the archive at ``path``.

    :return: True if ``kopen(path, subpath)`` succeeds, False if it raises any
        ``Exception`` (missing archive, missing member, unsupported file, ...)
    """
    try:
        handle = kopen(path, subpath)
    except Exception:
        return False
    # the stream was only opened as a probe, so release it instead of leaking it
    try:
        handle.close()
    except Exception:
        # deliberately best-effort: a failing close() doesn't change the answer
        pass
    return True
|
||||||
|
|
||||||
|
|
||||||
|
import zipfile
|
||||||
|
# Pick the base class for ZipPath.
if sys.version_info[:2] >= (3, 8):
    # meh... zipfile.Path is not available on 3.7
    zipfile_Path = zipfile.Path
elif typing.TYPE_CHECKING:
    # on older interpreters give the type checker a permissive placeholder
    zipfile_Path = Any
else:
    # ... and at runtime fall back to a plain object base
    zipfile_Path = object
|
||||||
|
|
||||||
|
|
||||||
|
@total_ordering
class ZipPath(zipfile_Path):
    """
    ``zipfile.Path`` subclass that fills in enough of the ``pathlib.Path``
    interface (``absolute``, ``rglob``, ``relative_to``, ``parts``, ``stat``, ...)
    for code written against regular paths to walk the contents of a zip archive.
    """
    # NOTE: is_dir/is_file might not behave as expected, the base class checks it only based on the slash in path

    # seems that root/at are not exposed in the docs, so might be an implementation detail
    root: zipfile.ZipFile
    at: str

    @property
    def filepath(self) -> Path:
        """Location of the zip archive itself on the filesystem."""
        res = self.root.filename
        assert res is not None  # make mypy happy
        return Path(res)

    @property
    def subpath(self) -> Path:
        """Location of this entry inside the archive."""
        return Path(self.at)

    def absolute(self) -> ZipPath:
        """Same entry, with the archive location made absolute."""
        return ZipPath(self.filepath.absolute(), self.at)

    def expanduser(self) -> ZipPath:
        """Same entry, with ``~`` expanded in the archive location."""
        return ZipPath(self.filepath.expanduser(), self.at)

    def exists(self) -> bool:
        """True if the entry exists in the archive, either as a file or as a directory."""
        if self.at == '':
            # special case, the base class returns False in this case for some reason
            return self.filepath.exists()
        return super().exists() or self._as_dir().exists()

    def _as_dir(self) -> zipfile_Path:
        """This entry reinterpreted as a directory (trailing slash appended)."""
        # note: seems that zip always uses forward slash, regardless OS?
        return zipfile_Path(self.root, self.at + '/')

    def rglob(self, glob: str) -> Sequence[ZipPath]:
        """Archive entries whose name starts with this entry's path and matches ``glob``."""
        # note: not 100% sure about the correctness, but seem fine?
        # Path.match() matches from the right, so need to narrow the names down
        # to the ones under this entry first
        rpaths = [p for p in self.root.namelist() if p.startswith(self.at)]
        rpaths = [p for p in rpaths if Path(p).match(glob)]
        return [ZipPath(self.root, p) for p in rpaths]

    def relative_to(self, other: ZipPath) -> Path:
        """Path of this entry relative to ``other``; both must live in the same archive."""
        assert self.filepath == other.filepath, (self.filepath, other.filepath)
        return self.subpath.relative_to(other.subpath)

    @property
    def parts(self) -> Sequence[str]:
        """Components of the archive location followed by components of the inner path."""
        # messy, but might be ok..
        return self.filepath.parts + self.subpath.parts

    def __truediv__(self, key) -> ZipPath:
        # need to implement it so the return type is not zipfile.Path
        tmp = zipfile_Path(self.root) / self.at / key
        return ZipPath(self.root, tmp.at)

    def iterdir(self) -> Iterator[ZipPath]:
        """Iterate over the children of this entry, treating it as a directory."""
        for s in self._as_dir().iterdir():
            yield ZipPath(s.root, s.at)  # type: ignore[attr-defined]

    @property
    def stem(self) -> str:
        """Final component of the inner path without its suffix."""
        return self.subpath.stem

    @property  # type: ignore[misc]
    def __class__(self):
        # makes isinstance(zp, Path) succeed, so third party code needn't know about ZipPath
        return Path

    def __eq__(self, other) -> bool:
        # hmm, super class doesn't seem to treat as equals unless they are the same object
        if not isinstance(other, ZipPath):
            return False
        return (self.filepath, self.subpath) == (other.filepath, other.subpath)

    def __lt__(self, other) -> bool:
        if not isinstance(other, ZipPath):
            return False
        return (self.filepath, self.subpath) < (other.filepath, other.subpath)

    def __hash__(self) -> int:
        return hash((self.filepath, self.subpath))

    def stat(self) -> os.stat_result:
        """
        Build an ``os.stat_result`` from the archive's record for this entry.

        Timestamps come from the entry's recorded date, the size is the
        uncompressed size; the remaining fields are placeholders.

        :raises KeyError: if the archive has no record named exactly ``self.at``
        """
        info = self.root.getinfo(self.at)
        # NOTE: zip datetimes have no notion of time zone, usually they just keep local time?
        # see https://en.wikipedia.org/wiki/ZIP_(file_format)#Structure
        dt = datetime(*info.date_time)
        ts = int(dt.timestamp())
        # order matters: os.stat_result takes the fields positionally
        params = dict(
            st_mode=0,
            st_ino=0,
            st_dev=0,
            st_nlink=1,
            st_uid=1000,
            st_gid=1000,
            st_size=info.file_size,  # uncompressed size of the entry
            st_atime=ts,
            st_mtime=ts,
            st_ctime=ts,
        )
        return os.stat_result(tuple(params.values()))
|
||||||
|
|
||||||
|
# fmt: on
|
|
@ -162,7 +162,7 @@ Paths = Union[Sequence[PathIsh], PathIsh]
|
||||||
|
|
||||||
|
|
||||||
def _is_zippath(p: Path) -> bool:
|
def _is_zippath(p: Path) -> bool:
|
||||||
# weak type check here, don't want to depend on .kompress module in get_files
|
# weak type check here, don't want to depend on kompress library in get_files
|
||||||
return type(p).__name__ == 'ZipPath'
|
return type(p).__name__ == 'ZipPath'
|
||||||
|
|
||||||
|
|
||||||
|
@ -234,8 +234,8 @@ def get_files(
|
||||||
traceback.print_stack()
|
traceback.print_stack()
|
||||||
|
|
||||||
if guess_compression:
|
if guess_compression:
|
||||||
from .kompress import CPath, is_compressed
|
from kompress import CPath, is_compressed
|
||||||
paths = [CPath(p) if is_compressed(p) and not _is_zippath(p) else p for p in paths]
|
paths = [CPath(p) if is_compressed(p) and not _is_zippath(p) else p for p in paths] # TODO fwtf is going on here?... make sure it's tested
|
||||||
return tuple(paths)
|
return tuple(paths)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,257 +1,17 @@
|
||||||
"""
|
from .common import assert_subpackage; assert_subpackage(__name__)
|
||||||
Various helpers for compression
|
from . import warnings
|
||||||
"""
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from datetime import datetime
|
# do this later -- for now need to transition modules to avoid using kompress directly (e.g. ZipPath)
|
||||||
from functools import total_ordering
|
# warnings.high('my.core.kompress is deprecated, please use "kompress" library directly. See https://github.com/karlicoss/kompress')
|
||||||
import io
|
|
||||||
import pathlib
|
|
||||||
from pathlib import Path
|
|
||||||
import sys
|
|
||||||
from typing import Union, IO, Sequence, Any, Iterator
|
|
||||||
|
|
||||||
PathIsh = Union[Path, str]
|
try:
|
||||||
|
from kompress import *
|
||||||
|
except ModuleNotFoundError as e:
|
||||||
class Ext:
|
if e.name == 'kompress':
|
||||||
xz = '.xz'
|
warnings.high('Please install kompress (pip3 install kompress), it will be required in the future. Falling onto vendorized kompress for now.')
|
||||||
zip = '.zip'
|
from ._deprecated.kompress import * # type: ignore[assignment]
|
||||||
lz4 = '.lz4'
|
|
||||||
zstd = '.zstd'
|
|
||||||
zst = '.zst'
|
|
||||||
targz = '.tar.gz'
|
|
||||||
|
|
||||||
|
|
||||||
def is_compressed(p: Path) -> bool:
|
|
||||||
# todo kinda lame way for now.. use mime ideally?
|
|
||||||
# should cooperate with kompress.kopen?
|
|
||||||
return any(p.name.endswith(ext) for ext in {Ext.xz, Ext.zip, Ext.lz4, Ext.zstd, Ext.zst, Ext.targz})
|
|
||||||
|
|
||||||
|
|
||||||
def _zstd_open(path: Path, *args, **kwargs) -> IO:
|
|
||||||
import zstandard as zstd # type: ignore
|
|
||||||
fh = path.open('rb')
|
|
||||||
dctx = zstd.ZstdDecompressor()
|
|
||||||
reader = dctx.stream_reader(fh)
|
|
||||||
|
|
||||||
mode = kwargs.get('mode', 'rt')
|
|
||||||
if mode == 'rb':
|
|
||||||
return reader
|
|
||||||
else:
|
else:
|
||||||
# must be text mode
|
raise e
|
||||||
kwargs.pop('mode') # TextIOWrapper doesn't like it
|
|
||||||
return io.TextIOWrapper(reader, **kwargs) # meh
|
|
||||||
|
|
||||||
|
# this is deprecated in kompress, keep here for backwards compatibility
|
||||||
# TODO use the 'dependent type' trick for return type?
|
open = kopen # noqa: F405
|
||||||
def kopen(path: PathIsh, *args, mode: str='rt', **kwargs) -> IO:
|
|
||||||
# just in case, but I think this shouldn't be necessary anymore
|
|
||||||
# since when we call .read_text, encoding is passed already
|
|
||||||
if mode in {'r', 'rt'}:
|
|
||||||
encoding = kwargs.get('encoding', 'utf8')
|
|
||||||
else:
|
|
||||||
encoding = None
|
|
||||||
kwargs['encoding'] = encoding
|
|
||||||
|
|
||||||
pp = Path(path)
|
|
||||||
name = pp.name
|
|
||||||
if name.endswith(Ext.xz):
|
|
||||||
import lzma
|
|
||||||
|
|
||||||
# ugh. for lzma, 'r' means 'rb'
|
|
||||||
# https://github.com/python/cpython/blob/d01cf5072be5511595b6d0c35ace6c1b07716f8d/Lib/lzma.py#L97
|
|
||||||
# whereas for regular open, 'r' means 'rt'
|
|
||||||
# https://docs.python.org/3/library/functions.html#open
|
|
||||||
if mode == 'r':
|
|
||||||
mode = 'rt'
|
|
||||||
kwargs['mode'] = mode
|
|
||||||
return lzma.open(pp, *args, **kwargs)
|
|
||||||
elif name.endswith(Ext.zip):
|
|
||||||
# eh. this behaviour is a bit dodgy...
|
|
||||||
from zipfile import ZipFile
|
|
||||||
zfile = ZipFile(pp)
|
|
||||||
|
|
||||||
[subpath] = args # meh?
|
|
||||||
|
|
||||||
## oh god... https://stackoverflow.com/a/5639960/706389
|
|
||||||
ifile = zfile.open(subpath, mode='r')
|
|
||||||
ifile.readable = lambda: True # type: ignore
|
|
||||||
ifile.writable = lambda: False # type: ignore
|
|
||||||
ifile.seekable = lambda: False # type: ignore
|
|
||||||
ifile.read1 = ifile.read # type: ignore
|
|
||||||
# TODO pass all kwargs here??
|
|
||||||
# todo 'expected "BinaryIO"'??
|
|
||||||
return io.TextIOWrapper(ifile, encoding=encoding)
|
|
||||||
elif name.endswith(Ext.lz4):
|
|
||||||
import lz4.frame # type: ignore
|
|
||||||
return lz4.frame.open(str(pp), mode, *args, **kwargs)
|
|
||||||
elif name.endswith(Ext.zstd) or name.endswith(Ext.zst):
|
|
||||||
kwargs['mode'] = mode
|
|
||||||
return _zstd_open(pp, *args, **kwargs)
|
|
||||||
elif name.endswith(Ext.targz):
|
|
||||||
import tarfile
|
|
||||||
# FIXME pass mode?
|
|
||||||
tf = tarfile.open(pp)
|
|
||||||
# TODO pass encoding?
|
|
||||||
x = tf.extractfile(*args); assert x is not None
|
|
||||||
return x
|
|
||||||
else:
|
|
||||||
return pp.open(mode, *args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
import typing
|
|
||||||
import os
|
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
|
||||||
# otherwise mypy can't figure out that BasePath is a type alias..
|
|
||||||
BasePath = pathlib.Path
|
|
||||||
else:
|
|
||||||
BasePath = pathlib.WindowsPath if os.name == 'nt' else pathlib.PosixPath
|
|
||||||
|
|
||||||
|
|
||||||
class CPath(BasePath):
|
|
||||||
"""
|
|
||||||
Hacky way to support compressed files.
|
|
||||||
If you can think of a better way to do this, please let me know! https://github.com/karlicoss/HPI/issues/20
|
|
||||||
|
|
||||||
Ugh. So, can't override Path because of some _flavour thing.
|
|
||||||
Path only has _accessor and _closed slots, so can't directly set .open method
|
|
||||||
_accessor.open has to return file descriptor, doesn't work for compressed stuff.
|
|
||||||
"""
|
|
||||||
def open(self, *args, **kwargs):
|
|
||||||
kopen_kwargs = {}
|
|
||||||
mode = kwargs.get('mode')
|
|
||||||
if mode is not None:
|
|
||||||
kopen_kwargs['mode'] = mode
|
|
||||||
encoding = kwargs.get('encoding')
|
|
||||||
if encoding is not None:
|
|
||||||
kopen_kwargs['encoding'] = encoding
|
|
||||||
# TODO assert read only?
|
|
||||||
return kopen(str(self), **kopen_kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
open = kopen # TODO deprecate
|
|
||||||
|
|
||||||
|
|
||||||
# meh
|
|
||||||
# TODO ideally switch to ZipPath or smth similar?
|
|
||||||
# nothing else supports subpath properly anyway
|
|
||||||
def kexists(path: PathIsh, subpath: str) -> bool:
|
|
||||||
try:
|
|
||||||
kopen(path, subpath)
|
|
||||||
return True
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
import zipfile
|
|
||||||
if sys.version_info[:2] >= (3, 8):
|
|
||||||
# meh... zipfile.Path is not available on 3.7
|
|
||||||
zipfile_Path = zipfile.Path
|
|
||||||
else:
|
|
||||||
if typing.TYPE_CHECKING:
|
|
||||||
zipfile_Path = Any
|
|
||||||
else:
|
|
||||||
zipfile_Path = object
|
|
||||||
|
|
||||||
|
|
||||||
@total_ordering
|
|
||||||
class ZipPath(zipfile_Path):
|
|
||||||
# NOTE: is_dir/is_file might not behave as expected, the base class checks it only based on the slash in path
|
|
||||||
|
|
||||||
# seems that root/at are not exposed in the docs, so might be an implementation detail
|
|
||||||
root: zipfile.ZipFile
|
|
||||||
at: str
|
|
||||||
|
|
||||||
@property
|
|
||||||
def filepath(self) -> Path:
|
|
||||||
res = self.root.filename
|
|
||||||
assert res is not None # make mypy happy
|
|
||||||
return Path(res)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def subpath(self) -> Path:
|
|
||||||
return Path(self.at)
|
|
||||||
|
|
||||||
def absolute(self) -> ZipPath:
|
|
||||||
return ZipPath(self.filepath.absolute(), self.at)
|
|
||||||
|
|
||||||
def expanduser(self) -> ZipPath:
|
|
||||||
return ZipPath(self.filepath.expanduser(), self.at)
|
|
||||||
|
|
||||||
def exists(self) -> bool:
|
|
||||||
if self.at == '':
|
|
||||||
# special case, the base class returns False in this case for some reason
|
|
||||||
return self.filepath.exists()
|
|
||||||
return super().exists() or self._as_dir().exists()
|
|
||||||
|
|
||||||
def _as_dir(self) -> zipfile_Path:
|
|
||||||
# note: seems that zip always uses forward slash, regardless OS?
|
|
||||||
return zipfile_Path(self.root, self.at + '/')
|
|
||||||
|
|
||||||
def rglob(self, glob: str) -> Sequence[ZipPath]:
|
|
||||||
# note: not 100% sure about the correctness, but seem fine?
|
|
||||||
# Path.match() matches from the right, so need to
|
|
||||||
rpaths = [p for p in self.root.namelist() if p.startswith(self.at)]
|
|
||||||
rpaths = [p for p in rpaths if Path(p).match(glob)]
|
|
||||||
return [ZipPath(self.root, p) for p in rpaths]
|
|
||||||
|
|
||||||
def relative_to(self, other: ZipPath) -> Path:
|
|
||||||
assert self.filepath == other.filepath, (self.filepath, other.filepath)
|
|
||||||
return self.subpath.relative_to(other.subpath)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def parts(self) -> Sequence[str]:
|
|
||||||
# messy, but might be ok..
|
|
||||||
return self.filepath.parts + self.subpath.parts
|
|
||||||
|
|
||||||
def __truediv__(self, key) -> ZipPath:
|
|
||||||
# need to implement it so the return type is not zipfile.Path
|
|
||||||
tmp = zipfile_Path(self.root) / self.at / key
|
|
||||||
return ZipPath(self.root, tmp.at)
|
|
||||||
|
|
||||||
def iterdir(self) -> Iterator[ZipPath]:
|
|
||||||
for s in self._as_dir().iterdir():
|
|
||||||
yield ZipPath(s.root, s.at) # type: ignore[attr-defined]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def stem(self) -> str:
|
|
||||||
return self.subpath.stem
|
|
||||||
|
|
||||||
@property # type: ignore[misc]
|
|
||||||
def __class__(self):
|
|
||||||
return Path
|
|
||||||
|
|
||||||
def __eq__(self, other) -> bool:
|
|
||||||
# hmm, super class doesn't seem to treat as equals unless they are the same object
|
|
||||||
if not isinstance(other, ZipPath):
|
|
||||||
return False
|
|
||||||
return (self.filepath, self.subpath) == (other.filepath, other.subpath)
|
|
||||||
|
|
||||||
def __lt__(self, other) -> bool:
|
|
||||||
if not isinstance(other, ZipPath):
|
|
||||||
return False
|
|
||||||
return (self.filepath, self.subpath) < (other.filepath, other.subpath)
|
|
||||||
|
|
||||||
def __hash__(self) -> int:
|
|
||||||
return hash((self.filepath, self.subpath))
|
|
||||||
|
|
||||||
def stat(self) -> os.stat_result:
|
|
||||||
# NOTE: zip datetimes have no notion of time zone, usually they just keep local time?
|
|
||||||
# see https://en.wikipedia.org/wiki/ZIP_(file_format)#Structure
|
|
||||||
dt = datetime(*self.root.getinfo(self.at).date_time)
|
|
||||||
ts = int(dt.timestamp())
|
|
||||||
params = dict(
|
|
||||||
st_mode=0,
|
|
||||||
st_ino=0,
|
|
||||||
st_dev=0,
|
|
||||||
st_nlink=1,
|
|
||||||
st_uid=1000,
|
|
||||||
st_gid=1000,
|
|
||||||
st_size=0, # todo compute it properly?
|
|
||||||
st_atime=ts,
|
|
||||||
st_mtime=ts,
|
|
||||||
st_ctime=ts,
|
|
||||||
)
|
|
||||||
return os.stat_result(tuple(params.values()))
|
|
||||||
|
|
|
@ -1,128 +0,0 @@
|
||||||
from pathlib import Path
|
|
||||||
import lzma
|
|
||||||
import sys
|
|
||||||
import zipfile
|
|
||||||
|
|
||||||
from ..kompress import kopen, kexists, CPath, ZipPath
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
structure_data: Path = Path(__file__).parent / "structure_data"
|
|
||||||
|
|
||||||
|
|
||||||
def test_kopen(tmp_path: Path) -> None:
    """
    Plaintext handled transparently.
    For zips behaviour is a bit different (not sure about all this, tbh...)
    """
    plain = kopen(tmp_path / 'file').read()
    assert plain == 'just plaintext'

    decompressed = kopen(tmp_path / 'file.xz').read()
    assert decompressed == 'compressed text'

    # for an archive, the member to read is passed as an extra positional argument
    member = kopen(tmp_path / 'file.zip', 'path/in/archive').read()
    assert member == 'data in zip'
|
|
||||||
|
|
||||||
|
|
||||||
def test_kexists(tmp_path: Path) -> None:
    """kexists reports archive members, and is False for missing members or archives."""
    # TODO also test top level?
    archive = str(tmp_path / 'file.zip')
    assert kexists(archive, 'path/in/archive')
    assert not kexists(archive, 'path/notin/archive')

    # TODO not sure about this?
    missing_archive = tmp_path / 'nosuchzip.zip'
    assert not kexists(missing_archive, 'path/in/archive')
|
|
||||||
|
|
||||||
|
|
||||||
def test_cpath(tmp_path: Path) -> None:
    """CPath.read_text returns the decoded content for both plain and xz files."""
    # NOTE: these used to be bare comparisons without `assert`, so nothing was checked
    # fmt: off
    assert CPath(str(tmp_path / 'file'   )).read_text() == 'just plaintext'
    assert CPath(    tmp_path / 'file.xz').read_text() == 'compressed text'
    # fmt: on
|
|
||||||
# fmt: on
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
def prepare(tmp_path: Path):
    """Populate ``tmp_path`` with a plaintext file, an xz file and a zip archive."""
    plain, xz_file, zip_file = (tmp_path / name for name in ('file', 'file.xz', 'file.zip'))

    plain.write_text('just plaintext')

    with xz_file.open('wb') as raw, lzma.open(raw, 'w') as lzf:
        lzf.write(b'compressed text')

    with zipfile.ZipFile(zip_file, 'w') as zf:
        zf.writestr('path/in/archive', 'data in zip')

    # nothing to tear down
    yield None
|
|
||||||
|
|
||||||
|
|
||||||
def test_zippath() -> None:
    """Exercise ZipPath against the ``gdpr_export.zip`` archive from ``structure_data``."""
    target = structure_data / 'gdpr_export.zip'
    assert target.exists(), target  # precondition

    zp = ZipPath(target)

    # magic! convenient to make third party libraries agnostic of ZipPath
    assert isinstance(zp, Path)
    assert isinstance(zp, ZipPath)
    assert isinstance(zp / 'subpath', Path)
    # TODO maybe change __str__/__repr__? since it's a bit misleading:
    # Path('/code/hpi/tests/core/structure_data/gdpr_export.zip', 'gdpr_export/')

    assert ZipPath(target) == ZipPath(target)
    assert zp.absolute() == zp

    # shouldn't crash
    hash(zp)

    assert zp.exists()
    comments_dir = zp / 'gdpr_export' / 'comments'
    assert comments_dir.exists()
    # check str constructor just in case
    comments_via_str = ZipPath(str(target)) / 'gdpr_export' / 'comments'
    assert comments_via_str.exists()
    bogus = ZipPath(str(target)) / 'whatever'
    assert not bogus.exists()

    matched = list(zp.rglob('*'))
    assert len(matched) > 0
    assert all(p.filepath == target for p in matched), matched

    rpaths = [p.relative_to(zp) for p in matched]
    # fmt: off
    expected_rpaths = [Path(*components) for components in (
        ('gdpr_export',),
        ('gdpr_export', 'comments'),
        ('gdpr_export', 'comments', 'comments.json'),
        ('gdpr_export', 'profile'),
        ('gdpr_export', 'profile', 'settings.json'),
        ('gdpr_export', 'messages'),
        ('gdpr_export', 'messages', 'index.csv'),
    )]
    # fmt: on
    assert rpaths == expected_rpaths, rpaths

    # TODO hmm this doesn't work atm, whereas Path does
    # not sure if it should be defensive or something...
    # ZipPath('doesnotexist')
    # same for this one
    # assert ZipPath(Path('test'), 'whatever').absolute() == ZipPath(Path('test').absolute(), 'whatever')

    assert (ZipPath(target) / 'gdpr_export' / 'comments').exists()

    export_root = zp / 'gdpr_export'
    jsons = [p.relative_to(export_root) for p in zp.rglob('*.json')]
    # fmt: off
    assert jsons == [
        Path('comments', 'comments.json'),
        Path('profile' , 'settings.json'),
    ]
    # fmt: on

    # NOTE: hmm interesting, seems that ZipPath is happy with forward slash regardless OS?
    assert list(zp.rglob('mes*')) == [ZipPath(target, 'gdpr_export/messages')]

    iterdir_res = list((zp / 'gdpr_export').iterdir())
    assert len(iterdir_res) == 3
    assert all(isinstance(p, Path) for p in iterdir_res)

    # date recorded in the zip archive
    comments_json = zp / 'gdpr_export' / 'comments' / 'comments.json'
    assert comments_json.stat().st_mtime > 1625000000
    # TODO ugh.
    # unzip -l shows the date as 2021-07-01 09:43
    # however, python reads it as 2021-07-01 01:43 ??
    # don't really feel like dealing with this for now, it's not tz aware anyway
|
|
|
@ -1 +0,0 @@
|
||||||
../core/kompress.py
|
|
6
my/kython/kompress.py
Normal file
6
my/kython/kompress.py
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
from my.core import __NOT_HPI_MODULE__
|
||||||
|
from my.core import warnings
|
||||||
|
|
||||||
|
warnings.high('my.kython.kompress is deprecated, please use "kompress" library directly. See https://github.com/karlicoss/kompress')
|
||||||
|
|
||||||
|
from my.core.kompress import *
|
1
setup.py
1
setup.py
|
@ -9,6 +9,7 @@ INSTALL_REQUIRES = [
|
||||||
'more-itertools', # it's just too useful and very common anyway
|
'more-itertools', # it's just too useful and very common anyway
|
||||||
'decorator' , # less pain in writing correct decorators. very mature and stable, so worth keeping in core
|
'decorator' , # less pain in writing correct decorators. very mature and stable, so worth keeping in core
|
||||||
'click>=8.1' , # for the CLI, printing colors, decorator-based - may allow extensions to CLI
|
'click>=8.1' , # for the CLI, printing colors, decorator-based - may allow extensions to CLI
|
||||||
|
'kompress' , # for transparent access to compressed files via pathlib.Path
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue