core: more consistent handling of zip archives in get_files + tests

This commit is contained in:
karlicoss 2023-10-14 19:21:58 +01:00
parent 9ffce1b696
commit c63e80ce94
2 changed files with 59 additions and 18 deletions

View file

@ -3,6 +3,7 @@ from pathlib import Path
from datetime import datetime
import functools
from contextlib import contextmanager
import os
import sys
import types
from typing import Union, Callable, Dict, Iterable, TypeVar, Sequence, List, Optional, Any, cast, Tuple, TYPE_CHECKING, NoReturn
@ -161,11 +162,6 @@ from .logging import setup_logger, LazyLogger
Paths = Union[Sequence[PathIsh], PathIsh]
def _is_zippath(p: Path) -> bool:
# weak type check here, don't want to depend on kompress library in get_files
return type(p).__name__ == 'ZipPath'
DEFAULT_GLOB = '*'
def get_files(
pp: Paths,
@ -192,15 +188,12 @@ def get_files(
def caller() -> str:
import traceback
# TODO ugh. very flaky... -3 because [<this function>, get_files(), <actual caller>]
return traceback.extract_stack()[-3].filename
paths: List[Path] = []
for src in sources:
if _is_zippath(src):
paths.append(src)
continue
if src.parts[0] == '~':
src = src.expanduser()
# note: glob handled first, because e.g. on Windows asterisk makes is_dir unhappy
@ -209,15 +202,18 @@ def get_files(
if glob != DEFAULT_GLOB:
warnings.warn(f"{caller()}: treating {gs} as glob path. Explicit glob={glob} argument is ignored!")
paths.extend(map(Path, do_glob(gs)))
elif src.is_dir():
elif os.path.isdir(str(src)):
# NOTE: we're using os.path here on purpose instead of src.is_dir
# the reason is is_dir for archives might return True and then
# this clause would try globbing insize the archives
# this is generally undesirable (since modules handle archives themselves)
# todo not sure if should be recursive?
# note: glob='**/*.ext' works without any changes.. so perhaps it's ok as it is
gp: Iterable[Path] = src.glob(glob)
paths.extend(gp)
else:
if not src.is_file():
# todo not sure, might be race condition?
raise RuntimeError(f"Expected '{src}' to exist")
assert src.exists(), src
# todo assert matches glob??
paths.append(src)
@ -231,11 +227,24 @@ def get_files(
'''.strip())
# traceback is useful to figure out what config caused it?
import traceback
traceback.print_stack()
if guess_compression:
from kompress import CPath, is_compressed
paths = [CPath(p) if is_compressed(p) and not _is_zippath(p) else p for p in paths] # TODO fwtf is going on here?... make sure it's tested
from .kompress import CPath, is_compressed, ZipPath
# NOTE: wrap is just for backwards compat with vendorized kompress
# with kompress library, only is_compressed check and Cpath should be enough
def wrap(p: Path) -> Path:
if isinstance(p, ZipPath):
return p
if p.suffix == '.zip':
return ZipPath(p) # type: ignore[return-value]
if is_compressed(p):
return CPath(p)
return p
paths = [wrap(p) for p in paths]
return tuple(paths)

View file

@ -3,15 +3,18 @@ from pathlib import Path
import shutil
import tempfile
from typing import TYPE_CHECKING
import zipfile
from ..compat import windows
from ..common import get_files
from ..compat import windows
from ..kompress import CPath, ZipPath
import pytest
# hack to replace all /tmp with 'real' tmp dir
# not ideal, but makes tests more concise
# TODO get rid of this, it's super confusing..
def _get_files(x, *args, **kwargs):
from ..common import get_files as get_files_orig
@ -27,9 +30,10 @@ def _get_files(x, *args, **kwargs):
x = repl(x)
res = get_files_orig(x, *args, **kwargs)
return tuple(Path(str(i).replace(TMP, '/tmp')) for i in res) # hack back for asserts..
return tuple(type(i)(str(i).replace(TMP, '/tmp')) for i in res) # hack back for asserts..
get_files_orig = get_files
if not TYPE_CHECKING:
get_files = _get_files
@ -136,6 +140,34 @@ def test_no_files() -> None:
assert get_files('bad*glob') == ()
def test_compressed(tmp_path: Path) -> None:
file1 = tmp_path / 'file_1.zstd'
file2 = tmp_path / 'file_2.zip'
file3 = tmp_path / 'file_3.csv'
file1.touch()
with zipfile.ZipFile(file2, 'w') as zf:
zf.writestr('path/in/archive', 'data in zip')
file3.touch()
results = get_files_orig(tmp_path)
[res1, res2, res3] = results
assert isinstance(res1, CPath)
assert isinstance(res2, ZipPath) # NOTE this didn't work on vendorized kompress, but it's fine, was never used?
assert not isinstance(res3, CPath)
results = get_files_orig(
[CPath(file1), ZipPath(file2), file3],
# sorting a mixture of ZipPath/Path was broken in old kompress
# it almost never happened though (usually it's only a bunch of ZipPath, so not a huge issue)
sort=False,
)
[res1, res2, res3] = results
assert isinstance(res1, CPath)
assert isinstance(res2, ZipPath)
assert not isinstance(res3, CPath)
# TODO not sure if should uniquify if the filenames end up same?
# TODO not sure about the symlinks? and hidden files?