general: move most core tests inside my.core.tests package

- distributes tests alongside the package, might be convenient for package users
- removes some weird indirection (e.g. dummy test files importing tests from modules)
- makes the command line for tests cleaner (e.g. no need to remember to manually add files to tox.ini)
- tests are automatically covered by mypy (so mypy runs are cleaner and coverage is ultimately better)

The (vague) convention is

- tests/somemodule.py -- testing my.core.somemodule, contains tests directly related to that module
- tests/test_something.py -- testing a specific feature, e.g. test_get_files.py tests only the get_files method
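
A minimal sketch of the cleaner invocation this enables (the exact pytest flags here are an assumption, not part of the commit), in the same check_call style the tests below use:

# hypothetical invocation: collect the relocated my.core tests from the package itself,
# instead of maintaining a list of standalone test files in tox.ini
from subprocess import check_call

check_call(['python', '-m', 'pytest', '--pyargs', 'my.core'])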
Dima Gerasimov 2023-05-24 23:39:21 +01:00 committed by karlicoss
parent 04d976f937
commit 9594caa1cd
18 changed files with 77 additions and 102 deletions

@@ -1,14 +0,0 @@
import os
from subprocess import check_call
def test_lists_modules() -> None:
# hack PYTHONUTF8 for windows
# see https://github.com/karlicoss/promnesia/issues/274
# https://memex.zulipchat.com/#narrow/stream/279600-promnesia/topic/indexing.3A.20utf8.28emoji.29.20filenames.20in.20Windows
# necessary for this test because an emoji is causing trouble
# TODO need to fix it properly
env = {
**os.environ,
'PYTHONUTF8': '1',
}
check_call(['hpi', 'modules'], env=env)

@@ -1,25 +0,0 @@
'''
NOTE: Sigh. it's nice to be able to define the tests next to the source code (so it serves as documentation).
However, if you run 'pytest --pyargs my.core', it detects the package name as 'core' (because there is no my/__init__.py)
(see https://docs.pytest.org/en/latest/goodpractices.html#tests-as-part-of-application-code)
This results in relative imports failing (e.g. from ..core import...).
By using this helper file, pytest can detect the package name properly. A bit meh, but perhaps later,
we can run against the tests in my.core directly.
'''
from my.core.cfg import *
from my.core.common import *
from my.core.core_config import *
from my.core.error import *
from my.core.util import *
from my.core.discovery_pure import *
from my.core.freezer import *
from my.core.stats import *
from my.core.query import *
from my.core.query_range import *
from my.core.serialize import test_serialize_fallback
from my.core.sqlite import *
from my.core.__main__ import *
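
To make the detection issue described in the docstring concrete, a small sketch (assuming HPI is installed from source in the usual layout) of why pytest infers 'core' rather than 'my.core':

from pathlib import Path

import my.core.common as m

core_dir = Path(m.__file__).parent                      # .../my/core
assert (core_dir / '__init__.py').exists()              # 'core' is a regular package...
assert not (core_dir.parent / '__init__.py').exists()   # ...but 'my' is a namespace package
# pytest's upward walk for the package name stops where __init__.py files end,
# so a test file inside my/core would be imported as 'core.*', and relative
# imports that reach above it (e.g. `from ..core import ...`) fail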

@@ -1 +0,0 @@
test message

@@ -1,106 +0,0 @@
import warnings
import json
from pathlib import Path
from datetime import datetime
from typing import NamedTuple, Iterator
from my.core.denylist import DenyList
class IP(NamedTuple):
addr: str
dt: datetime
def data() -> Iterator[IP]:
# random IP addresses
yield IP(addr="67.98.113.0", dt=datetime(2020, 1, 1))
yield IP(addr="59.40.113.87", dt=datetime(2020, 2, 1))
yield IP(addr="161.235.192.228", dt=datetime(2020, 3, 1))
yield IP(addr="165.243.139.87", dt=datetime(2020, 4, 1))
yield IP(addr="69.69.141.154", dt=datetime(2020, 5, 1))
yield IP(addr="50.72.224.80", dt=datetime(2020, 6, 1))
yield IP(addr="221.67.89.168", dt=datetime(2020, 7, 1))
yield IP(addr="177.113.119.251", dt=datetime(2020, 8, 1))
yield IP(addr="93.200.246.215", dt=datetime(2020, 9, 1))
yield IP(addr="127.105.171.61", dt=datetime(2020, 10, 1))
def test_denylist(tmp_path: Path) -> None:
tf = (tmp_path / "denylist.json").absolute()
with warnings.catch_warnings(record=True):
# create empty denylist (though file does not have to exist for denylist to work)
tf.write_text("[]")
d = DenyList(tf)
d.load()
assert dict(d._deny_map) == {}
assert d._deny_raw_list == []
assert list(d.filter(data())) == list(data())
# no data in denylist yet
assert len(d._deny_map) == 0
assert len(d._deny_raw_list) == 0
# add some data
d.deny(key="addr", value="67.98.113.0")
# write and reload to update _deny_map, _deny_raw_list
d.write()
d.load()
assert len(d._deny_map) == 1
assert len(d._deny_raw_list) == 1
assert d._deny_raw_list == [{"addr": "67.98.113.0"}]
filtered = list(d.filter(data()))
assert len(filtered) == 9
assert "67.98.113.0" not in [i.addr for i in filtered]
assert dict(d._deny_map) == {"addr": {"67.98.113.0"}}
denied = list(d.filter(data(), invert=True))
assert len(denied) == 1
assert denied[0] == IP(addr="67.98.113.0", dt=datetime(2020, 1, 1))
# add some non-JSON primitive data
d.deny(key="dt", value=datetime(2020, 2, 1))
# test internal behavior, _deny_raw_list should have been updated,
# but _deny_map doesn't get updated by a call to .deny
#
# if we change this, just update the test; it's just here to ensure
# this is the behaviour
assert len(d._deny_map) == 1
# write and load to update _deny_map
d.write()
d.load()
assert len(d._deny_map) == 2
assert len(d._deny_raw_list) == 2
assert d._deny_raw_list[-1] == {"dt": "2020-02-01T00:00:00"}
filtered = list(d.filter(data()))
assert len(filtered) == 8
assert "59.40.113.87" not in [i.addr for i in filtered]
with open(tf, "r") as f:
data_json = json.loads(f.read())
assert data_json == [
{
"addr": "67.98.113.0",
},
{
"dt": "2020-02-01T00:00:00",
},
]
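
For reference, the same DenyList round trip as a compact sketch (the wrapper function and denylist path are hypothetical; data() is the generator defined above):

from pathlib import Path
from my.core.denylist import DenyList

def drop_denied_ips(denylist_path: Path):
    d = DenyList(denylist_path)
    d.deny(key="addr", value="67.98.113.0")  # deny by attribute name/value, as in the test
    d.write()                                # persist to the JSON file
    d.load()                                 # reload so _deny_map reflects the new entry
    yield from d.filter(data())              # everything except the denied address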

@@ -1,127 +0,0 @@
from datetime import datetime
import lzma
from pathlib import Path
import sys
import zipfile
from my.core.kompress import kopen, kexists, CPath
import pytest
structure_data: Path = Path(__file__).parent / "structure_data"
def test_kopen(tmp_path: Path) -> None:
"Plaintext handled transparently"
assert kopen(tmp_path / 'file' ).read() == 'just plaintext'
assert kopen(tmp_path / 'file.xz').read() == 'compressed text'
"For zips behaviour is a bit different (not sure about all this, tbh...)"
assert kopen(tmp_path / 'file.zip', 'path/in/archive').read() == 'data in zip'
# TODO here?
def test_kexists(tmp_path: Path) -> None:
# TODO also test top level?
assert kexists(str(tmp_path / 'file.zip'), 'path/in/archive')
assert not kexists(str(tmp_path / 'file.zip'), 'path/notin/archive')
# TODO not sure about this?
assert not kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive')
def test_cpath(tmp_path: Path) -> None:
assert CPath(str(tmp_path / 'file'   )).read_text() == 'just plaintext'
assert CPath(    tmp_path / 'file.xz' ).read_text() == 'compressed text'
# TODO not sure about zip files??
@pytest.fixture(autouse=True)
def prepare(tmp_path: Path):
(tmp_path / 'file').write_text('just plaintext')
with (tmp_path / 'file.xz').open('wb') as f:
with lzma.open(f, 'w') as lzf:
lzf.write(b'compressed text')
with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as zf:
zf.writestr('path/in/archive', 'data in zip')
try:
yield None
finally:
pass
@pytest.mark.skipif(
sys.version_info[:2] < (3, 8),
reason=f"ZipFile.Path is only available since 3.8",
)
def test_zippath() -> None:
from my.core.kompress import ZipPath
target = structure_data / 'gdpr_export.zip'
assert target.exists(), target # precondition
zp = ZipPath(target)
# magic! convenient to make third party libraries agnostic of ZipPath
assert isinstance(zp, Path)
assert isinstance(zp, ZipPath)
assert isinstance(zp / 'subpath', Path)
# TODO maybe change __str__/__repr__? since it's a bit misleading:
# Path('/code/hpi/tests/core/structure_data/gdpr_export.zip', 'gdpr_export/')
assert ZipPath(target) == ZipPath(target)
assert zp.absolute() == zp
# shouldn't crash
hash(zp)
assert zp.exists()
assert (zp / 'gdpr_export' / 'comments').exists()
# check str constructor just in case
assert (ZipPath(str(target)) / 'gdpr_export' / 'comments').exists()
assert not (ZipPath(str(target)) / 'whatever').exists()
matched = list(zp.rglob('*'))
assert len(matched) > 0
assert all(p.filepath == target for p in matched), matched
rpaths = [p.relative_to(zp) for p in matched]
gdpr_export = Path('gdpr_export')
assert rpaths == [
gdpr_export,
gdpr_export / 'comments',
gdpr_export / 'comments' / 'comments.json',
gdpr_export / 'profile',
gdpr_export / 'profile' / 'settings.json',
gdpr_export / 'messages',
gdpr_export / 'messages' / 'index.csv',
], rpaths
# TODO hmm this doesn't work atm, whereas Path does
# not sure if it should be defensive or something...
# ZipPath('doesnotexist')
# same for this one
# assert ZipPath(Path('test'), 'whatever').absolute() == ZipPath(Path('test').absolute(), 'whatever')
assert (ZipPath(target) / 'gdpr_export' / 'comments').exists()
jsons = [p.relative_to(zp / 'gdpr_export') for p in zp.rglob('*.json')]
assert jsons == [
Path('comments','comments.json'),
Path('profile','settings.json'),
]
# NOTE: hmm interesting, seems that ZipPath is happy with a forward slash regardless of OS?
assert list(zp.rglob('mes*')) == [ZipPath(target, 'gdpr_export/messages')]
iterdir_res = list((zp / 'gdpr_export').iterdir())
assert len(iterdir_res) == 3
assert all(isinstance(p, Path) for p in iterdir_res)
# date recorded in the zip archive
assert (zp / 'gdpr_export' / 'comments' / 'comments.json').stat().st_mtime > 1625000000
# TODO ugh.
# unzip -l shows the date as 2021-07-01 09:43
# however, python reads it as 2021-07-01 01:43 ??
# don't really feel like dealing with this for now, it's not tz aware anyway
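
The 'magic' noted in the test is that ZipPath subclasses pathlib.Path; a short sketch of the practical consequence (count_jsons is a hypothetical helper), reusing the same archive as above:

from pathlib import Path
from my.core.kompress import ZipPath

def count_jsons(root: Path) -> int:
    # relies only on the pathlib API, so it doesn't need to know about zip files
    return len(list(root.rglob('*.json')))

zp = ZipPath(structure_data / 'gdpr_export.zip')  # same archive as test_zippath
assert count_jsons(zp) == 2                       # comments.json + settings.json, per the test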

@@ -1 +0,0 @@
from my.core.pandas import *

@@ -1,45 +0,0 @@
import pytest
from pathlib import Path
from my.core.structure import match_structure
structure_data: Path = Path(__file__).parent / "structure_data"
gdpr_expected = ("comments", "messages/index.csv", "profile")
def test_gdpr_structure_exists() -> None:
with match_structure(structure_data, expected=gdpr_expected) as results:
assert results == (structure_data / "gdpr_subdirs" / "gdpr_export",)
def test_gdpr_unzip() -> None:
with match_structure(
structure_data / "gdpr_export.zip", expected=gdpr_expected
) as results:
assert len(results) == 1
extracted = results[0]
index_file = extracted / "messages" / "index.csv"
assert index_file.read_text().strip() == "test message"
# make sure the temporary directory this created no longer exists
assert not extracted.exists()
def test_match_partial() -> None:
# a partial match should match both the 'broken' and 'gdpr_export' directories
with match_structure(
structure_data / "gdpr_subdirs", expected=gdpr_expected, partial=True
) as results:
assert len(results) == 2
def test_not_directory() -> None:
with pytest.raises(NotADirectoryError, match=r"Expected either a zipfile or a directory"):
with match_structure(
structure_data / "messages/index.csv", expected=gdpr_expected
):
pass
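
A brief sketch of the provider-side pattern these tests pin down (parse_export and its input path are hypothetical): match_structure lets the same code accept either an unpacked export directory or a zip and receive uniform paths back:

from pathlib import Path
from typing import Iterator

from my.core.structure import match_structure

def parse_export(path: Path) -> Iterator[str]:
    # gdpr_expected is the marker tuple defined for the tests above
    with match_structure(path, expected=gdpr_expected) as exports:
        for export in exports:
            yield (export / "messages" / "index.csv").read_text()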

@@ -1,164 +0,0 @@
import os
from pathlib import Path
from typing import TYPE_CHECKING
from my.core.compat import windows
from my.core.common import get_files
import pytest
# hack to replace all /tmp with 'real' tmp dir
# not ideal, but makes tests more concise
def _get_files(x, *args, **kwargs):
import my.core.common as C
def repl(x):
if isinstance(x, str):
return x.replace('/tmp', TMP)
elif isinstance(x, Path):
assert x.parts[:2] == (os.sep, 'tmp') # meh
return Path(TMP) / Path(*x.parts[2:])
else:
# iterable?
return [repl(i) for i in x]
x = repl(x)
res = C.get_files(x, *args, **kwargs)
return tuple(Path(str(i).replace(TMP, '/tmp')) for i in res) # hack back for asserts..
if not TYPE_CHECKING:
get_files = _get_files
def test_single_file() -> None:
'''
Regular file path is just returned as is.
'''
"Exception if it doesn't exist"
with pytest.raises(Exception):
get_files('/tmp/hpi_test/file.ext')
create('/tmp/hpi_test/file.ext')
'''
Couple of things:
1. Return type is a tuple, it's friendlier for hashing/caching
2. It always returns pathlib.Path objects instead of plain strings
'''
assert get_files('/tmp/hpi_test/file.ext') == (
Path('/tmp/hpi_test/file.ext'),
)
"if the path starts with ~, we expand it"
if not windows: # windows doesn't have bashrc.. ugh
assert get_files('~/.bashrc') == (
Path('~').expanduser() / '.bashrc',
)
def test_multiple_files() -> None:
'''
If you pass a directory/multiple directories, it flattens the contents
'''
create('/tmp/hpi_test/dir1/')
create('/tmp/hpi_test/dir1/zzz')
create('/tmp/hpi_test/dir1/yyy')
# create('/tmp/hpi_test/dir1/whatever/') # TODO not sure about this... should really allow extra dirs
create('/tmp/hpi_test/dir2/')
create('/tmp/hpi_test/dir2/mmm')
create('/tmp/hpi_test/dir2/nnn')
create('/tmp/hpi_test/dir3/')
create('/tmp/hpi_test/dir3/ttt')
assert get_files([
Path('/tmp/hpi_test/dir3'), # it takes in Path as well as str
'/tmp/hpi_test/dir1',
]) == (
# the paths are always returned in sorted order (unless you pass sort=False)
Path('/tmp/hpi_test/dir1/yyy'),
Path('/tmp/hpi_test/dir1/zzz'),
Path('/tmp/hpi_test/dir3/ttt'),
)
def test_explicit_glob() -> None:
'''
You can pass a glob to restrict the extensions
'''
create('/tmp/hpi_test/file_3.zip')
create('/tmp/hpi_test/file_2.zip')
create('/tmp/hpi_test/ignoreme')
create('/tmp/hpi_test/file.zip')
# todo walrus operator would be great here...
expected = (
Path('/tmp/hpi_test/file_2.zip'),
Path('/tmp/hpi_test/file_3.zip'),
)
assert get_files('/tmp/hpi_test', 'file_*.zip') == expected
"named argument should work too"
assert get_files('/tmp/hpi_test', glob='file_*.zip') == expected
def test_implicit_glob() -> None:
'''
An asterisk in the path results in globbing too.
'''
# todo hopefully that makes sense? dunno why anyone would actually rely on asterisks in names..
# this is very convenient in configs, so people don't have to use some special types
create('/tmp/hpi_test/123/')
create('/tmp/hpi_test/123/dummy')
create('/tmp/hpi_test/123/file.zip')
create('/tmp/hpi_test/456/')
create('/tmp/hpi_test/456/dummy')
create('/tmp/hpi_test/456/file.zip')
assert get_files(['/tmp/hpi_test/*/*.zip']) == (
Path('/tmp/hpi_test/123/file.zip'),
Path('/tmp/hpi_test/456/file.zip'),
)
def test_no_files() -> None:
'''
Test for empty matches. They work, but should result in a warning
'''
assert get_files('') == ()
# todo test these for warnings?
assert get_files([]) == ()
assert get_files('bad*glob') == ()
# TODO not sure if should uniquify if the filenames end up same?
# TODO not sure about the symlinks? and hidden files?
import tempfile
TMP = tempfile.gettempdir()
test_path = Path(TMP) / 'hpi_test'
def setup():
teardown()
test_path.mkdir()
def teardown():
import shutil
if test_path.is_dir():
shutil.rmtree(test_path)
def create(f: str) -> None:
# in the test body it's easier to use /tmp regardless of the OS...
f = f.replace('/tmp', TMP)
if f.endswith('/'):
Path(f).mkdir()
else:
Path(f).touch()
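
As the implicit-glob docstring hints, the point is that configs can hold plain glob strings; a minimal sketch of that pattern (stub_config is hypothetical):

from my.core.common import get_files

class stub_config:
    # a plain string with a wildcard, no special path types required
    export_path = '/tmp/hpi_test/*/*.zip'

inputs = get_files(stub_config.export_path)
# -> sorted tuple of pathlib.Path objects, e.g. the two file.zip paths
#    from test_implicit_glob above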

@@ -1,66 +0,0 @@
from pathlib import Path
import shutil
import sqlite3
from tempfile import TemporaryDirectory
from my.core.sqlite import sqlite_connect_immutable, sqlite_copy_and_open
def test_sqlite_read_with_wal(tmp_path: Path) -> None:
db = tmp_path / 'db.sqlite'
# write a bit
with sqlite3.connect(str(db)) as conn:
conn.execute('CREATE TABLE testtable (col)')
for i in range(5):
conn.execute('INSERT INTO testtable (col) VALUES (?)', str(i))
# write more in WAL mode
with sqlite3.connect(str(db)) as conn_db:
conn_db.execute('PRAGMA journal_mode=wal;')
for i in range(5, 10):
conn_db.execute('INSERT INTO testtable (col) VALUES (?)', str(i))
conn_db.execute('COMMIT')
# make sure it has unflushed stuff in wal
wals = list(db.parent.glob('*-wal'))
assert len(wals) == 1
## now run the tests in a separate process to ensure there's no potential for reusing sqlite connections or something
from concurrent.futures import ProcessPoolExecutor as Pool
with Pool(1) as pool:
# merely using it for ctx manager..
pool.submit(_test_do_copy , db).result()
pool.submit(_test_do_immutable , db).result()
pool.submit(_test_do_copy_and_open, db).result()
pool.submit(_test_open_asis , db).result()
def _test_do_copy(db: Path) -> None:
# from a copy without journal can only read previously committed stuff
with TemporaryDirectory() as tdir:
cdb = Path(tdir) / 'dbcopy.sqlite'
shutil.copy(db, cdb)
with sqlite3.connect(str(cdb)) as conn_copy:
assert len(list(conn_copy.execute('SELECT * FROM testtable'))) == 5
conn_copy.close()
def _test_do_immutable(db: Path) -> None:
# in readonly mode doesn't touch
with sqlite_connect_immutable(db) as conn_imm:
assert len(list(conn_imm.execute('SELECT * FROM testtable'))) == 5
conn_imm.close()
def _test_do_copy_and_open(db: Path) -> None:
with sqlite_copy_and_open(db) as conn_mem:
assert len(list(conn_mem.execute('SELECT * FROM testtable'))) == 10
conn_mem.close()
def _test_open_asis(db: Path) -> None:
# NOTE: this also works... but leaves some potential for DB corruption
with sqlite3.connect(str(db)) as conn_db_2:
assert len(list(conn_db_2.execute('SELECT * FROM testtable'))) == 10
conn_db_2.close()
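
For real providers, the approach these subprocess checks motivate is the immutable one; a minimal sketch (read_testtable is hypothetical, table name reused from the test above):

from pathlib import Path

from my.core.sqlite import sqlite_connect_immutable

def read_testtable(db: Path) -> list:
    # read a potentially live database without touching it or its WAL
    with sqlite_connect_immutable(db) as conn:
        rows = list(conn.execute('SELECT * FROM testtable'))
    conn.close()  # the with block commits/rolls back but does not close, as in the tests above
    return rows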

@@ -1,33 +0,0 @@
from pathlib import Path
import tempfile
from my.core.cfg import tmp_config
import pytest
def _init_default_config() -> None:
import my.config
class default_config:
count = 5
my.config.simple = default_config # type: ignore[assignment,misc]
def test_tmp_config() -> None:
## ugh. ideally this would be on the top level (would be a better test)
## but pytest imports everything first, executes hooks, and some reset_modules() fixtures mess stuff up
## later it would be nice to be a bit more careful about them
_init_default_config()
from my.simple import items
##
assert len(list(items())) == 5
class config:
class simple:
count = 3
with tmp_config(modules='my.simple', config=config):
assert len(list(items())) == 3
assert len(list(items())) == 5