diff --git a/my/core/common.py b/my/core/common.py
index fba7efb..92806d2 100644
--- a/my/core/common.py
+++ b/my/core/common.py
@@ -163,12 +163,6 @@ from .logging import setup_logger, LazyLogger
 Paths = Union[Sequence[PathIsh], PathIsh]
 
 
-def _is_compressed(p: Path) -> bool:
-    # todo kinda lame way for now.. use mime ideally?
-    # should cooperate with kompress.kopen?
-    return p.suffix in {'.xz', '.lz4', '.zstd'}
-
-
 DEFAULT_GLOB = '*'
 def get_files(
     pp: Paths,
@@ -233,8 +227,8 @@ def get_files(
         traceback.print_stack()
 
     if guess_compression:
-        from .kompress import CPath
-        paths = [CPath(p) if _is_compressed(p) else p for p in paths]
+        from .kompress import CPath, is_compressed
+        paths = [CPath(p) if is_compressed(p) else p for p in paths]
 
     return tuple(paths)
 
diff --git a/my/core/kompress.py b/my/core/kompress.py
index 4fa2840..a9a8576 100644
--- a/my/core/kompress.py
+++ b/my/core/kompress.py
@@ -9,6 +9,20 @@ import io
 PathIsh = Union[Path, str]
 
 
+class Ext:
+    xz = '.xz'
+    zip = '.zip'
+    lz4 = '.lz4'
+    zstd = '.zstd'
+    targz = '.tar.gz'
+
+
+def is_compressed(p: Path) -> bool:
+    # todo kinda lame way for now.. use mime ideally?
+    # should cooperate with kompress.kopen?
+    return any(p.name.endswith(ext) for ext in {Ext.xz, Ext.zip, Ext.lz4, Ext.zstd, Ext.targz})
+
+
 def _zstd_open(path: Path, *args, **kwargs) -> IO[str]:
     import zstandard as zstd # type: ignore
     fh = path.open('rb')
@@ -25,15 +39,15 @@ def kopen(path: PathIsh, *args, mode: str='rt', **kwargs) -> IO[str]:
         kwargs['encoding'] = encoding
 
     pp = Path(path)
-    suf = pp.suffix
-    if suf in {'.xz'}:
+    name = pp.name
+    if name.endswith(Ext.xz):
         import lzma
         r = lzma.open(pp, mode, *args, **kwargs)
         # should only happen for binary mode?
         # file:///usr/share/doc/python3/html/library/lzma.html?highlight=lzma#lzma.open
         assert not isinstance(r, lzma.LZMAFile), r
         return r
-    elif suf in {'.zip'}:
+    elif name.endswith(Ext.zip):
         # eh. this behaviour is a bit dodgy...
         from zipfile import ZipFile
         zfile = ZipFile(pp)
@@ -49,11 +63,18 @@ def kopen(path: PathIsh, *args, mode: str='rt', **kwargs) -> IO[str]:
         # TODO pass all kwargs here??
         # todo 'expected "BinaryIO"'??
         return io.TextIOWrapper(ifile, encoding=encoding) # type: ignore[arg-type]
-    elif suf in {'.lz4'}:
+    elif name.endswith(Ext.lz4):
         import lz4.frame # type: ignore
         return lz4.frame.open(str(pp), mode, *args, **kwargs)
-    elif suf in {'.zstd'}:
+    elif name.endswith(Ext.zstd):
         return _zstd_open(pp, mode, *args, **kwargs)
+    elif name.endswith(Ext.targz):
+        import tarfile
+        # FIXME pass mode?
+        tf = tarfile.open(pp)
+        # TODO pass encoding?
+        x = tf.extractfile(*args); assert x is not None
+        return x # type: ignore[return-value]
     else:
         return pp.open(mode, *args, **kwargs)
 