Coverage for flowr / domain / validation.py: 100%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
1"""Validation of flow definitions against the specification."""
3from dataclasses import dataclass, field
4from enum import Enum
6from flowr.domain.flow_definition import Flow, State
class ConformanceLevel(Enum):
    """Conformance keyword attached to every validation finding.

    ``ValidationResult.errors`` selects findings graded ``MUST`` and
    ``ValidationResult.warnings`` selects findings graded ``SHOULD``.
    """

    # Each member's value is the keyword's own spelling.
    MUST = "MUST"
    SHOULD = "SHOULD"
16@dataclass(frozen=True, slots=True)
17class Violation:
18 """A validation finding with severity, message, and location."""
20 severity: ConformanceLevel
21 message: str
22 location: str
25@dataclass(frozen=True, slots=True)
26class ValidationResult:
27 """Result of validating a flow definition."""
29 violations: list[Violation] = field(default_factory=list)
31 @property
32 def errors(self) -> list[Violation]:
33 """MUST-level violations."""
34 return [v for v in self.violations if v.severity == ConformanceLevel.MUST]
36 @property
37 def warnings(self) -> list[Violation]:
38 """SHOULD-level violations."""
39 return [v for v in self.violations if v.severity == ConformanceLevel.SHOULD]
41 @property
42 def is_valid(self) -> bool:
43 """True if no MUST-level violations exist."""
44 return len(self.errors) == 0
47def _check_required_fields(
48 flow: Flow,
49 violations: list[Violation],
50) -> None:
51 """Check that required fields are present."""
52 if not flow.exits:
53 violations.append(
54 Violation(
55 severity=ConformanceLevel.MUST,
56 message="Flow definition must have at least one exit",
57 location=f"flow:{flow.flow}",
58 )
59 )
60 if not flow.states:
61 violations.append(
62 Violation(
63 severity=ConformanceLevel.MUST,
64 message="Flow definition must have at least one state",
65 location=f"flow:{flow.flow}",
66 )
67 )
70def _check_next_targets(
71 flow: Flow,
72 violations: list[Violation],
73) -> None:
74 """Check that next targets resolve unambiguously."""
75 state_ids = {s.id for s in flow.states}
76 exit_names = set(flow.exits)
78 for state in flow.states:
79 for trigger, transition in state.next.items():
80 target = transition.target
81 in_states = target in state_ids
82 in_exits = target in exit_names
84 if in_states and in_exits:
85 violations.append(
86 Violation(
87 severity=ConformanceLevel.MUST,
88 message=(
89 f"Next target '{target}' in state"
90 f" '{state.id}' is ambiguous:"
91 " matches both a state and an exit"
92 ),
93 location=(f"flow:{flow.flow}/state:{state.id}/next:{trigger}"),
94 )
95 )
96 elif not in_states and not in_exits:
97 violations.append(
98 Violation(
99 severity=ConformanceLevel.MUST,
100 message=(
101 f"Next target '{target}' in state"
102 f" '{state.id}' does not match"
103 " any state or exit"
104 ),
105 location=(f"flow:{flow.flow}/state:{state.id}/next:{trigger}"),
106 )
107 )
110def _check_subflow_contracts(
111 flow: Flow,
112 all_flows: list[Flow],
113 violations: list[Violation],
114) -> None:
115 """Check that parent next keys match child exits."""
116 flow_map = {f.flow: f for f in all_flows}
117 for state in flow.states:
118 if state.flow is None or all_flows is None: # pragma: no cover
119 continue
120 child_flow = flow_map.get(state.flow)
121 if child_flow is None: # pragma: no cover
122 continue
123 next_keys = set(state.next.keys())
124 child_exits = set(child_flow.exits)
125 for key in next_keys - child_exits:
126 violations.append(
127 Violation(
128 severity=ConformanceLevel.MUST,
129 message=(
130 f"Next key '{key}' in state"
131 f" '{state.id}' does not match"
132 f" any exit in subflow '{state.flow}'"
133 ),
134 location=f"flow:{flow.flow}/state:{state.id}",
135 )
136 )
139def _check_exit_references(
140 flow: Flow,
141 violations: list[Violation],
142) -> None:
143 """Check that exits are referenced by at least one transition."""
144 for exit_name in flow.exits:
145 referenced = any(
146 transition.target == exit_name
147 for state in flow.states
148 for transition in state.next.values()
149 )
150 if not referenced:
151 violations.append(
152 Violation(
153 severity=ConformanceLevel.SHOULD,
154 message=(
155 f"Exit '{exit_name}' is not referenced by any state transition"
156 ),
157 location=f"flow:{flow.flow}/exit:{exit_name}",
158 )
159 )
162def _check_cross_flow_cycles(
163 root_flow: Flow,
164 all_flows: list[Flow],
165 violations: list[Violation],
166) -> None:
167 """Check for cross-flow cycles via DFS."""
168 flow_map = {f.flow: f for f in all_flows}
169 visited: set[str] = set()
170 path: set[str] = set()
172 def dfs(flow_name: str) -> None:
173 if flow_name in path:
174 violations.append(
175 Violation(
176 severity=ConformanceLevel.MUST,
177 message=(f"Cross-flow cycle detected involving flow '{flow_name}'"),
178 location=f"flow:{flow_name}",
179 )
180 )
181 return
182 if flow_name in visited: # pragma: no cover
183 return
184 visited.add(flow_name)
185 path.add(flow_name)
186 flow = flow_map.get(flow_name)
187 if flow is not None:
188 for state in flow.states:
189 if state.flow is not None:
190 dfs(state.flow)
191 path.discard(flow_name)
193 dfs(root_flow.flow)
def _check_condition_references(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Check that every referenced condition group is defined in its state.

    For each state that defines ``conditions``, every condition group name
    referenced by the state's transitions must be present in
    ``state.conditions``.  A missing name is recorded as a MUST-level
    finding located at the state.

    References are visited in sorted order so that several unknown names in
    one state are always reported in the same sequence.

    Args:
        flow: The flow whose states are inspected.
        violations: List the findings are appended to (mutated in place).
    """
    for state in flow.states:
        if state.conditions is None:
            # NOTE(review): references made by a state that defines no
            # conditions at all are not reported here - confirm intended.
            continue
        # _collect_refs returns a set of strings, whose iteration order is
        # not stable across interpreter runs; sort it for stable output.
        for ref in sorted(_collect_refs(state)):
            if ref not in state.conditions:
                violations.append(
                    Violation(
                        severity=ConformanceLevel.MUST,
                        message=(
                            f"Unknown condition reference '{ref}' in state '{state.id}'"
                        ),
                        location=f"flow:{flow.flow}/state:{state.id}",
                    )
                )
217def _collect_refs(state: State) -> set[str]:
218 """Collect all referenced condition group names from a state's transitions."""
219 refs: set[str] = set()
220 for transition in state.next.values():
221 if transition.referenced_condition_groups is not None:
222 refs |= transition.referenced_condition_groups
223 return refs
def _check_unused_condition_groups(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Warn about condition groups a state defines but never references.

    States whose ``conditions`` is ``None`` are skipped.  For the others,
    each defined group name that no transition of the same state references
    is recorded as a SHOULD-level finding located at the state, in the
    order the groups are defined.
    """
    for state in flow.states:
        defined = state.conditions
        if defined is None:
            continue
        used = _collect_refs(state)
        unused = [name for name in defined if name not in used]
        for name in unused:
            violations.append(
                Violation(
                    severity=ConformanceLevel.SHOULD,
                    message=(
                        f"Condition group '{name}' is defined"
                        f" but never referenced in state '{state.id}'"
                    ),
                    location=f"flow:{flow.flow}/state:{state.id}",
                )
            )
def validate(
    flow: Flow,
    all_flows: list[Flow] | None = None,
) -> ValidationResult:
    """Run every validation rule against ``flow`` and collect the findings.

    The required-field check always runs.  When the flow has no states the
    remaining checks are skipped.  The cross-flow checks (subflow contracts
    and cycle detection) run only when ``all_flows`` is supplied.

    Args:
        flow: The flow definition to validate.
        all_flows: Optional list of flows used to resolve subflow names.

    Returns:
        A ``ValidationResult`` holding the findings in the order the checks
        produced them.
    """
    findings: list[Violation] = []

    _check_required_fields(flow, findings)
    if flow.states:
        _check_next_targets(flow, findings)

        if all_flows is not None:
            _check_subflow_contracts(flow, all_flows, findings)
            _check_cross_flow_cycles(flow, all_flows, findings)

        # The remaining rules only need the flow itself.
        for check in (
            _check_exit_references,
            _check_condition_references,
            _check_unused_condition_groups,
        ):
            check(flow, findings)

    return ValidationResult(violations=findings)