Coverage for flowr / domain / loader.py: 100%
72 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
1"""YAML parsing for flow definitions."""
3from pathlib import Path
4from typing import Any, Protocol
6import yaml
8from flowr.domain.flow_definition import Flow, GuardCondition, Param, State, Transition
11class FlowParseError(Exception):
12 """Raised when a flow definition cannot be parsed."""
15class FlowParser(Protocol):
16 """Protocol for YAML parsing backends."""
18 def parse(self, yaml_string: str) -> dict[str, Any]:
19 """Parse a YAML string into a dictionary."""
20 ... # pragma: no cover
23def load_flow(yaml_string: str, parser: FlowParser | None = None) -> Flow:
24 """Parse a YAML document into a Flow domain object."""
25 raw: dict[str, Any] = yaml.safe_load(yaml_string)
26 return _dict_to_flow(raw)
29def load_flow_from_file(path: Path) -> Flow:
30 """Load a flow definition from a YAML file."""
31 return load_flow(path.read_text(encoding="utf-8"))
34def resolve_subflows(root_flow: Flow, root_path: Path) -> list[Flow]:
35 """Resolve all subflow references from the root flow's directory."""
36 flows = [root_flow]
37 for state in root_flow.states:
38 if state.flow is not None:
39 subflow_path = root_path.parent / state.flow
40 if subflow_path.exists():
41 flows.append(load_flow_from_file(subflow_path))
42 return flows
45def _dict_to_flow(raw: dict[str, Any]) -> Flow:
46 """Convert a raw dict from YAML into a Flow domain object."""
47 if not isinstance(raw, dict):
48 raise FlowParseError("Flow definition must be a mapping")
49 for field in ("flow", "version"):
50 if field not in raw:
51 raise FlowParseError(f"Missing required field: {field}")
52 states = [_dict_to_state(s) for s in raw.get("states", [])]
53 params = [_dict_to_param(p) for p in raw.get("params", [])]
54 return Flow(
55 flow=raw["flow"],
56 version=raw["version"],
57 exits=raw.get("exits", []),
58 states=states,
59 params=params,
60 attrs=raw.get("attrs"),
61 )
64def _dict_to_state(raw: dict[str, Any]) -> State:
65 """Convert a raw dict into a State."""
66 if not isinstance(raw, dict):
67 raise FlowParseError("State definition must be a mapping")
68 if "id" not in raw:
69 raise FlowParseError("Missing required field in state: id")
71 state_conditions: dict[str, dict[str, str]] | None = None
72 raw_conditions = raw.get("conditions")
73 if raw_conditions is not None:
74 state_conditions = raw_conditions
76 next_map: dict[str, Transition] = {}
77 for trigger, tdef in raw.get("next", {}).items():
78 if isinstance(tdef, str):
79 next_map[trigger] = Transition(trigger=trigger, target=tdef)
80 elif isinstance(tdef, dict):
81 when = tdef.get("when")
82 if when is None:
83 next_map[trigger] = Transition(
84 trigger=trigger,
85 target=tdef["to"],
86 )
87 else:
88 guard, refs = resolve_when_clause(when, state_conditions, raw["id"])
89 next_map[trigger] = Transition(
90 trigger=trigger,
91 target=tdef["to"],
92 conditions=guard,
93 referenced_condition_groups=refs,
94 )
95 return State(
96 id=raw["id"],
97 next=next_map,
98 flow=raw.get("flow"),
99 flow_version=raw.get("flow_version"),
100 attrs=raw.get("attrs"),
101 conditions=state_conditions,
102 )
105def _dict_to_param(raw: Any) -> Param: # noqa: ANN401
106 """Convert a raw param into a Param."""
107 if isinstance(raw, str):
108 return Param(name=raw)
109 if isinstance(raw, dict):
110 return Param(name=raw["name"], default=raw.get("default"))
111 return Param(name=str(raw))
114def resolve_when_clause(
115 when_clause: dict[str, str] | list | str,
116 conditions: dict[str, dict[str, str]] | None,
117 state_id: str,
118) -> tuple[GuardCondition, frozenset[str] | None]:
119 """Resolve a when clause into a GuardCondition and referenced groups."""
120 if isinstance(when_clause, dict):
121 return GuardCondition(conditions=when_clause), None
123 items = [when_clause] if isinstance(when_clause, str) else list(when_clause)
124 resolved: dict[str, str] = {}
125 referenced: list[str] = []
127 for item in items:
128 if isinstance(item, str):
129 _resolve_named_ref(item, conditions, state_id, resolved, referenced)
130 elif isinstance(item, dict):
131 resolved.update(item)
133 return (
134 GuardCondition(conditions=resolved),
135 frozenset(referenced) if referenced else None,
136 )
139def _resolve_named_ref(
140 name: str,
141 conditions: dict[str, dict[str, str]] | None,
142 state_id: str,
143 resolved: dict[str, str],
144 referenced: list[str],
145) -> None:
146 """Resolve a single named condition reference into the resolved dict."""
147 if conditions is None or name not in conditions:
148 raise FlowParseError(
149 f"Unknown condition reference '{name}' in state '{state_id}'"
150 )
151 resolved.update(conditions[name])
152 referenced.append(name)