Coverage for flowr / domain / loader.py: 98%
80 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
1"""YAML parsing for flow definitions."""
3from pathlib import Path
4from typing import Any, Protocol
6import yaml
8from flowr.domain.flow_definition import Flow, GuardCondition, Param, State, Transition
class FlowParseError(Exception):
    """Signal that a flow definition could not be parsed.

    Raised by the loader helpers in this module when the parsed data has
    the wrong shape, lacks a required field, references an unknown
    condition group, or uses an unsupported condition operator.
    """
class FlowParser(Protocol):
    """Structural interface that a YAML parsing backend must satisfy."""

    def parse(self, yaml_string: str) -> dict[str, Any]:
        """Return the dictionary form of the YAML text in *yaml_string*."""
        ...  # pragma: no cover
def load_flow(yaml_string: str, parser: FlowParser | None = None) -> Flow:
    """Parse a YAML document into a Flow domain object.

    Args:
        yaml_string: YAML text of a single flow definition.
        parser: Optional parsing backend. When supplied, its ``parse``
            method produces the raw mapping; otherwise ``yaml.safe_load``
            is used.

    Returns:
        The Flow built from the parsed mapping.

    Raises:
        FlowParseError: If the parsed document is not a mapping or is
            missing a required field.
    """
    raw: dict[str, Any]
    if parser is not None:
        # Honour the injected backend instead of silently ignoring it.
        raw = parser.parse(yaml_string)
    else:
        raw = yaml.safe_load(yaml_string)
    return _dict_to_flow(raw)
def load_flow_from_file(path: Path) -> Flow:
    """Read the file at *path* as UTF-8 text and build a Flow from it."""
    yaml_text = path.read_text(encoding="utf-8")
    return load_flow(yaml_text)
def resolve_subflows(root_flow: Flow, root_path: Path) -> list[Flow]:
    """Load the subflows referenced by the root flow's states.

    Each state whose ``flow`` attribute is set names a file relative to
    the directory containing *root_path*. References may omit the
    ``.yaml`` extension: the name is tried as-is first and, if no regular
    file is found, again with ``.yaml`` appended. References that match
    no file are skipped. Only references made directly by *root_flow* are
    followed.

    Args:
        root_flow: The already-loaded root flow.
        root_path: Path of the file the root flow was loaded from.

    Returns:
        A list starting with *root_flow*, followed by one loaded Flow per
        resolvable reference, in state order.
    """
    base_dir = root_path.parent
    flows = [root_flow]
    for state in root_flow.states:
        if state.flow is None:
            continue
        subflow_path = base_dir / state.flow
        # is_file() rather than exists(): a directory that happens to share
        # the bare reference name must not shadow the "<name>.yaml" file.
        if not subflow_path.is_file():
            subflow_path = base_dir / (state.flow + ".yaml")
        if subflow_path.is_file():
            flows.append(load_flow_from_file(subflow_path))
    return flows
def _dict_to_flow(raw: dict[str, Any]) -> Flow:
    """Convert a raw dict from YAML into a Flow domain object.

    Args:
        raw: Parsed YAML document. Must be a mapping containing at least
            the ``flow`` and ``version`` keys; ``states``, ``params``,
            ``exits`` and ``attrs`` are optional.

    Returns:
        The Flow assembled from the mapping.

    Raises:
        FlowParseError: If *raw* is not a mapping or a required field is
            missing.
    """
    if not isinstance(raw, dict):
        raise FlowParseError("Flow definition must be a mapping")
    for field in ("flow", "version"):
        if field not in raw:
            raise FlowParseError(f"Missing required field: {field}")

    # A key written with no value (``states:``) parses to None, which
    # ``dict.get(key, [])`` would return as-is; treat it like a missing key.
    raw_states = raw.get("states")
    if raw_states is None:
        raw_states = []
    raw_params = raw.get("params")
    if raw_params is None:
        raw_params = []

    states = [_dict_to_state(s) for s in raw_states]
    params = [_dict_to_param(p) for p in raw_params]
    return Flow(
        flow=raw["flow"],
        version=raw["version"],
        exits=raw.get("exits", []),
        states=states,
        params=params,
        attrs=raw.get("attrs"),
    )
def _dict_to_state(raw: dict[str, Any]) -> State:
    """Convert a raw dict into a State.

    Each entry of the ``next`` mapping is either a plain string (the
    target state) or a mapping with a ``to`` key and an optional ``when``
    guard. Entries of any other type are ignored.

    Args:
        raw: Parsed YAML mapping for one state. ``id`` is required;
            ``next``, ``conditions``, ``flow``, ``flow_version`` and
            ``attrs`` are optional.

    Returns:
        The State built from the mapping.

    Raises:
        FlowParseError: If *raw* is not a mapping, has no ``id``, or a
            ``when`` clause cannot be resolved.
    """
    if not isinstance(raw, dict):
        raise FlowParseError("State definition must be a mapping")
    if "id" not in raw:
        raise FlowParseError("Missing required field in state: id")

    state_conditions: dict[str, dict[str, str]] | None = raw.get("conditions")

    # ``next:`` with no value parses to None; treat it like a missing key
    # instead of failing on ``None.items()``.
    raw_next = raw.get("next")
    if raw_next is None:
        raw_next = {}

    next_map: dict[str, Transition] = {}
    for trigger, tdef in raw_next.items():
        if isinstance(tdef, str):
            next_map[trigger] = Transition(trigger=trigger, target=tdef)
        elif isinstance(tdef, dict):
            when = tdef.get("when")
            if when is None:
                next_map[trigger] = Transition(
                    trigger=trigger,
                    target=tdef["to"],
                )
            else:
                guard, refs = resolve_when_clause(when, state_conditions, raw["id"])
                next_map[trigger] = Transition(
                    trigger=trigger,
                    target=tdef["to"],
                    conditions=guard,
                    referenced_condition_groups=refs,
                )
    return State(
        id=raw["id"],
        next=next_map,
        flow=raw.get("flow"),
        flow_version=raw.get("flow_version"),
        attrs=raw.get("attrs"),
        conditions=state_conditions,
    )
111def _dict_to_param(raw: Any) -> Param: # noqa: ANN401
112 """Convert a raw param into a Param."""
113 if isinstance(raw, str):
114 return Param(name=raw)
115 if isinstance(raw, dict):
116 return Param(name=raw["name"], default=raw.get("default"))
117 return Param(name=str(raw))
120def _validate_condition_operators(conditions: dict[str, str], state_id: str) -> None:
121 """Reject unsupported condition operators in when clauses."""
122 for _key, value in conditions.items():
123 if value.startswith("~="):
124 raise FlowParseError(
125 f"Unsupported condition operator '~=' in state '{state_id}'"
126 )
def resolve_when_clause(
    when_clause: dict[str, str] | list | str,
    conditions: dict[str, dict[str, str]] | None,
    state_id: str,
) -> tuple[GuardCondition, frozenset[str] | None]:
    """Turn a ``when`` clause into a guard plus the groups it references.

    A mapping is used directly as the guard's conditions and references
    no groups. A string, or a list mixing strings and mappings, is
    flattened: each string names a group in *conditions* whose entries
    are merged in, and each mapping is merged in as-is. Later entries
    override earlier ones on key collisions.

    Returns:
        The guard, and the frozenset of referenced group names (``None``
        when no named group was referenced).
    """
    if isinstance(when_clause, dict):
        _validate_condition_operators(when_clause, state_id)
        return GuardCondition(conditions=when_clause), None

    if isinstance(when_clause, str):
        entries = [when_clause]
    else:
        entries = list(when_clause)

    merged: dict[str, str] = {}
    group_names: list[str] = []
    for entry in entries:
        if isinstance(entry, str):
            _resolve_named_ref(entry, conditions, state_id, merged, group_names)
        elif isinstance(entry, dict):
            merged.update(entry)

    _validate_condition_operators(merged, state_id)

    guard = GuardCondition(conditions=merged)
    groups = frozenset(group_names) if group_names else None
    return guard, groups
157def _resolve_named_ref(
158 name: str,
159 conditions: dict[str, dict[str, str]] | None,
160 state_id: str,
161 resolved: dict[str, str],
162 referenced: list[str],
163) -> None:
164 """Resolve a single named condition reference into the resolved dict."""
165 if conditions is None or name not in conditions:
166 raise FlowParseError(
167 f"Unknown condition reference '{name}' in state '{state_id}'"
168 )
169 resolved.update(conditions[name])
170 referenced.append(name)