Coverage for flowr / domain / loader.py: 98%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 18:42 +0000

1"""YAML parsing for flow definitions.""" 

2 

3from pathlib import Path 

4from typing import Any, Protocol 

5 

6import yaml 

7 

8from flowr.domain.flow_definition import Flow, GuardCondition, Param, State, Transition 

9 

10 

class FlowParseError(Exception):
    """Signal that a flow definition is malformed and cannot be parsed."""

13 

14 

class FlowParser(Protocol):
    """Structural interface that a YAML parsing backend must satisfy."""

    def parse(self, yaml_string: str) -> dict[str, Any]:
        """Turn the YAML text in ``yaml_string`` into a dictionary."""
        ...  # pragma: no cover

21 

22 

def load_flow(yaml_string: str, parser: FlowParser | None = None) -> Flow:
    """Parse a YAML document into a Flow domain object.

    Args:
        yaml_string: YAML text of a single flow definition.
        parser: Optional parsing backend. When given, its ``parse`` method
            produces the raw mapping; otherwise ``yaml.safe_load`` is used.

    Returns:
        The ``Flow`` built from the parsed document by ``_dict_to_flow``.

    Raises:
        FlowParseError: If the parsed document is not a mapping or lacks
            the ``flow`` or ``version`` field.
    """
    # The backend used to be accepted but never consulted; honour it so
    # callers can actually swap the YAML implementation.
    if parser is not None:
        raw: dict[str, Any] = parser.parse(yaml_string)
    else:
        raw = yaml.safe_load(yaml_string)
    return _dict_to_flow(raw)

27 

28 

def load_flow_from_file(path: Path) -> Flow:
    """Read the UTF-8 YAML file at ``path`` and build a Flow from its text."""
    yaml_text = path.read_text(encoding="utf-8")
    return load_flow(yaml_text)

32 

33 

def resolve_subflows(root_flow: Flow, root_path: Path) -> list[Flow]:
    """Collect the root flow plus every subflow its states reference.

    References are looked up relative to the directory containing
    ``root_path``. A reference may leave off the ``.yaml`` extension: the
    literal path is tried first and, if it does not exist, the same path
    with ``.yaml`` appended. A reference that matches no file either way
    is skipped. Only the root flow's own states are examined.

    Returns:
        A list starting with ``root_flow`` followed by each loaded subflow
        in state order.
    """
    base_dir = root_path.parent
    collected = [root_flow]
    for state in root_flow.states:
        reference = state.flow
        if reference is None:
            continue
        candidate = base_dir / reference
        if not candidate.exists():
            # Fall back to the reference with the extension appended.
            candidate = base_dir / (reference + ".yaml")
        if not candidate.exists():
            continue
        collected.append(load_flow_from_file(candidate))
    return collected

49 

50 

def _dict_to_flow(raw: dict[str, Any]) -> Flow:
    """Convert a raw dict from YAML into a Flow domain object.

    Args:
        raw: The parsed YAML document. It must be a mapping containing at
            least the ``flow`` and ``version`` keys.

    Returns:
        A ``Flow`` built from ``flow``, ``version``, ``exits``, ``states``,
        ``params`` and ``attrs``.

    Raises:
        FlowParseError: If ``raw`` is not a mapping or a required field is
            missing.
    """
    if not isinstance(raw, dict):
        raise FlowParseError("Flow definition must be a mapping")
    for field in ("flow", "version"):
        if field not in raw:
            raise FlowParseError(f"Missing required field: {field}")
    # A key written with no value (``states:``) loads as None rather than
    # being absent, so ``.get(key, [])`` would hand None to the loop and
    # raise TypeError. Treat a null section the same as a missing one.
    states = [_dict_to_state(s) for s in raw.get("states") or []]
    params = [_dict_to_param(p) for p in raw.get("params") or []]
    return Flow(
        flow=raw["flow"],
        version=raw["version"],
        exits=raw.get("exits", []),
        states=states,
        params=params,
        attrs=raw.get("attrs"),
    )

68 

69 

def _dict_to_state(raw: dict[str, Any]) -> State:
    """Convert a raw dict into a State.

    Each entry of the ``next`` mapping is either a plain string (the target
    state) or a mapping with a ``to`` target and an optional ``when`` guard.
    Entries of any other type are skipped.

    Args:
        raw: The parsed state definition; must be a mapping with an ``id``.

    Returns:
        A ``State`` carrying the parsed transitions plus the ``flow``,
        ``flow_version``, ``attrs`` and ``conditions`` values from ``raw``.

    Raises:
        FlowParseError: If ``raw`` is not a mapping, ``id`` is missing,
            ``next`` is not a mapping, a transition mapping has no ``to``,
            or a ``when`` clause cannot be resolved.
    """
    if not isinstance(raw, dict):
        raise FlowParseError("State definition must be a mapping")
    if "id" not in raw:
        raise FlowParseError("Missing required field in state: id")
    state_id = raw["id"]

    state_conditions: dict[str, dict[str, str]] | None = raw.get("conditions")

    # ``next:`` with no value loads as None; treat it like an absent key
    # instead of failing on ``None.items()``.
    raw_next = raw.get("next") or {}
    if not isinstance(raw_next, dict):
        raise FlowParseError(
            f"Field 'next' in state '{state_id}' must be a mapping"
        )

    next_map: dict[str, Transition] = {}
    for trigger, tdef in raw_next.items():
        if isinstance(tdef, str):
            next_map[trigger] = Transition(trigger=trigger, target=tdef)
        elif isinstance(tdef, dict):
            # Report a missing target as a parse error rather than letting
            # a bare KeyError escape from the lookup below.
            if "to" not in tdef:
                raise FlowParseError(
                    f"Missing required field 'to' in transition "
                    f"'{trigger}' of state '{state_id}'"
                )
            when = tdef.get("when")
            if when is None:
                next_map[trigger] = Transition(
                    trigger=trigger,
                    target=tdef["to"],
                )
            else:
                guard, refs = resolve_when_clause(when, state_conditions, state_id)
                next_map[trigger] = Transition(
                    trigger=trigger,
                    target=tdef["to"],
                    conditions=guard,
                    referenced_condition_groups=refs,
                )
    return State(
        id=state_id,
        next=next_map,
        flow=raw.get("flow"),
        flow_version=raw.get("flow_version"),
        attrs=raw.get("attrs"),
        conditions=state_conditions,
    )

109 

110 

def _dict_to_param(raw: Any) -> Param:  # noqa: ANN401
    """Convert a raw param into a Param.

    Args:
        raw: A string (the param name), a mapping with a ``name`` and an
            optional ``default``, or any other value, which is converted
            to its string form and used as the name.

    Returns:
        The corresponding ``Param``.

    Raises:
        FlowParseError: If ``raw`` is a mapping without a ``name`` key.
    """
    if isinstance(raw, str):
        return Param(name=raw)
    if isinstance(raw, dict):
        # Report a missing name as a parse error rather than a bare KeyError.
        if "name" not in raw:
            raise FlowParseError("Missing required field in param: name")
        return Param(name=raw["name"], default=raw.get("default"))
    return Param(name=str(raw))

118 

119 

120def _validate_condition_operators(conditions: dict[str, str], state_id: str) -> None: 

121 """Reject unsupported condition operators in when clauses.""" 

122 for _key, value in conditions.items(): 

123 if value.startswith("~="): 

124 raise FlowParseError( 

125 f"Unsupported condition operator '~=' in state '{state_id}'" 

126 ) 

127 

128 

def resolve_when_clause(
    when_clause: dict[str, str] | list | str,
    conditions: dict[str, dict[str, str]] | None,
    state_id: str,
) -> tuple[GuardCondition, frozenset[str] | None]:
    """Turn a ``when`` clause into a guard plus the condition groups it names.

    A mapping clause is used directly as the guard's conditions and names no
    groups. A string clause is treated as a single group reference. A list
    clause may mix group references (strings) and inline mappings; they are
    merged in order, later entries overriding earlier keys. List entries of
    any other type are ignored.

    Returns:
        ``(guard, groups)`` where ``groups`` is a frozenset of referenced
        group names, or ``None`` when no group was referenced.

    Raises:
        FlowParseError: If a referenced group is unknown or a condition
            uses the unsupported ``~=`` operator.
    """
    # Inline mapping: no named groups are involved.
    if isinstance(when_clause, dict):
        _validate_condition_operators(when_clause, state_id)
        return GuardCondition(conditions=when_clause), None

    # A bare string is shorthand for a one-element list of references.
    if isinstance(when_clause, str):
        entries = [when_clause]
    else:
        entries = list(when_clause)

    merged: dict[str, str] = {}
    group_names: list[str] = []
    for entry in entries:
        if isinstance(entry, dict):
            merged.update(entry)
        elif isinstance(entry, str):
            _resolve_named_ref(entry, conditions, state_id, merged, group_names)

    _validate_condition_operators(merged, state_id)

    guard = GuardCondition(conditions=merged)
    groups = frozenset(group_names) if group_names else None
    return guard, groups

155 

156 

157def _resolve_named_ref( 

158 name: str, 

159 conditions: dict[str, dict[str, str]] | None, 

160 state_id: str, 

161 resolved: dict[str, str], 

162 referenced: list[str], 

163) -> None: 

164 """Resolve a single named condition reference into the resolved dict.""" 

165 if conditions is None or name not in conditions: 

166 raise FlowParseError( 

167 f"Unknown condition reference '{name}' in state '{state_id}'" 

168 ) 

169 resolved.update(conditions[name]) 

170 referenced.append(name)