Move pytest-bdd code into separate files

Now that all the tests are passing, this breaks them up into a few
different files to make everything more organized.

Note: Pyflakes is complaining about some unused import statements.
This commit is contained in:
Jonathan Wren 2021-06-19 16:39:30 -07:00
parent 54e5e96ad2
commit c5a7d7027c
6 changed files with 827 additions and 787 deletions

View file

@ -1,96 +1,18 @@
# Copyright (C) 2012-2021 jrnl contributors # Copyright (C) 2012-2021 jrnl contributors
# License: https://www.gnu.org/licenses/gpl-3.0.html # License: https://www.gnu.org/licenses/gpl-3.0.html
import json
import os
from datetime import datetime
from collections import defaultdict
from contextlib import ExitStack
from keyring import backend
from keyring import set_keyring
from keyring import errors
from pathlib import Path
import random
import string
import re
import shutil
import tempfile
import yaml
from unittest.mock import patch
from unittest.mock import MagicMock
from xml.etree import ElementTree
from pytest_bdd import given
from pytest_bdd import then
from pytest_bdd import when
from pytest_bdd.parsers import parse
from pytest_bdd import parsers
from pytest import fixture
from pytest import mark
import toml
from jrnl import __version__
from jrnl.cli import cli
from jrnl.config import load_config
from jrnl.config import scope_config
from jrnl.os_compat import split_args
from jrnl.os_compat import on_windows from jrnl.os_compat import on_windows
from jrnl.time import __get_pdt_calendar from pytest import mark
from .fixtures import *
from .given_steps import *
from .when_steps import *
from .then_steps import *
class TestKeyring(backend.KeyringBackend):
"""A test keyring that just stores its values in a hash"""
priority = 1
keys = defaultdict(dict)
def set_password(self, servicename, username, password):
self.keys[servicename][username] = password
def get_password(self, servicename, username):
return self.keys[servicename].get(username)
def delete_password(self, servicename, username):
self.keys[servicename][username] = None
class NoKeyring(backend.KeyringBackend):
"""A keyring that simulated an environment with no keyring backend."""
priority = 2
keys = defaultdict(dict)
def set_password(self, servicename, username, password):
raise errors.NoKeyringError
def get_password(self, servicename, username):
raise errors.NoKeyringError
def delete_password(self, servicename, username):
raise errors.NoKeyringError
class FailedKeyring(backend.KeyringBackend):
"""
A keyring that cannot be retrieved.
"""
priority = 2
def set_password(self, servicename, username, password):
raise errors.KeyringError
def get_password(self, servicename, username):
raise errors.KeyringError
def delete_password(self, servicename, username):
raise errors.KeyringError
# ----- MARKERS ----- #
def pytest_bdd_apply_tag(tag, function): def pytest_bdd_apply_tag(tag, function):
if tag == "skip_win": if tag == "skip_win":
marker = mark.skipif(on_windows, reason="Skip test on Windows") marker = mark.skipif(on_windows(), reason="Skip test on Windows")
elif tag == "skip_editor": elif tag == "skip_editor":
marker = mark.skip( marker = mark.skip(
reason="Skipping editor-related test. We should come back to this!" reason="Skipping editor-related test. We should come back to this!"
@ -101,705 +23,3 @@ def pytest_bdd_apply_tag(tag, function):
marker(function) marker(function)
return True return True
# ----- UTILS ----- #
def does_directory_contain_files(file_list, directory_path):
if not os.path.isdir(directory_path):
return False
for file in file_list.split("\n"):
fullpath = directory_path + "/" + file
if not os.path.isfile(fullpath):
return False
return True
# ----- FIXTURES ----- #
@fixture
def cli_run():
return {"status": 0, "stdout": None, "stderr": None}
@fixture
def mocks():
return dict()
@fixture
def temp_dir():
return tempfile.TemporaryDirectory()
@fixture
def working_dir(request):
return os.path.join(request.config.rootpath, "tests")
@fixture
def config_path(temp_dir):
os.chdir(temp_dir.name)
return temp_dir.name + "/jrnl.yaml"
@fixture
def toml_version(working_dir):
pyproject = os.path.join(working_dir, "..", "pyproject.toml")
pyproject_contents = toml.load(pyproject)
return pyproject_contents["tool"]["poetry"]["version"]
@fixture
def password():
return ""
@fixture
def input_method():
return ""
@fixture
def cache_dir():
return {"exists": False, "path": ""}
@fixture
def str_value():
return ""
@fixture
def command():
return ""
@fixture
def should_not():
return False
@fixture
def user_input():
return ""
@fixture
def keyring():
set_keyring(NoKeyring())
@fixture
def keyring_type():
return "default"
@fixture
def config_data(config_path):
return load_config(config_path)
@fixture
def journal_name():
return None
@fixture
def which_output_stream():
return None
@fixture
def editor_input():
return None
@fixture
def num_args():
return None
@fixture
def parsed_output():
return {"lang": None, "obj": None}
@fixture
def editor_state():
return {
"command": "",
"intent": {"method": "r", "input": None},
"tmpfile": {"name": None, "content": None},
}
@fixture
def editor(editor_state):
def _mock_editor(editor_command):
tmpfile = editor_command[-1]
editor_state["command"] = editor_command
editor_state["tmpfile"]["name"] = tmpfile
Path(tmpfile).touch()
with open(tmpfile, editor_state["intent"]["method"]) as f:
# Touch the file so jrnl knows it was edited
if editor_state["intent"]["input"] != None:
f.write(editor_state["intent"]["input"])
file_content = f.read()
editor_state["tmpfile"]["content"] = file_content
return _mock_editor
# ----- STEPS ----- #
@given(parse("we {editor_method} to the editor if opened\n{editor_input}"))
@given(parse("we {editor_method} nothing to the editor if opened"))
def we_enter_editor(editor_method, editor_input, editor_state):
file_method = editor_state["intent"]["method"]
if editor_method == "write":
file_method = "w+"
elif editor_method == "append":
file_method = "a+"
else:
assert False, f"Method '{editor_method}' not supported"
editor_state["intent"] = {"method": file_method, "input": editor_input}
@given(parse('now is "<date_str>"'))
@given(parse('now is "{date_str}"'))
def now_is_str(date_str, mocks):
class DatetimeMagicMock(MagicMock):
# needed because jrnl does some reflection on datetime
def __instancecheck__(self, subclass):
return isinstance(subclass, datetime)
def mocked_now(tz=None):
now = datetime.strptime(date_str, "%Y-%m-%d %I:%M:%S %p")
if tz:
time_zone = datetime.utcnow().astimezone().tzinfo
now = now.replace(tzinfo=time_zone)
return now
# jrnl uses two different classes to parse dates, so both must be mocked
datetime_mock = DatetimeMagicMock(wraps=datetime)
datetime_mock.now.side_effect = mocked_now
pdt = __get_pdt_calendar()
calendar_mock = MagicMock(wraps=pdt)
calendar_mock.parse.side_effect = lambda date_str_input: pdt.parse(
date_str_input, mocked_now()
)
mocks["datetime"] = patch("datetime.datetime", new=datetime_mock)
mocks["calendar_parse"] = patch(
"jrnl.time.__get_pdt_calendar", return_value=calendar_mock
)
@then(parse("the editor should have been called"))
@then(parse("the editor should have been called with {num_args} arguments"))
def count_editor_args(num_args, cli_run, editor_state):
assert cli_run["mocks"]["editor"].called
if isinstance(num_args, int):
assert len(editor_state["command"]) == int(num_args)
@then(parse('the editor filename should end with "{suffix}"'))
def editor_filename_suffix(suffix, editor_state):
editor_filename = editor_state["tmpfile"]["name"]
assert editor_state["tmpfile"]["name"].endswith(suffix), (editor_filename, suffix)
@then(parse('the editor file content should {comparison} "{str_value}"'))
@then(parse("the editor file content should {comparison} empty"))
@then(parse("the editor file content should {comparison}\n{str_value}"))
def contains_editor_file(comparison, str_value, editor_state):
content = editor_state["tmpfile"]["content"]
# content = f'\n"""\n{content}\n"""\n'
if comparison == "be":
assert content == str_value
elif comparison == "contain":
assert str_value in content
else:
assert False, f"Comparison '{comparison}' not supported"
@given("we have a keyring", target_fixture="keyring")
@given(parse("we have a {keyring_type} keyring"), target_fixture="keyring")
def we_have_type_of_keyring(keyring_type):
if keyring_type == "failed":
set_keyring(FailedKeyring())
else:
set_keyring(TestKeyring())
@given(parse('we use the config "{config_file}"'), target_fixture="config_path")
@given('we use the config "<config_file>"', target_fixture="config_path")
def we_use_the_config(config_file, temp_dir, working_dir):
# Move into temp dir as cwd
os.chdir(temp_dir.name)
# Copy the config file over
config_source = os.path.join(working_dir, "data", "configs", config_file)
config_dest = os.path.join(temp_dir.name, config_file)
shutil.copy2(config_source, config_dest)
# @todo make this only copy some journals over
# Copy all of the journals over
journal_source = os.path.join(working_dir, "data", "journals")
journal_dest = os.path.join(temp_dir.name, "features", "journals")
shutil.copytree(journal_source, journal_dest)
# @todo get rid of this by using default config values
# merge in version number
if config_file.endswith("yaml") and os.path.exists(config_dest):
# Add jrnl version to file for 2.x journals
with open(config_dest, "a") as cf:
cf.write("version: {}".format(__version__))
return config_dest
@given(parse('we use the password "{pw}" if prompted'), target_fixture="password")
def use_password_forever(pw):
return pw
@when(parse('we run "jrnl {command}" and {input_method}\n{user_input}'))
@when(
parsers.re(
'we run "jrnl (?P<command>[^"]+)" and (?P<input_method>enter|pipe) "(?P<user_input>[^"]+)"'
)
)
@when(parse('we run "jrnl" and {input_method} "{user_input}"'))
@when(parse('we run "jrnl {command}"'))
@when('we run "jrnl <command>"')
@when('we run "jrnl"')
def we_run(
command,
config_path,
user_input,
cli_run,
capsys,
password,
cache_dir,
editor,
keyring,
input_method,
mocks,
):
assert input_method in ["", "enter", "pipe"]
is_tty = input_method != "pipe"
if cache_dir["exists"]:
command = command.format(cache_dir=cache_dir["path"])
args = split_args(command)
status = 0
if user_input:
user_input = user_input.splitlines() if is_tty else [user_input]
if password:
password = password.splitlines()
if not password and user_input:
password = user_input
with ExitStack() as stack:
stack.enter_context(patch("sys.argv", ["jrnl"] + args))
mock_stdin = stack.enter_context(
patch("sys.stdin.read", side_effect=user_input)
)
stack.enter_context(patch("sys.stdin.isatty", return_value=is_tty))
mock_input = stack.enter_context(
patch("builtins.input", side_effect=user_input)
)
mock_getpass = stack.enter_context(
patch("getpass.getpass", side_effect=password)
)
if "datetime" in mocks:
stack.enter_context(mocks["datetime"])
stack.enter_context(mocks["calendar_parse"])
# stack.enter_context(patch("datetime.datetime", new=mocks["datetime"]))
# stack.enter_context(patch("jrnl.time.__get_pdt_calendar", return_value=mocks["calendar_parse"]))
stack.enter_context(
patch("jrnl.install.get_config_path", return_value=config_path)
)
stack.enter_context(
patch("jrnl.config.get_config_path", return_value=config_path)
)
mock_editor = stack.enter_context(patch("subprocess.call", side_effect=editor))
try:
cli(args)
except StopIteration:
# This happens when input is expected, but don't have any input left
pass
except SystemExit as e:
status = e.code
captured = capsys.readouterr()
cli_run["status"] = status
cli_run["stdout"] = captured.out
cli_run["stderr"] = captured.err
cli_run["mocks"] = {
"stdin": mock_stdin,
"input": mock_input,
"getpass": mock_getpass,
"editor": mock_editor,
}
@then("we should get no error")
def should_get_no_error(cli_run):
assert cli_run["status"] == 0, cli_run["status"]
@then(parse('the output should match "{regex}"'))
def output_should_match(regex, cli_run):
out = cli_run["stdout"]
matches = re.findall(regex, out)
assert matches, f"\nRegex didn't match:\n{regex}\n{str(out)}\n{str(matches)}"
@then(parse("the output should contain\n{expected_output}"))
@then(parse('the output should contain "{expected_output}"'))
@then('the output should contain "<expected_output>"')
@then(parse("the {which_output_stream} output should contain\n{expected_output}"))
@then(parse('the {which_output_stream} output should contain "{expected_output}"'))
def output_should_contain(expected_output, which_output_stream, cli_run):
assert expected_output
if which_output_stream is None:
assert (expected_output in cli_run["stdout"]) or (
expected_output in cli_run["stderr"]
)
elif which_output_stream == "standard":
assert expected_output in cli_run["stdout"]
elif which_output_stream == "error":
assert expected_output in cli_run["stderr"]
else:
assert expected_output in cli_run[which_output_stream]
@then(parse("the output should not contain\n{expected_output}"))
@then(parse('the output should not contain "{expected_output}"'))
@then('the output should not contain "<expected_output>"')
def output_should_not_contain(expected_output, cli_run):
assert expected_output not in cli_run["stdout"]
@then(parse("the output should be\n{expected_output}"))
@then(parse('the output should be "{expected_output}"'))
@then('the output should be "<expected_output>"')
def output_should_be(expected_output, cli_run):
actual = cli_run["stdout"].strip()
expected = expected_output.strip()
assert expected == actual
@then("the output should be empty")
def output_should_be_empty(cli_run):
actual = cli_run["stdout"].strip()
assert actual == ""
@then('the output should contain the date "<date>"')
def output_should_contain_date(date, cli_run):
assert date and date in cli_run["stdout"]
@then("the output should contain pyproject.toml version")
def output_should_contain_version(cli_run, toml_version):
out = cli_run["stdout"]
assert toml_version in out, toml_version
@then(parse('we should see the message "{text}"'))
def should_see_the_message(text, cli_run):
out = cli_run["stderr"]
assert text in out, [text, out]
def parse_should_or_should_not(should_or_should_not):
if should_or_should_not == "should":
return True
elif should_or_should_not == "should not":
return False
else:
raise Exception(
"should_or_should_not valid values are 'should' or 'should not'"
)
@then(
parse(
'the config for journal "{journal_name}" {should_or_should_not} contain "{some_yaml}"'
)
)
@then(
parse(
'the config for journal "{journal_name}" {should_or_should_not} contain\n{some_yaml}'
)
)
@then(parse('the config {should_or_should_not} contain "{some_yaml}"'))
@then(parse("the config {should_or_should_not} contain\n{some_yaml}"))
def config_var(config_data, journal_name, should_or_should_not, some_yaml):
we_should = parse_should_or_should_not(should_or_should_not)
actual = config_data
if journal_name:
actual = actual["journals"][journal_name]
expected = yaml.load(some_yaml, Loader=yaml.FullLoader)
actual_slice = actual
if type(actual) is dict:
actual_slice = {key: actual.get(key, None) for key in expected.keys()}
if we_should:
assert expected == actual_slice
else:
assert expected != actual_slice
@then("we should be prompted for a password")
def password_was_called(cli_run):
assert cli_run["mocks"]["getpass"].called
@then("we should not be prompted for a password")
def password_was_not_called(cli_run):
assert not cli_run["mocks"]["getpass"].called
@then(parse("the cache directory should contain the files\n{file_list}"))
def assert_dir_contains_files(file_list, cache_dir):
assert does_directory_contain_files(file_list, cache_dir["path"])
@then(parse("the journal directory should contain\n{file_list}"))
def journal_directory_should_contain(config_data, file_list):
scoped_config = scope_config(config_data, "default")
assert does_directory_contain_files(file_list, scoped_config["journal"])
@then(parse('journal "{journal_name}" should not exist'))
def journal_directory_should_not_exist(config_data, journal_name):
scoped_config = scope_config(config_data, journal_name)
assert not does_directory_contain_files(
scoped_config["journal"], "."
), f'Journal "{journal_name}" does exist'
@when(parse('we change directory to "{directory_name}"'))
def when_we_change_directory(directory_name):
if not os.path.isdir(directory_name):
os.mkdir(directory_name)
os.chdir(directory_name)
@then(parse("the journal {should_or_should_not} exist"))
def journal_should_not_exist(config_data, should_or_should_not):
scoped_config = scope_config(config_data, "default")
expected_path = scoped_config["journal"]
contains_files = does_directory_contain_files(expected_path, ".")
if should_or_should_not == "should":
assert contains_files
elif should_or_should_not == "should not":
assert not contains_files
else:
raise Exception(
"should_or_should_not valid values are 'should' or 'should not'"
)
@given("we create a cache directory", target_fixture="cache_dir")
def create_cache_dir(temp_dir):
random_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=20))
dir_path = os.path.join(temp_dir.name, "cache_" + random_str)
os.mkdir(dir_path)
return {"exists": True, "path": dir_path}
@then(parse('the content of file "{file_path}" in the cache should be\n{file_content}'))
def content_of_file_should_be(file_path, file_content, cache_dir):
assert cache_dir["exists"]
expected_content = file_content.strip().splitlines()
with open(os.path.join(cache_dir["path"], file_path), "r") as f:
actual_content = f.read().strip().splitlines()
for actual_line, expected_line in zip(actual_content, expected_content):
if actual_line.startswith("tags: ") and expected_line.startswith("tags: "):
assert_equal_tags_ignoring_order(
actual_line, expected_line, actual_content, expected_content
)
else:
assert actual_line.strip() == expected_line.strip(), [
[actual_line.strip(), expected_line.strip()],
[actual_content, expected_content],
]
def assert_equal_tags_ignoring_order(
actual_line, expected_line, actual_content, expected_content
):
actual_tags = set(tag.strip() for tag in actual_line[len("tags: ") :].split(","))
expected_tags = set(
tag.strip() for tag in expected_line[len("tags: ") :].split(",")
)
assert actual_tags == expected_tags, [
[actual_tags, expected_tags],
[expected_content, actual_content],
]
@then(parse("the cache should contain the files\n{file_list}"))
def cache_dir_contains_files(file_list, cache_dir):
assert cache_dir["exists"]
actual_files = os.listdir(cache_dir["path"])
expected_files = file_list.split("\n")
# sort to deal with inconsistent default file ordering on different OS's
actual_files.sort()
expected_files.sort()
assert actual_files == expected_files, [actual_files, expected_files]
@then(parse("the output should be valid {language_name}"))
def assert_output_is_valid_language(cli_run, language_name):
language_name = language_name.upper()
if language_name == "XML":
xml_tree = ElementTree.fromstring(cli_run["stdout"])
assert xml_tree, "Invalid XML"
elif language_name == "JSON":
assert json.loads(cli_run["stdout"]), "Invalid JSON"
else:
assert False, f"Language name {language_name} not recognized"
@given(parse("we parse the output as {language_name}"), target_fixture="parsed_output")
def parse_output_as_language(cli_run, language_name):
language_name = language_name.upper()
actual_output = cli_run["stdout"]
if language_name == "XML":
parsed_output = ElementTree.fromstring(actual_output)
elif language_name == "JSON":
parsed_output = json.loads(actual_output)
else:
assert False, f"Language name {language_name} not recognized"
return {"lang": language_name, "obj": parsed_output}
@then(parse('"{node_name}" in the parsed output should have {number:d} elements'))
def assert_parsed_output_item_count(node_name, number, parsed_output):
lang = parsed_output["lang"]
obj = parsed_output["obj"]
if lang == "XML":
xml_node_names = (node.tag for node in obj)
assert node_name in xml_node_names, str(list(xml_node_names))
actual_entry_count = len(obj.find(node_name))
assert actual_entry_count == number, actual_entry_count
elif lang == "JSON":
my_obj = obj
for node in node_name.split("."):
try:
my_obj = my_obj[int(node)]
except ValueError:
assert node in my_obj
my_obj = my_obj[node]
assert len(my_obj) == number, len(my_obj)
else:
assert False, f"Language name {lang} not recognized"
@then(parse('"{field_name}" in the parsed output should {comparison}\n{expected_keys}'))
def assert_output_field_content(field_name, comparison, expected_keys, parsed_output):
lang = parsed_output["lang"]
obj = parsed_output["obj"]
expected_keys = expected_keys.split("\n")
if len(expected_keys) == 1:
expected_keys = expected_keys[0]
if lang == "XML":
xml_node_names = (node.tag for node in obj)
assert field_name in xml_node_names, str(list(xml_node_names))
if field_name == "tags":
actual_tags = set(t.attrib["name"] for t in obj.find("tags"))
assert set(actual_tags) == set(expected_keys), [
actual_tags,
set(expected_keys),
]
else:
assert False, "This test only works for tags in XML"
elif lang == "JSON":
my_obj = obj
for node in field_name.split("."):
try:
my_obj = my_obj[int(node)]
except ValueError:
assert node in my_obj, [my_obj.keys(), node]
my_obj = my_obj[node]
if comparison == "be":
if type(my_obj) is str:
assert expected_keys == my_obj, [my_obj, expected_keys]
else:
assert set(expected_keys) == set(my_obj), [
set(my_obj),
set(expected_keys),
]
elif comparison == "contain":
if type(my_obj) is str:
assert expected_keys in my_obj, [my_obj, expected_keys]
else:
assert all(elem in my_obj for elem in expected_keys), [
my_obj,
expected_keys,
]
else:
assert False, f"Language name {lang} not recognized"
@then(parse('there should be {number:d} "{item}" elements'))
def count_elements(number, item, cli_run):
actual_output = cli_run["stdout"]
xml_tree = ElementTree.fromstring(actual_output)
assert len(xml_tree.findall(".//" + item)) == number

202
tests/step_defs/fixtures.py Normal file
View file

@ -0,0 +1,202 @@
# Copyright (C) 2012-2021 jrnl contributors
# License: https://www.gnu.org/licenses/gpl-3.0.html
import os
from collections import defaultdict
from keyring import backend
from keyring import set_keyring
from keyring import errors
from pathlib import Path
import tempfile
from pytest import fixture
import toml
from jrnl.config import load_config
# --- Keyring --- #
@fixture
def keyring():
set_keyring(NoKeyring())
@fixture
def keyring_type():
return "default"
class TestKeyring(backend.KeyringBackend):
"""A test keyring that just stores its values in a hash"""
priority = 1
keys = defaultdict(dict)
def set_password(self, servicename, username, password):
self.keys[servicename][username] = password
def get_password(self, servicename, username):
return self.keys[servicename].get(username)
def delete_password(self, servicename, username):
self.keys[servicename][username] = None
class NoKeyring(backend.KeyringBackend):
"""A keyring that simulated an environment with no keyring backend."""
priority = 2
keys = defaultdict(dict)
def set_password(self, servicename, username, password):
raise errors.NoKeyringError
def get_password(self, servicename, username):
raise errors.NoKeyringError
def delete_password(self, servicename, username):
raise errors.NoKeyringError
class FailedKeyring(backend.KeyringBackend):
""" A keyring that cannot be retrieved. """
priority = 2
def set_password(self, servicename, username, password):
raise errors.KeyringError
def get_password(self, servicename, username):
raise errors.KeyringError
def delete_password(self, servicename, username):
raise errors.KeyringError
# ----- Misc ----- #
@fixture
def cli_run():
return {"status": 0, "stdout": None, "stderr": None}
@fixture
def mocks():
return dict()
@fixture
def temp_dir():
return tempfile.TemporaryDirectory()
@fixture
def working_dir(request):
return os.path.join(request.config.rootpath, "tests")
@fixture
def config_path(temp_dir):
os.chdir(temp_dir.name)
return temp_dir.name + "/jrnl.yaml"
@fixture
def toml_version(working_dir):
pyproject = os.path.join(working_dir, "..", "pyproject.toml")
pyproject_contents = toml.load(pyproject)
return pyproject_contents["tool"]["poetry"]["version"]
@fixture
def password():
return ""
@fixture
def input_method():
return ""
@fixture
def cache_dir():
return {"exists": False, "path": ""}
@fixture
def str_value():
return ""
@fixture
def command():
return ""
@fixture
def should_not():
return False
@fixture
def user_input():
return ""
@fixture
def config_data(config_path):
return load_config(config_path)
@fixture
def journal_name():
return None
@fixture
def which_output_stream():
return None
@fixture
def editor_input():
return None
@fixture
def num_args():
return None
@fixture
def parsed_output():
return {"lang": None, "obj": None}
@fixture
def editor_state():
return {
"command": "",
"intent": {"method": "r", "input": None},
"tmpfile": {"name": None, "content": None},
}
@fixture
def editor(editor_state):
def _mock_editor(editor_command):
tmpfile = editor_command[-1]
editor_state["command"] = editor_command
editor_state["tmpfile"]["name"] = tmpfile
Path(tmpfile).touch()
with open(tmpfile, editor_state["intent"]["method"]) as f:
# Touch the file so jrnl knows it was edited
if editor_state["intent"]["input"] != None:
f.write(editor_state["intent"]["input"])
file_content = f.read()
editor_state["tmpfile"]["content"] = file_content
return _mock_editor

View file

@ -0,0 +1,133 @@
# Copyright (C) 2012-2021 jrnl contributors
# License: https://www.gnu.org/licenses/gpl-3.0.html
import json
import os
from datetime import datetime
from keyring import set_keyring
import random
import string
import shutil
from unittest.mock import patch
from unittest.mock import MagicMock
from xml.etree import ElementTree
from jrnl import __version__
from jrnl.time import __get_pdt_calendar
from pytest_bdd import given
from pytest_bdd.parsers import parse
from .fixtures import FailedKeyring
from .fixtures import TestKeyring
@given(parse("we {editor_method} to the editor if opened\n{editor_input}"))
@given(parse("we {editor_method} nothing to the editor if opened"))
def we_enter_editor(editor_method, editor_input, editor_state):
file_method = editor_state["intent"]["method"]
if editor_method == "write":
file_method = "w+"
elif editor_method == "append":
file_method = "a+"
else:
assert False, f"Method '{editor_method}' not supported"
editor_state["intent"] = {"method": file_method, "input": editor_input}
@given(parse('now is "<date_str>"'))
@given(parse('now is "{date_str}"'))
def now_is_str(date_str, mocks):
class DatetimeMagicMock(MagicMock):
# needed because jrnl does some reflection on datetime
def __instancecheck__(self, subclass):
return isinstance(subclass, datetime)
def mocked_now(tz=None):
now = datetime.strptime(date_str, "%Y-%m-%d %I:%M:%S %p")
if tz:
time_zone = datetime.utcnow().astimezone().tzinfo
now = now.replace(tzinfo=time_zone)
return now
# jrnl uses two different classes to parse dates, so both must be mocked
datetime_mock = DatetimeMagicMock(wraps=datetime)
datetime_mock.now.side_effect = mocked_now
pdt = __get_pdt_calendar()
calendar_mock = MagicMock(wraps=pdt)
calendar_mock.parse.side_effect = lambda date_str_input: pdt.parse(
date_str_input, mocked_now()
)
mocks["datetime"] = patch("datetime.datetime", new=datetime_mock)
mocks["calendar_parse"] = patch(
"jrnl.time.__get_pdt_calendar", return_value=calendar_mock
)
@given("we have a keyring", target_fixture="keyring")
@given(parse("we have a {keyring_type} keyring"), target_fixture="keyring")
def we_have_type_of_keyring(keyring_type):
if keyring_type == "failed":
set_keyring(FailedKeyring())
else:
set_keyring(TestKeyring())
@given(parse('we use the config "{config_file}"'), target_fixture="config_path")
@given('we use the config "<config_file>"', target_fixture="config_path")
def we_use_the_config(config_file, temp_dir, working_dir):
# Move into temp dir as cwd
os.chdir(temp_dir.name)
# Copy the config file over
config_source = os.path.join(working_dir, "data", "configs", config_file)
config_dest = os.path.join(temp_dir.name, config_file)
shutil.copy2(config_source, config_dest)
# @todo make this only copy some journals over
# Copy all of the journals over
journal_source = os.path.join(working_dir, "data", "journals")
journal_dest = os.path.join(temp_dir.name, "features", "journals")
shutil.copytree(journal_source, journal_dest)
# @todo get rid of this by using default config values
# merge in version number
if config_file.endswith("yaml") and os.path.exists(config_dest):
# Add jrnl version to file for 2.x journals
with open(config_dest, "a") as cf:
cf.write("version: {}".format(__version__))
return config_dest
@given(parse('we use the password "{pw}" if prompted'), target_fixture="password")
def use_password_forever(pw):
return pw
@given("we create a cache directory", target_fixture="cache_dir")
def create_cache_dir(temp_dir):
random_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=20))
dir_path = os.path.join(temp_dir.name, "cache_" + random_str)
os.mkdir(dir_path)
return {"exists": True, "path": dir_path}
@given(parse("we parse the output as {language_name}"), target_fixture="parsed_output")
def parse_output_as_language(cli_run, language_name):
language_name = language_name.upper()
actual_output = cli_run["stdout"]
if language_name == "XML":
parsed_output = ElementTree.fromstring(actual_output)
elif language_name == "JSON":
parsed_output = json.loads(actual_output)
else:
assert False, f"Language name {language_name} not recognized"
return {"lang": language_name, "obj": parsed_output}

View file

@ -0,0 +1,41 @@
# Copyright (C) 2012-2021 jrnl contributors
# License: https://www.gnu.org/licenses/gpl-3.0.html
import os
def does_directory_contain_files(file_list, directory_path):
if not os.path.isdir(directory_path):
return False
for file in file_list.split("\n"):
fullpath = directory_path + "/" + file
if not os.path.isfile(fullpath):
return False
return True
def parse_should_or_should_not(should_or_should_not):
if should_or_should_not == "should":
return True
elif should_or_should_not == "should not":
return False
else:
raise Exception(
"should_or_should_not valid values are 'should' or 'should not'"
)
def assert_equal_tags_ignoring_order(
actual_line, expected_line, actual_content, expected_content
):
actual_tags = set(tag.strip() for tag in actual_line[len("tags: ") :].split(","))
expected_tags = set(
tag.strip() for tag in expected_line[len("tags: ") :].split(",")
)
assert actual_tags == expected_tags, [
[actual_tags, expected_tags],
[expected_content, actual_content],
]

View file

@ -0,0 +1,331 @@
# Copyright (C) 2012-2021 jrnl contributors
# License: https://www.gnu.org/licenses/gpl-3.0.html
import json
import os
import re
import yaml
from xml.etree import ElementTree
from pytest_bdd import then
from pytest_bdd.parsers import parse
from jrnl.config import scope_config
from .helpers import parse_should_or_should_not
from .helpers import does_directory_contain_files
from .helpers import assert_equal_tags_ignoring_order
@then("we should get no error")
def should_get_no_error(cli_run):
assert cli_run["status"] == 0, cli_run["status"]
@then(parse('the output should match "{regex}"'))
def output_should_match(regex, cli_run):
out = cli_run["stdout"]
matches = re.findall(regex, out)
assert matches, f"\nRegex didn't match:\n{regex}\n{str(out)}\n{str(matches)}"
@then(parse("the output should contain\n{expected_output}"))
@then(parse('the output should contain "{expected_output}"'))
@then('the output should contain "<expected_output>"')
@then(parse("the {which_output_stream} output should contain\n{expected_output}"))
@then(parse('the {which_output_stream} output should contain "{expected_output}"'))
def output_should_contain(expected_output, which_output_stream, cli_run):
assert expected_output
if which_output_stream is None:
assert (expected_output in cli_run["stdout"]) or (
expected_output in cli_run["stderr"]
)
elif which_output_stream == "standard":
assert expected_output in cli_run["stdout"]
elif which_output_stream == "error":
assert expected_output in cli_run["stderr"]
else:
assert expected_output in cli_run[which_output_stream]
@then(parse("the output should not contain\n{expected_output}"))
@then(parse('the output should not contain "{expected_output}"'))
@then('the output should not contain "<expected_output>"')
def output_should_not_contain(expected_output, cli_run):
assert expected_output not in cli_run["stdout"]
@then(parse("the output should be\n{expected_output}"))
@then(parse('the output should be "{expected_output}"'))
@then('the output should be "<expected_output>"')
def output_should_be(expected_output, cli_run):
actual = cli_run["stdout"].strip()
expected = expected_output.strip()
assert expected == actual
@then("the output should be empty")
def output_should_be_empty(cli_run):
actual = cli_run["stdout"].strip()
assert actual == ""
@then('the output should contain the date "<date>"')
def output_should_contain_date(date, cli_run):
assert date and date in cli_run["stdout"]
@then("the output should contain pyproject.toml version")
def output_should_contain_version(cli_run, toml_version):
out = cli_run["stdout"]
assert toml_version in out, toml_version
@then(parse('we should see the message "{text}"'))
def should_see_the_message(text, cli_run):
out = cli_run["stderr"]
assert text in out, [text, out]
@then(
parse(
'the config for journal "{journal_name}" {should_or_should_not} contain "{some_yaml}"'
)
)
@then(
parse(
'the config for journal "{journal_name}" {should_or_should_not} contain\n{some_yaml}'
)
)
@then(parse('the config {should_or_should_not} contain "{some_yaml}"'))
@then(parse("the config {should_or_should_not} contain\n{some_yaml}"))
def config_var(config_data, journal_name, should_or_should_not, some_yaml):
we_should = parse_should_or_should_not(should_or_should_not)
actual = config_data
if journal_name:
actual = actual["journals"][journal_name]
expected = yaml.load(some_yaml, Loader=yaml.FullLoader)
actual_slice = actual
if type(actual) is dict:
actual_slice = {key: actual.get(key, None) for key in expected.keys()}
if we_should:
assert expected == actual_slice
else:
assert expected != actual_slice
@then("we should be prompted for a password")
def password_was_called(cli_run):
assert cli_run["mocks"]["getpass"].called
@then("we should not be prompted for a password")
def password_was_not_called(cli_run):
assert not cli_run["mocks"]["getpass"].called
@then(parse("the cache directory should contain the files\n{file_list}"))
def assert_dir_contains_files(file_list, cache_dir):
assert does_directory_contain_files(file_list, cache_dir["path"])
@then(parse("the journal directory should contain\n{file_list}"))
def journal_directory_should_contain(config_data, file_list):
scoped_config = scope_config(config_data, "default")
assert does_directory_contain_files(file_list, scoped_config["journal"])
@then(parse('journal "{journal_name}" should not exist'))
def journal_directory_should_not_exist(config_data, journal_name):
scoped_config = scope_config(config_data, journal_name)
assert not does_directory_contain_files(
scoped_config["journal"], "."
), f'Journal "{journal_name}" does exist'
@then(parse("the journal {should_or_should_not} exist"))
def journal_should_not_exist(config_data, should_or_should_not):
scoped_config = scope_config(config_data, "default")
expected_path = scoped_config["journal"]
contains_files = does_directory_contain_files(expected_path, ".")
if should_or_should_not == "should":
assert contains_files
elif should_or_should_not == "should not":
assert not contains_files
else:
raise Exception(
"should_or_should_not valid values are 'should' or 'should not'"
)
@then(parse('the content of file "{file_path}" in the cache should be\n{file_content}'))
def content_of_file_should_be(file_path, file_content, cache_dir):
assert cache_dir["exists"]
expected_content = file_content.strip().splitlines()
with open(os.path.join(cache_dir["path"], file_path), "r") as f:
actual_content = f.read().strip().splitlines()
for actual_line, expected_line in zip(actual_content, expected_content):
if actual_line.startswith("tags: ") and expected_line.startswith("tags: "):
assert_equal_tags_ignoring_order(
actual_line, expected_line, actual_content, expected_content
)
else:
assert actual_line.strip() == expected_line.strip(), [
[actual_line.strip(), expected_line.strip()],
[actual_content, expected_content],
]
@then(parse("the cache should contain the files\n{file_list}"))
def cache_dir_contains_files(file_list, cache_dir):
assert cache_dir["exists"]
actual_files = os.listdir(cache_dir["path"])
expected_files = file_list.split("\n")
# sort to deal with inconsistent default file ordering on different OS's
actual_files.sort()
expected_files.sort()
assert actual_files == expected_files, [actual_files, expected_files]
@then(parse("the output should be valid {language_name}"))
def assert_output_is_valid_language(cli_run, language_name):
language_name = language_name.upper()
if language_name == "XML":
xml_tree = ElementTree.fromstring(cli_run["stdout"])
assert xml_tree, "Invalid XML"
elif language_name == "JSON":
assert json.loads(cli_run["stdout"]), "Invalid JSON"
else:
assert False, f"Language name {language_name} not recognized"
@then(parse('"{node_name}" in the parsed output should have {number:d} elements'))
def assert_parsed_output_item_count(node_name, number, parsed_output):
lang = parsed_output["lang"]
obj = parsed_output["obj"]
if lang == "XML":
xml_node_names = (node.tag for node in obj)
assert node_name in xml_node_names, str(list(xml_node_names))
actual_entry_count = len(obj.find(node_name))
assert actual_entry_count == number, actual_entry_count
elif lang == "JSON":
my_obj = obj
for node in node_name.split("."):
try:
my_obj = my_obj[int(node)]
except ValueError:
assert node in my_obj
my_obj = my_obj[node]
assert len(my_obj) == number, len(my_obj)
else:
assert False, f"Language name {lang} not recognized"
@then(parse('"{field_name}" in the parsed output should {comparison}\n{expected_keys}'))
def assert_output_field_content(field_name, comparison, expected_keys, parsed_output):
lang = parsed_output["lang"]
obj = parsed_output["obj"]
expected_keys = expected_keys.split("\n")
if len(expected_keys) == 1:
expected_keys = expected_keys[0]
if lang == "XML":
xml_node_names = (node.tag for node in obj)
assert field_name in xml_node_names, str(list(xml_node_names))
if field_name == "tags":
actual_tags = set(t.attrib["name"] for t in obj.find("tags"))
assert set(actual_tags) == set(expected_keys), [
actual_tags,
set(expected_keys),
]
else:
assert False, "This test only works for tags in XML"
elif lang == "JSON":
my_obj = obj
for node in field_name.split("."):
try:
my_obj = my_obj[int(node)]
except ValueError:
assert node in my_obj, [my_obj.keys(), node]
my_obj = my_obj[node]
if comparison == "be":
if type(my_obj) is str:
assert expected_keys == my_obj, [my_obj, expected_keys]
else:
assert set(expected_keys) == set(my_obj), [
set(my_obj),
set(expected_keys),
]
elif comparison == "contain":
if type(my_obj) is str:
assert expected_keys in my_obj, [my_obj, expected_keys]
else:
assert all(elem in my_obj for elem in expected_keys), [
my_obj,
expected_keys,
]
else:
assert False, f"Language name {lang} not recognized"
@then(parse('there should be {number:d} "{item}" elements'))
def count_elements(number, item, cli_run):
actual_output = cli_run["stdout"]
xml_tree = ElementTree.fromstring(actual_output)
assert len(xml_tree.findall(".//" + item)) == number
@then(parse("the editor should have been called"))
@then(parse("the editor should have been called with {num_args} arguments"))
def count_editor_args(num_args, cli_run, editor_state):
assert cli_run["mocks"]["editor"].called
if isinstance(num_args, int):
assert len(editor_state["command"]) == int(num_args)
@then(parse('the editor filename should end with "{suffix}"'))
def editor_filename_suffix(suffix, editor_state):
editor_filename = editor_state["tmpfile"]["name"]
assert editor_state["tmpfile"]["name"].endswith(suffix), (editor_filename, suffix)
@then(parse('the editor file content should {comparison} "{str_value}"'))
@then(parse("the editor file content should {comparison} empty"))
@then(parse("the editor file content should {comparison}\n{str_value}"))
def contains_editor_file(comparison, str_value, editor_state):
content = editor_state["tmpfile"]["content"]
# content = f'\n"""\n{content}\n"""\n'
if comparison == "be":
assert content == str_value
elif comparison == "contain":
assert str_value in content
else:
assert False, f"Comparison '{comparison}' not supported"

View file

@ -0,0 +1,113 @@
# Copyright (C) 2012-2021 jrnl contributors
# License: https://www.gnu.org/licenses/gpl-3.0.html
import os
from contextlib import ExitStack
from unittest.mock import patch
from pytest_bdd import when
from pytest_bdd.parsers import parse
from pytest_bdd import parsers
from jrnl.cli import cli
from jrnl.os_compat import split_args
@when(parse('we change directory to "{directory_name}"'))
def when_we_change_directory(directory_name):
if not os.path.isdir(directory_name):
os.mkdir(directory_name)
os.chdir(directory_name)
@when(parse('we run "jrnl {command}" and {input_method}\n{user_input}'))
@when(
parsers.re(
'we run "jrnl (?P<command>[^"]+)" and (?P<input_method>enter|pipe) "(?P<user_input>[^"]+)"'
)
)
@when(parse('we run "jrnl" and {input_method} "{user_input}"'))
@when(parse('we run "jrnl {command}"'))
@when('we run "jrnl <command>"')
@when('we run "jrnl"')
def we_run(
command,
config_path,
user_input,
cli_run,
capsys,
password,
cache_dir,
editor,
keyring,
input_method,
mocks,
):
assert input_method in ["", "enter", "pipe"]
is_tty = input_method != "pipe"
if cache_dir["exists"]:
command = command.format(cache_dir=cache_dir["path"])
args = split_args(command)
status = 0
if user_input:
user_input = user_input.splitlines() if is_tty else [user_input]
if password:
password = password.splitlines()
if not password and user_input:
password = user_input
with ExitStack() as stack:
stack.enter_context(patch("sys.argv", ["jrnl"] + args))
mock_stdin = stack.enter_context(
patch("sys.stdin.read", side_effect=user_input)
)
stack.enter_context(patch("sys.stdin.isatty", return_value=is_tty))
mock_input = stack.enter_context(
patch("builtins.input", side_effect=user_input)
)
mock_getpass = stack.enter_context(
patch("getpass.getpass", side_effect=password)
)
if "datetime" in mocks:
stack.enter_context(mocks["datetime"])
stack.enter_context(mocks["calendar_parse"])
# stack.enter_context(patch("datetime.datetime", new=mocks["datetime"]))
# stack.enter_context(patch("jrnl.time.__get_pdt_calendar", return_value=mocks["calendar_parse"]))
stack.enter_context(
patch("jrnl.install.get_config_path", return_value=config_path)
)
stack.enter_context(
patch("jrnl.config.get_config_path", return_value=config_path)
)
mock_editor = stack.enter_context(patch("subprocess.call", side_effect=editor))
try:
cli(args)
except StopIteration:
# This happens when input is expected, but don't have any input left
pass
except SystemExit as e:
status = e.code
captured = capsys.readouterr()
cli_run["status"] = status
cli_run["stdout"] = captured.out
cli_run["stderr"] = captured.err
cli_run["mocks"] = {
"stdin": mock_stdin,
"input": mock_input,
"getpass": mock_getpass,
"editor": mock_editor,
}