Coverage for flowr / domain / loader.py: 100%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 18:23 +0000

1"""YAML parsing for flow definitions.""" 

2 

3from pathlib import Path 

4from typing import Any, Protocol 

5 

6import yaml 

7 

8from flowr.domain.flow_definition import Flow, GuardCondition, Param, State, Transition 

9 

10 

11class FlowParseError(Exception): 

12 """Raised when a flow definition cannot be parsed.""" 

13 

14 

15class FlowParser(Protocol): 

16 """Protocol for YAML parsing backends.""" 

17 

18 def parse(self, yaml_string: str) -> dict[str, Any]: 

19 """Parse a YAML string into a dictionary.""" 

20 ... # pragma: no cover 

21 

22 

23def load_flow(yaml_string: str, parser: FlowParser | None = None) -> Flow: 

24 """Parse a YAML document into a Flow domain object.""" 

25 raw: dict[str, Any] = yaml.safe_load(yaml_string) 

26 return _dict_to_flow(raw) 

27 

28 

29def load_flow_from_file(path: Path) -> Flow: 

30 """Load a flow definition from a YAML file.""" 

31 return load_flow(path.read_text(encoding="utf-8")) 

32 

33 

34def resolve_subflows(root_flow: Flow, root_path: Path) -> list[Flow]: 

35 """Resolve all subflow references from the root flow's directory.""" 

36 flows = [root_flow] 

37 for state in root_flow.states: 

38 if state.flow is not None: 

39 subflow_path = root_path.parent / state.flow 

40 if subflow_path.exists(): 

41 flows.append(load_flow_from_file(subflow_path)) 

42 return flows 

43 

44 

45def _dict_to_flow(raw: dict[str, Any]) -> Flow: 

46 """Convert a raw dict from YAML into a Flow domain object.""" 

47 if not isinstance(raw, dict): 

48 raise FlowParseError("Flow definition must be a mapping") 

49 for field in ("flow", "version"): 

50 if field not in raw: 

51 raise FlowParseError(f"Missing required field: {field}") 

52 states = [_dict_to_state(s) for s in raw.get("states", [])] 

53 params = [_dict_to_param(p) for p in raw.get("params", [])] 

54 return Flow( 

55 flow=raw["flow"], 

56 version=raw["version"], 

57 exits=raw.get("exits", []), 

58 states=states, 

59 params=params, 

60 attrs=raw.get("attrs"), 

61 ) 

62 

63 

64def _dict_to_state(raw: dict[str, Any]) -> State: 

65 """Convert a raw dict into a State.""" 

66 if not isinstance(raw, dict): 

67 raise FlowParseError("State definition must be a mapping") 

68 if "id" not in raw: 

69 raise FlowParseError("Missing required field in state: id") 

70 

71 state_conditions: dict[str, dict[str, str]] | None = None 

72 raw_conditions = raw.get("conditions") 

73 if raw_conditions is not None: 

74 state_conditions = raw_conditions 

75 

76 next_map: dict[str, Transition] = {} 

77 for trigger, tdef in raw.get("next", {}).items(): 

78 if isinstance(tdef, str): 

79 next_map[trigger] = Transition(trigger=trigger, target=tdef) 

80 elif isinstance(tdef, dict): 

81 when = tdef.get("when") 

82 if when is None: 

83 next_map[trigger] = Transition( 

84 trigger=trigger, 

85 target=tdef["to"], 

86 ) 

87 else: 

88 guard, refs = resolve_when_clause(when, state_conditions, raw["id"]) 

89 next_map[trigger] = Transition( 

90 trigger=trigger, 

91 target=tdef["to"], 

92 conditions=guard, 

93 referenced_condition_groups=refs, 

94 ) 

95 return State( 

96 id=raw["id"], 

97 next=next_map, 

98 flow=raw.get("flow"), 

99 flow_version=raw.get("flow_version"), 

100 attrs=raw.get("attrs"), 

101 conditions=state_conditions, 

102 ) 

103 

104 

105def _dict_to_param(raw: Any) -> Param: # noqa: ANN401 

106 """Convert a raw param into a Param.""" 

107 if isinstance(raw, str): 

108 return Param(name=raw) 

109 if isinstance(raw, dict): 

110 return Param(name=raw["name"], default=raw.get("default")) 

111 return Param(name=str(raw)) 

112 

113 

114def resolve_when_clause( 

115 when_clause: dict[str, str] | list | str, 

116 conditions: dict[str, dict[str, str]] | None, 

117 state_id: str, 

118) -> tuple[GuardCondition, frozenset[str] | None]: 

119 """Resolve a when clause into a GuardCondition and referenced groups.""" 

120 if isinstance(when_clause, dict): 

121 return GuardCondition(conditions=when_clause), None 

122 

123 items = [when_clause] if isinstance(when_clause, str) else list(when_clause) 

124 resolved: dict[str, str] = {} 

125 referenced: list[str] = [] 

126 

127 for item in items: 

128 if isinstance(item, str): 

129 _resolve_named_ref(item, conditions, state_id, resolved, referenced) 

130 elif isinstance(item, dict): 

131 resolved.update(item) 

132 

133 return ( 

134 GuardCondition(conditions=resolved), 

135 frozenset(referenced) if referenced else None, 

136 ) 

137 

138 

139def _resolve_named_ref( 

140 name: str, 

141 conditions: dict[str, dict[str, str]] | None, 

142 state_id: str, 

143 resolved: dict[str, str], 

144 referenced: list[str], 

145) -> None: 

146 """Resolve a single named condition reference into the resolved dict.""" 

147 if conditions is None or name not in conditions: 

148 raise FlowParseError( 

149 f"Unknown condition reference '{name}' in state '{state_id}'" 

150 ) 

151 resolved.update(conditions[name]) 

152 referenced.append(name)