# my/core/structure.py
import os
import shutil
import tempfile
import zipfile

from collections import deque
from typing import Deque, Sequence, Generator, List, Optional, Union, Tuple
from contextlib import contextmanager
from pathlib import Path

from . import warnings as core_warnings


def _structure_exists(base_dir: Path, paths: Sequence[str]) -> bool:
    """
    Helper function for match_structure to check if
    all subpaths exist at some base directory

    For example:

    dir1
    ├── index.json
    └── messages
        └── messages.csv

    _structure_exists(Path("dir1"), ["index.json", "messages/messages.csv"])

    Returns True only if every entry of 'paths', joined onto 'base_dir', exists
    (an empty 'paths' therefore returns True).
    """
    return all((base_dir / p).exists() for p in paths)


ZIP_EXT = {".zip"}


@contextmanager
def match_structure(
    base: Path,
    expected: Union[str, Sequence[str]],
) -> Generator[Tuple[Path, ...], None, None]:
    """
    Given a 'base' directory or zipfile, recursively search for one or more paths that match the
    pattern described in 'expected'. That can be a single string, or a list
    of relative paths (as strings) you expect at the same directory.

    This reduces the chances of the user misconfiguring gdpr exports, e.g.
    if they zipped the folders instead of the parent directory or vice-versa

    When this finds a matching directory structure, it stops searching in that subdirectory
    and continues onto other possible subdirectories which could match

    If base is a zipfile, this extracts the zipfile into a temporary directory
    (configured by core_config.config.get_tmp_dir), and then searches the extracted
    folder for matching structures. The temporary directory is removed when the
    'with' block exits, so paths yielded for a zipfile are only valid inside the block.

    This returns the top of every matching folder structure it finds

    As an example:

    export_dir
    ├── exp_2020
    │   ├── channel_data
    │   │   ├── data1
    │   │   └── data2
    │   ├── index.json
    │   ├── messages
    │   │   └── messages.csv
    │   └── profile
    │       └── settings.json
    └── exp_2021
        ├── channel_data
        │   ├── data1
        │   └── data2
        ├── index.json
        ├── messages
        │   └── messages.csv
        └── profile
            └── settings.json

    Giving the top directory as the base, and some expected relative path like:

    with match_structure(Path("export_dir"), expected=("messages/messages.csv", "index.json")) as results:
        # results in this block is (Path("export_dir/exp_2020"), Path("export_dir/exp_2021"))

    This doesn't require an exhaustive list of expected values, but it's a good idea to supply
    a complete picture of the expected structure to avoid false-positives

    This does not recursively unzip zipfiles in the subdirectories,
    it only unzips into a temporary directory if 'base' is a zipfile

    A common pattern for using this might be to use get_files to get a list
    of zipfiles or top-level gdpr export directories, and use match_structure
    to search the resulting paths for an export structure you're expecting

    If nothing matches, a high-priority warning is emitted through core_warnings
    and an empty tuple is yielded.
    """
    from . import core_config as CC

    tdir = CC.config.get_tmp_dir()

    if isinstance(expected, str):
        expected = (expected,)

    is_zip: bool = base.suffix in ZIP_EXT

    searchdir: Path = base.absolute()

    # Set only once we have created a temporary directory ourselves. Cleanup is
    # keyed on this variable, so the user's own 'base' path can never be passed
    # to rmtree, and a failure before the directory exists is not masked by a
    # second error raised during cleanup.
    extracted_to: Optional[Path] = None
    try:
        # if the file given by the user is a zipfile, create a temporary
        # directory and extract the zipfile to that temporary directory
        #
        # this temporary directory is removed in the finally block
        if is_zip:
            # sanity check before we start creating directories/rm-tree'ing things
            assert base.exists(), f"zipfile at {base} doesn't exist"

            extracted_to = Path(tempfile.mkdtemp(dir=tdir))
            searchdir = extracted_to

            # close the archive handle as soon as extraction is done
            with zipfile.ZipFile(base) as zf:
                zf.extractall(path=extracted_to)

        matches: List[Path] = []
        # breadth-first queue of directories that still need to be checked
        possible_targets: Deque[Path] = deque([searchdir])

        while possible_targets:
            p = possible_targets.popleft()

            # factored out into a function to avoid weird stuff with continues/loop state
            if _structure_exists(p, expected):
                # matched: do not descend any further into this directory
                matches.append(p)
            else:
                # extend the list of possible targets with any subdirectories
                with os.scandir(p) as entries:
                    possible_targets.extend(
                        p / f.name for f in entries if f.is_dir()
                    )

        if not matches:
            core_warnings.high(
                f"While searching {base}, could not find a matching folder structure. Expected {expected}.\n"
                "You're probably missing required files in the gdpr/export"
            )

        yield tuple(matches)

    finally:
        # only ever delete the directory created above by mkdtemp
        if extracted_to is not None:
            shutil.rmtree(str(extracted_to))
Path + +structure_data: Path = Path(__file__).parent / "structure_data" + +gdpr_expected = ("comments", "messages/index.csv", "profile") + + +def test_gdpr_structure_exists() -> None: + with match_structure(structure_data, expected=gdpr_expected) as results: + assert results == (structure_data / "gdpr_subdirs" / "gdpr_export",) + + +def test_gdpr_unzip() -> None: + + with match_structure( + structure_data / "gdpr_export.zip", expected=gdpr_expected + ) as results: + assert len(results) == 1 + extracted = results[0] + index_file = extracted / "messages" / "index.csv" + assert index_file.read_text().strip() == "test message" + + # make sure the temporary directory this created no longer exists + assert not extracted.exists() +