Coverage for flowr / domain / validation.py: 100%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 18:23 +0000

1"""Validation of flow definitions against the specification.""" 

2 

3from dataclasses import dataclass, field 

4from enum import Enum 

5 

6from flowr.domain.flow_definition import Flow, State 

7 

8 

class ConformanceLevel(Enum):
    """Conformance keywords used to grade a validation finding.

    Each member's value is the keyword itself, as a string.
    """

    MUST = "MUST"
    SHOULD = "SHOULD"

14 

15 

16@dataclass(frozen=True, slots=True) 

17class Violation: 

18 """A validation finding with severity, message, and location.""" 

19 

20 severity: ConformanceLevel 

21 message: str 

22 location: str 

23 

24 

25@dataclass(frozen=True, slots=True) 

26class ValidationResult: 

27 """Result of validating a flow definition.""" 

28 

29 violations: list[Violation] = field(default_factory=list) 

30 

31 @property 

32 def errors(self) -> list[Violation]: 

33 """MUST-level violations.""" 

34 return [v for v in self.violations if v.severity == ConformanceLevel.MUST] 

35 

36 @property 

37 def warnings(self) -> list[Violation]: 

38 """SHOULD-level violations.""" 

39 return [v for v in self.violations if v.severity == ConformanceLevel.SHOULD] 

40 

41 @property 

42 def is_valid(self) -> bool: 

43 """True if no MUST-level violations exist.""" 

44 return len(self.errors) == 0 

45 

46 

def _check_required_fields(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Record a MUST violation for each empty mandatory collection.

    Exits are checked before states; both findings are located at the flow
    itself.
    """
    where = f"flow:{flow.flow}"
    mandatory = (
        (flow.exits, "Flow definition must have at least one exit"),
        (flow.states, "Flow definition must have at least one state"),
    )
    for collection, complaint in mandatory:
        if collection:
            continue
        violations.append(
            Violation(
                severity=ConformanceLevel.MUST,
                message=complaint,
                location=where,
            )
        )

68 

69 

def _check_next_targets(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Verify that every transition target names exactly one state or exit.

    A target that matches both a state id and an exit name is ambiguous; a
    target that matches neither is unresolved. Either case is recorded as a
    MUST violation located at the offending transition.
    """
    known_states = {candidate.id for candidate in flow.states}
    known_exits = set(flow.exits)

    for state in flow.states:
        for trigger, transition in state.next.items():
            target = transition.target
            is_state = target in known_states
            is_exit = target in known_exits

            # Exactly one match means the target resolves cleanly.
            if is_state != is_exit:
                continue

            if is_state:
                problem = "is ambiguous: matches both a state and an exit"
            else:
                problem = "does not match any state or exit"

            violations.append(
                Violation(
                    severity=ConformanceLevel.MUST,
                    message=f"Next target '{target}' in state '{state.id}' {problem}",
                    location=f"flow:{flow.flow}/state:{state.id}/next:{trigger}",
                )
            )

108 

109 

def _check_subflow_contracts(
    flow: Flow,
    all_flows: list[Flow],
    violations: list[Violation],
) -> None:
    """Check that a subflow state's next keys are all exits of the child flow.

    For each state whose ``flow`` attribute names a flow present in
    *all_flows*, every key of the state's ``next`` mapping must be one of
    that child flow's exits; each key that is not is recorded as a MUST
    violation. States without a subflow, or whose subflow is not found in
    *all_flows*, are skipped.

    Args:
        flow: The parent flow whose states are inspected.
        all_flows: Flows available for subflow lookup, matched by their
            ``flow`` name. ``None`` is tolerated and disables the check.
        violations: List that findings are appended to.
    """
    # The guard has to run before the lookup table is built: building the
    # table iterates all_flows, which would raise on None.
    if all_flows is None:  # pragma: no cover
        return
    flow_map = {f.flow: f for f in all_flows}

    for state in flow.states:
        if state.flow is None:  # pragma: no cover
            continue
        child_flow = flow_map.get(state.flow)
        if child_flow is None:  # pragma: no cover
            continue
        child_exits = set(child_flow.exits)
        # Walk the mapping itself rather than a set difference so findings
        # are reported in declaration order, not in hash order.
        for key in state.next:
            if key in child_exits:
                continue
            violations.append(
                Violation(
                    severity=ConformanceLevel.MUST,
                    message=(
                        f"Next key '{key}' in state"
                        f" '{state.id}' does not match"
                        f" any exit in subflow '{state.flow}'"
                    ),
                    location=f"flow:{flow.flow}/state:{state.id}",
                )
            )

137 

138 

def _check_exit_references(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Warn about exits that no state transition targets.

    Each exit name in ``flow.exits`` that is not the target of any
    transition in any state is recorded as a SHOULD violation located at
    that exit.

    Args:
        flow: The flow whose exits and transitions are inspected.
        violations: List that findings are appended to.
    """
    # Gather every transition target in one pass instead of rescanning all
    # transitions once per exit.
    targeted = {
        transition.target
        for state in flow.states
        for transition in state.next.values()
    }
    for exit_name in flow.exits:
        if exit_name in targeted:
            continue
        violations.append(
            Violation(
                severity=ConformanceLevel.SHOULD,
                message=(
                    f"Exit '{exit_name}' is not referenced by any state transition"
                ),
                location=f"flow:{flow.flow}/exit:{exit_name}",
            )
        )

160 

161 

def _check_cross_flow_cycles(
    root_flow: Flow,
    all_flows: list[Flow],
    violations: list[Violation],
) -> None:
    """Follow subflow references depth-first from the root and flag cycles.

    Starting at ``root_flow.flow``, each state's ``flow`` reference is
    followed through the flows in *all_flows*. Reaching a flow that is still
    on the current traversal path records a MUST violation located at that
    flow. Names that are not found in *all_flows* have no states to follow.
    """
    by_name = {candidate.flow: candidate for candidate in all_flows}
    seen: set[str] = set()
    on_stack: set[str] = set()

    def visit(name: str) -> None:
        if name in on_stack:
            # Re-entering a flow that is still being explored closes a loop.
            violations.append(
                Violation(
                    severity=ConformanceLevel.MUST,
                    message=f"Cross-flow cycle detected involving flow '{name}'",
                    location=f"flow:{name}",
                )
            )
            return
        if name in seen:  # pragma: no cover
            return

        seen.add(name)
        on_stack.add(name)
        definition = by_name.get(name)
        states = definition.states if definition is not None else ()
        for state in states:
            if state.flow is not None:
                visit(state.flow)
        on_stack.discard(name)

    visit(root_flow.flow)

194 

195 

def _check_condition_references(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Check that every referenced condition group is defined on its state.

    States whose ``conditions`` is ``None`` are skipped. For the remaining
    states, each condition group name referenced by a transition that is not
    present in ``state.conditions`` is recorded as a MUST violation located
    at that state.

    Args:
        flow: The flow whose states are inspected.
        violations: List that findings are appended to.
    """
    for state in flow.states:
        if state.conditions is None:
            continue
        # The references arrive as a set; sort them so several unknown
        # names in one state are always reported in the same order.
        for ref in sorted(_collect_refs(state)):
            if ref in state.conditions:
                continue
            violations.append(
                Violation(
                    severity=ConformanceLevel.MUST,
                    message=(
                        f"Unknown condition reference '{ref}' in state '{state.id}'"
                    ),
                    location=f"flow:{flow.flow}/state:{state.id}",
                )
            )

215 

216 

def _collect_refs(state: State) -> set[str]:
    """Return the union of condition group names referenced by a state.

    Transitions whose ``referenced_condition_groups`` is ``None`` contribute
    nothing. A new set is returned on every call.
    """
    collected: set[str] = set()
    for transition in state.next.values():
        groups = transition.referenced_condition_groups
        if groups is None:
            continue
        collected |= groups
    return collected

224 

225 

def _check_unused_condition_groups(
    flow: Flow,
    violations: list[Violation],
) -> None:
    """Warn about condition groups that a state defines but never uses.

    States whose ``conditions`` is ``None`` are skipped. Every other state
    gets one SHOULD violation per defined group name that none of its
    transitions reference, in the order the groups are defined.
    """
    for state in flow.states:
        declared = state.conditions
        if declared is None:
            continue
        used = _collect_refs(state)
        violations.extend(
            Violation(
                severity=ConformanceLevel.SHOULD,
                message=(
                    f"Condition group '{name}' is defined"
                    f" but never referenced in state '{state.id}'"
                ),
                location=f"flow:{flow.flow}/state:{state.id}",
            )
            for name in declared
            if name not in used
        )

247 

248 

def validate(
    flow: Flow,
    all_flows: list[Flow] | None = None,
) -> ValidationResult:
    """Run every specification check against *flow* and collect the findings.

    The required-field check always runs. If the flow has no states, the
    remaining checks are skipped and only those findings are returned. The
    subflow contract and cross-flow cycle checks run only when *all_flows*
    is given.
    """
    findings: list[Violation] = []
    _check_required_fields(flow, findings)

    if flow.states:
        _check_next_targets(flow, findings)

        if all_flows is not None:
            _check_subflow_contracts(flow, all_flows, findings)
            _check_cross_flow_cycles(flow, all_flows, findings)

        # The remaining checks need only the flow itself.
        for single_flow_check in (
            _check_exit_references,
            _check_condition_references,
            _check_unused_condition_groups,
        ):
            single_flow_check(flow, findings)

    return ValidationResult(violations=findings)