my.core.structure: add support for .tar.gz archives

This will be useful for migrating .tar.gz processing to kompress in a backwards-compatible way, or for running it against an unpacked folder structure if the user prefers.
This commit is contained in:
Dima Gerasimov 2024-09-16 23:41:58 +01:00 committed by karlicoss
parent 27178c0939
commit 201ddd4d7c
3 changed files with 33 additions and 21 deletions

View file

@ -1,6 +1,8 @@
import atexit import atexit
import os import os
import shutil import shutil
import sys
import tarfile
import tempfile import tempfile
import zipfile import zipfile
from contextlib import contextmanager from contextlib import contextmanager
@ -34,6 +36,7 @@ def _structure_exists(base_dir: Path, paths: Sequence[str], *, partial: bool = F
ZIP_EXT = {".zip"} ZIP_EXT = {".zip"}
TARGZ_EXT = {".tar.gz"}
@contextmanager @contextmanager
@ -44,7 +47,7 @@ def match_structure(
partial: bool = False, partial: bool = False,
) -> Generator[Tuple[Path, ...], None, None]: ) -> Generator[Tuple[Path, ...], None, None]:
""" """
Given a 'base' directory or zipfile, recursively search for one or more paths that match the Given a 'base' directory or archive (zip/tar.gz), recursively search for one or more paths that match the
pattern described in 'expected'. That can be a single string, or a list pattern described in 'expected'. That can be a single string, or a list
of relative paths (as strings) you expect at the same directory. of relative paths (as strings) you expect at the same directory.
@ -52,12 +55,12 @@ def match_structure(
expected be present, not all of them. expected be present, not all of them.
This reduces the chances of the user misconfiguring gdpr exports, e.g. This reduces the chances of the user misconfiguring gdpr exports, e.g.
if they zipped the folders instead of the parent directory or vice-versa if they archived the folders instead of the parent directory or vice-versa
When this finds a matching directory structure, it stops searching in that subdirectory When this finds a matching directory structure, it stops searching in that subdirectory
and continues onto other possible subdirectories which could match and continues onto other possible subdirectories which could match
If base is a zipfile, this extracts the zipfile into a temporary directory If base is an archive, this extracts it into a temporary directory
(configured by core_config.config.get_tmp_dir), and then searches the extracted (configured by core_config.config.get_tmp_dir), and then searches the extracted
folder for matching structures folder for matching structures
@ -93,12 +96,12 @@ def match_structure(
This doesn't require an exhaustive list of expected values, but it's a good idea to supply This doesn't require an exhaustive list of expected values, but it's a good idea to supply
a complete picture of the expected structure to avoid false-positives a complete picture of the expected structure to avoid false-positives
This does not recursively unzip zipfiles in the subdirectories, This does not recursively decompress archives in the subdirectories,
it only unzips into a temporary directory if 'base' is a zipfile it only unpacks into a temporary directory if 'base' is an archive
A common pattern for using this might be to use get_files to get a list A common pattern for using this might be to use get_files to get a list
of zipfiles or top-level gdpr export directories, and use match_structure of archives or top-level gdpr export directories, and use match_structure
to search the resulting paths for a export structure you're expecting to search the resulting paths for an export structure you're expecting
""" """
from . import core_config as CC from . import core_config as CC
@ -108,26 +111,34 @@ def match_structure(
expected = (expected,) expected = (expected,)
is_zip: bool = base.suffix in ZIP_EXT is_zip: bool = base.suffix in ZIP_EXT
is_targz: bool = any(base.name.endswith(suffix) for suffix in TARGZ_EXT)
searchdir: Path = base.absolute() searchdir: Path = base.absolute()
try: try:
# if the file given by the user is a zipfile, create a temporary # if the file given by the user is an archive, create a temporary
# directory and extract the zipfile to that temporary directory # directory and extract it to that temporary directory
# #
# this temporary directory is removed in the finally block # this temporary directory is removed in the finally block
if is_zip: if is_zip or is_targz:
# sanity check before we start creating directories/rm-tree'ing things # sanity check before we start creating directories/rm-tree'ing things
assert base.exists(), f"zipfile at {base} doesn't exist" assert base.exists(), f"archive at {base} doesn't exist"
searchdir = Path(tempfile.mkdtemp(dir=tdir)) searchdir = Path(tempfile.mkdtemp(dir=tdir))
if is_zip:
# base might already be a ZipPath, and str(base) would end with / # base might already be a ZipPath, and str(base) would end with /
zf = zipfile.ZipFile(str(base).rstrip('/')) zf = zipfile.ZipFile(str(base).rstrip('/'))
zf.extractall(path=str(searchdir)) zf.extractall(path=str(searchdir))
elif is_targz:
with tarfile.open(str(base)) as tar:
# filter is a security feature, will be required param in later python version
mfilter = {'filter': 'data'} if sys.version_info[:2] >= (3, 12) else {}
tar.extractall(path=str(searchdir), **mfilter) # type: ignore[arg-type]
else:
raise RuntimeError("can't happen")
else: else:
if not searchdir.is_dir(): if not searchdir.is_dir():
raise NotADirectoryError(f"Expected either a zipfile or a directory, received {searchdir}") raise NotADirectoryError(f"Expected either a zip/tar.gz archive or a directory, received {searchdir}")
matches: List[Path] = [] matches: List[Path] = []
possible_targets: List[Path] = [searchdir] possible_targets: List[Path] = [searchdir]
@ -150,9 +161,9 @@ def match_structure(
finally: finally:
if is_zip: if is_zip or is_targz:
# make sure we're not mistakenly deleting data # make sure we're not mistakenly deleting data
assert str(searchdir).startswith(str(tdir)), f"Expected the temporary directory for extracting zip to start with the temporary directory prefix ({tdir}), found {searchdir}" assert str(searchdir).startswith(str(tdir)), f"Expected the temporary directory for extracting archive to start with the temporary directory prefix ({tdir}), found {searchdir}"
shutil.rmtree(str(searchdir)) shutil.rmtree(str(searchdir))

View file

@ -14,8 +14,9 @@ def test_gdpr_structure_exists() -> None:
assert results == (structure_data / "gdpr_subdirs" / "gdpr_export",) assert results == (structure_data / "gdpr_subdirs" / "gdpr_export",)
def test_gdpr_unzip() -> None: @pytest.mark.parametrize("archive", ["gdpr_export.zip", "gdpr_export.tar.gz"])
with match_structure(structure_data / "gdpr_export.zip", expected=gdpr_expected) as results: def test_gdpr_unpack(archive: str) -> None:
with match_structure(structure_data / archive, expected=gdpr_expected) as results:
assert len(results) == 1 assert len(results) == 1
extracted = results[0] extracted = results[0]
index_file = extracted / "messages" / "index.csv" index_file = extracted / "messages" / "index.csv"
@ -32,6 +33,6 @@ def test_match_partial() -> None:
def test_not_directory() -> None: def test_not_directory() -> None:
with pytest.raises(NotADirectoryError, match=r"Expected either a zipfile or a directory"): with pytest.raises(NotADirectoryError, match=r"Expected either a zip/tar.gz archive or a directory"):
with match_structure(structure_data / "messages/index.csv", expected=gdpr_expected): with match_structure(structure_data / "messages/index.csv", expected=gdpr_expected):
pass pass

Binary file not shown.