Coverage for pytest_beehave / plugin.py: 24%
130 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 20:14 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-20 20:14 +0000
1"""pytest plugin entry point — orchestrates beehave during the pytest lifecycle."""
3from __future__ import annotations
5import importlib.util
6import sys
7from collections.abc import Generator
8from pathlib import Path
9from typing import Any
11import pytest
12from beehave.check import check_all
13from beehave.config import Config, load_config
14from beehave.generate import generate_stubs
15from beehave.gherkin import parse_feature
16from beehave.models import ScenarioInfo, Violation
18_beehave_config_key: pytest.StashKey[Config] = pytest.StashKey()
19_scenarios_key: pytest.StashKey[dict[str, ScenarioInfo]] = pytest.StashKey()
20_error_violations_key: pytest.StashKey[list[Violation]] = pytest.StashKey()
23class BeehaveViolationItem(pytest.Item):
24 """Synthetic test item that fails, representing a beehave ERROR."""
26 def __init__( # noqa: D107
27 self,
28 *,
29 violation: Violation,
30 **kwargs: Any, # noqa: ANN401
31 ) -> None:
32 safe_name = violation.error_type.replace("-", "_")
33 super().__init__(name=f"beehave_{safe_name}", **kwargs)
34 self.violation = violation
36 def runtest(self) -> None:
37 """Execute the test — always raises AssertionError."""
38 raise AssertionError(f"[beehave] {self.violation}")
40 def repr_failure(
41 self,
42 excinfo: pytest.ExceptionInfo[BaseException],
43 style: str | None = None,
44 ) -> str:
45 """Return a string representation of the failure."""
46 return str(excinfo.value)
48 def reportinfo(self) -> tuple[str, int, str]:
49 """Return location info for the test report."""
50 return (
51 str(self.violation.path),
52 self.violation.line,
53 f"[beehave] {self.violation.error_type}: {self.violation.message}",
54 )
57def _html_available() -> bool:
58 return importlib.util.find_spec("pytest_html") is not None
61def _format_scenario_steps(scenario: ScenarioInfo) -> str:
62 lines = [f"{step.keyword} {step.text}" for step in scenario.steps]
63 return "\n".join(lines)
66def _write_line(config: pytest.Config, text: str) -> None:
67 try:
68 config.get_terminal_writer().line(text)
69 except AssertionError, AttributeError:
70 sys.stderr.write(text + "\n")
71 sys.stderr.flush()
74def _report_violation(config: pytest.Config, v: Violation) -> None:
75 level = "WARNING" if v.is_warning else "ERROR"
76 _write_line(config, f"[beehave] {level}: {v}")
79_SKIP_MARKER = '@pytest.mark.skip(reason="not implemented")'
82def _add_skip_markers(tests_dir: Path) -> None:
83 """Add @pytest.mark.skip(reason='not implemented') to stub functions."""
84 for test_file in tests_dir.rglob("*.py"):
85 source = test_file.read_text(encoding="utf-8")
86 if "..." not in source:
87 continue
89 lines = source.split("\n")
90 changed = False
91 new_lines: list[str] = []
93 i = 0
94 while i < len(lines):
95 line = lines[i]
96 stripped = line.strip()
98 if stripped == "..." and new_lines:
99 prev = new_lines[-1]
100 if prev.lstrip().startswith("def "):
101 def_indent = prev[: len(prev) - len(prev.lstrip())]
102 insert_idx = len(new_lines) - 1
103 while insert_idx > 0 and (
104 new_lines[insert_idx - 1].lstrip().startswith("@")
105 ):
106 insert_idx -= 1
107 already_marked = (
108 insert_idx < len(new_lines)
109 and "@pytest.mark.skip" in new_lines[insert_idx]
110 )
111 if not already_marked:
112 new_lines.insert(insert_idx, f"{def_indent}{_SKIP_MARKER}")
113 changed = True
115 new_lines.append(line)
116 i += 1
118 if not changed:
119 continue
121 source = "\n".join(new_lines)
122 if "import pytest" not in source:
123 source = "import pytest\n\n" + source
125 test_file.write_text(source, encoding="utf-8")
128def _collect_scenarios_and_generate(
129 features_dir: Path,
130 config: Config,
131 pytest_config: pytest.Config,
132) -> dict[str, ScenarioInfo]:
133 scenarios: dict[str, ScenarioInfo] = {}
134 seen_names: dict[str, str] = {}
136 for feature_file in sorted(features_dir.rglob("*.feature")):
137 try:
138 parsed = parse_feature(feature_file, config, seen_names)
139 except Exception as exc:
140 msg = f"[beehave] PARSE ERROR: {feature_file}: {exc}"
141 _write_line(pytest_config, msg)
142 continue
143 scenarios.update(parsed)
145 rel = feature_file.relative_to(features_dir)
146 feature_path_str = str(rel.with_suffix(""))
147 try:
148 generate_stubs(feature_path_str, config)
149 except Exception as exc:
150 msg = f"[beehave] GENERATE ERROR: {feature_file}: {exc}"
151 _write_line(pytest_config, msg)
153 return scenarios
156def pytest_configure(config: pytest.Config) -> None:
157 """Parse features, generate stubs, check violations, register reporters."""
158 rootdir = config.rootpath
159 beehave_config = load_config(rootdir)
160 config.stash[_beehave_config_key] = beehave_config
162 features_dir = rootdir / beehave_config.features_dir
163 if not features_dir.exists():
164 return
166 scenarios = _collect_scenarios_and_generate(
167 features_dir,
168 beehave_config,
169 config,
170 )
171 config.stash[_scenarios_key] = scenarios
173 _add_skip_markers(rootdir / beehave_config.tests_dir)
175 violations = check_all(beehave_config)
176 error_violations: list[Violation] = []
177 for v in violations:
178 _report_violation(config, v)
179 if not v.is_warning:
180 error_violations.append(v)
181 config.stash[_error_violations_key] = error_violations
183 if (config.getoption("verbose", default=0) or 0) >= 1:
184 from pytest_beehave.steps_display import StepsReporter
186 config.pluginmanager.register(
187 StepsReporter(config),
188 "beehave-steps-reporter",
189 )
191 if _html_available():
192 from pytest_beehave.html_column import HtmlStepsPlugin
194 config.pluginmanager.register(
195 HtmlStepsPlugin(),
196 "beehave-html-steps",
197 )
200def pytest_collection_modifyitems(
201 session: pytest.Session,
202 items: list[pytest.Item],
203) -> None:
204 """Inject synthetic failing tests for each ERROR violation."""
205 error_violations = session.config.stash.get(_error_violations_key, [])
206 for v in error_violations:
207 item = BeehaveViolationItem.from_parent(
208 parent=session,
209 violation=v,
210 )
211 items.append(item)
214@pytest.hookimpl(hookwrapper=True)
215def pytest_runtest_makereport(
216 item: pytest.Item,
217 call: pytest.CallInfo[None],
218) -> Generator[None, None, None]:
219 """Attach BDD steps to the test report for display plugins."""
220 outcome: pytest.TestReport = yield # type: ignore[assignment]
221 report = outcome.get_result()
222 stash = item.config.stash
223 scenarios: dict[str, ScenarioInfo] | None = stash.get(
224 _scenarios_key,
225 None,
226 )
227 if scenarios is None:
228 return
229 func_name = getattr(item, "originalname", item.name)
230 scenario = scenarios.get(func_name)
231 if scenario is not None:
232 report._beehave_steps = _format_scenario_steps(scenario)