core/structure: help locate/extract gdpr exports (#175)
* core/structure: help locate/extract gdpr exports * ci: add install-types to install stub packages
This commit is contained in:
parent
8ca88bde2e
commit
821bc08a23
10 changed files with 192 additions and 2 deletions
|
@ -18,6 +18,7 @@ def import_file(p: PathIsh, name: Optional[str]=None) -> types.ModuleType:
|
||||||
name = p.stem
|
name = p.stem
|
||||||
import importlib.util
|
import importlib.util
|
||||||
spec = importlib.util.spec_from_file_location(name, p)
|
spec = importlib.util.spec_from_file_location(name, p)
|
||||||
|
assert spec is not None, f"Fatal error; Could not create module spec from {name} {p}"
|
||||||
foo = importlib.util.module_from_spec(spec)
|
foo = importlib.util.module_from_spec(spec)
|
||||||
loader = spec.loader; assert loader is not None
|
loader = spec.loader; assert loader is not None
|
||||||
loader.exec_module(foo) # type: ignore[attr-defined]
|
loader.exec_module(foo) # type: ignore[attr-defined]
|
||||||
|
|
150
my/core/structure.py
Normal file
150
my/core/structure.py
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
from typing import Sequence, Generator, List, Union, Tuple
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from . import warnings as core_warnings
|
||||||
|
|
||||||
|
|
||||||
|
def _structure_exists(base_dir: Path, paths: Sequence[str]) -> bool:
|
||||||
|
"""
|
||||||
|
Helper function for match_structure to check if
|
||||||
|
all subpaths exist at some base directory
|
||||||
|
|
||||||
|
For example:
|
||||||
|
|
||||||
|
dir1
|
||||||
|
├── index.json
|
||||||
|
└── messages
|
||||||
|
└── messages.csv
|
||||||
|
|
||||||
|
_structure_exists(Path("dir1"), ["index.json", "messages/messages.csv"])
|
||||||
|
"""
|
||||||
|
for p in paths:
|
||||||
|
target: Path = base_dir / p
|
||||||
|
if not target.exists():
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
# file suffixes treated as zip archives by match_structure
ZIP_EXT = {".zip"}
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
def match_structure(
    base: Path,
    expected: Union[str, Sequence[str]],
) -> Generator[Tuple[Path, ...], None, None]:
    """
    Given a 'base' directory or zipfile, recursively search for one or more paths that match the
    pattern described in 'expected'. That can be a single string, or a list
    of relative paths (as strings) you expect at the same directory.

    This reduces the chances of the user misconfiguring gdpr exports, e.g.
    if they zipped the folders instead of the parent directory or vice-versa

    When this finds a matching directory structure, it stops searching in that subdirectory
    and continues onto other possible subdirectories which could match

    If base is a zipfile, this extracts the zipfile into a temporary directory
    (configured by core_config.config.get_tmp_dir), and then searches the extracted
    folder for matching structures

    This returns the top of every matching folder structure it finds

    As an example:

    export_dir
    ├── exp_2020
    │   ├── channel_data
    │   │   ├── data1
    │   │   └── data2
    │   ├── index.json
    │   ├── messages
    │   │   └── messages.csv
    │   └── profile
    │       └── settings.json
    └── exp_2021
        ├── channel_data
        │   ├── data1
        │   └── data2
        ├── index.json
        ├── messages
        │   └── messages.csv
        └── profile
            └── settings.json

    Giving the top directory as the base, and some expected relative path like:

    with match_structure(Path("export_dir"), expected=("messages/messages.csv", "index.json")) as results:
        # results in this block is (Path("export_dir/exp_2020"), Path("export_dir/exp_2021"))

    This doesn't require an exhaustive list of expected values, but it's a good idea to supply
    a complete picture of the expected structure to avoid false-positives

    This does not recursively unzip zipfiles in the subdirectories,
    it only unzips into a temporary directory if 'base' is a zipfile

    A common pattern for using this might be to use get_files to get a list
    of zipfiles or top-level gdpr export directories, and use match_structure
    to search the resulting paths for a export structure you're expecting

    Raises NotADirectoryError if 'base' is neither a zipfile nor a directory.
    """
    from . import core_config as CC

    tdir = CC.config.get_tmp_dir()

    # normalize a single pattern to a tuple of patterns
    if isinstance(expected, str):
        expected = (expected,)

    is_zip: bool = base.suffix in ZIP_EXT

    searchdir: Path = base.absolute()
    try:
        # if the file given by the user is a zipfile, create a temporary
        # directory and extract the zipfile to that temporary directory
        #
        # this temporary directory is removed in the finally block
        if is_zip:
            # sanity check before we start creating directories/rm-tree'ing things
            assert base.exists(), f"zipfile at {base} doesn't exist"

            searchdir = Path(tempfile.mkdtemp(dir=tdir))

            # use a context manager so the zipfile handle is closed
            # promptly instead of being leaked until GC
            with zipfile.ZipFile(base) as zf:
                zf.extractall(path=str(searchdir))

        else:
            if not searchdir.is_dir():
                raise NotADirectoryError(f"Expected either a zipfile or a directory, received {searchdir}")

        matches: List[Path] = []
        # breadth-first queue of directories still to be checked
        possible_targets: List[Path] = [searchdir]

        while len(possible_targets) > 0:
            p = possible_targets.pop(0)

            # factored out into a function to avoid weird stuff with continues/loop state
            if _structure_exists(p, expected):
                matches.append(p)
            else:
                # extend the list of possible targets with any subdirectories
                for f in os.scandir(p):
                    if f.is_dir():
                        possible_targets.append(p / f.name)

        if len(matches) == 0:
            core_warnings.high(f"""While searching {base}, could not find a matching folder structure. Expected {expected}. You're probably missing required files in the gdpr/export""")

        yield tuple(matches)

    finally:
        if is_zip:
            # make sure we're not mistakenly deleting data
            assert str(searchdir).startswith(str(tdir)), f"Expected the temporary directory for extracting zip to start with the temporary directory prefix ({tdir}), found {searchdir}"

            shutil.rmtree(str(searchdir))
|
BIN
tests/core/structure_data/gdpr_export.zip
Normal file
BIN
tests/core/structure_data/gdpr_export.zip
Normal file
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
test message
|
|
37
tests/core/test_structure.py
Normal file
37
tests/core/test_structure.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from my.core.structure import match_structure
|
||||||
|
|
||||||
|
|
||||||
|
# directory of on-disk fixtures used by the tests below
structure_data: Path = Path(__file__).parent / "structure_data"

# relative paths expected at the top of a matching gdpr export
gdpr_expected = ("comments", "messages/index.csv", "profile")
|
||||||
|
|
||||||
|
|
||||||
|
def test_gdpr_structure_exists() -> None:
    """A plain (non-zipped) export nested in subdirectories is located."""
    expected_match = structure_data / "gdpr_subdirs" / "gdpr_export"
    with match_structure(structure_data, expected=gdpr_expected) as results:
        assert results == (expected_match,)
|
||||||
|
|
||||||
|
|
||||||
|
def test_gdpr_unzip() -> None:
    """A zipped export is extracted, matched, and cleaned up afterwards."""
    zip_path = structure_data / "gdpr_export.zip"

    with match_structure(zip_path, expected=gdpr_expected) as results:
        assert len(results) == 1
        extracted = results[0]
        index_file = extracted / "messages" / "index.csv"
        assert index_file.read_text().strip() == "test message"

    # make sure the temporary directory this created no longer exists
    assert not extracted.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_not_directory() -> None:
    """A regular file that is neither a directory nor a zipfile raises."""
    not_a_dir = structure_data / "messages/index.csv"
    with pytest.raises(NotADirectoryError, match=r"Expected either a zipfile or a directory"):
        with match_structure(not_a_dir, expected=gdpr_expected):
            pass
|
5
tox.ini
5
tox.ini
|
@ -59,7 +59,8 @@ commands =
|
||||||
pip install -e .[testing,optional]
|
pip install -e .[testing,optional]
|
||||||
pip install orgparse # used in core.orgmode?
|
pip install orgparse # used in core.orgmode?
|
||||||
# todo add tests?
|
# todo add tests?
|
||||||
python3 -m mypy -p my.core \
|
python3 -m mypy --install-types --non-interactive \
|
||||||
|
-p my.core \
|
||||||
--txt-report .coverage.mypy-core \
|
--txt-report .coverage.mypy-core \
|
||||||
--html-report .coverage.mypy-core \
|
--html-report .coverage.mypy-core \
|
||||||
{posargs}
|
{posargs}
|
||||||
|
@ -89,7 +90,7 @@ commands =
|
||||||
|
|
||||||
# todo fuck. -p my.github isn't checking the subpackages?? wtf...
|
# todo fuck. -p my.github isn't checking the subpackages?? wtf...
|
||||||
# guess it wants .pyi file??
|
# guess it wants .pyi file??
|
||||||
python3 -m mypy \
|
python3 -m mypy --install-types --non-interactive \
|
||||||
-p my.endomondo \
|
-p my.endomondo \
|
||||||
-p my.github.ghexport \
|
-p my.github.ghexport \
|
||||||
-p my.hypothesis \
|
-p my.hypothesis \
|
||||||
|
|
Loading…
Add table
Reference in a new issue