Coverage for flowr / exporters / json_exporter.py: 99%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
1"""JSON export adapter for flowr."""
3import argparse
4import json
5from typing import Any
7from flowr.domain.flow_definition import Flow
10class JsonExporter:
11 """Export adapter that serializes flow definitions as JSON."""
13 def format_name(self) -> str:
14 """Return the canonical format name."""
15 return "json"
17 def description(self) -> str:
18 """Return a short human-readable description."""
19 return "Export flow definitions as JSON"
21 def supports_directory(self) -> bool:
22 """Return True — JSON adapter supports directory-mode export."""
23 return True
25 def accepted_options(self) -> list[str]:
26 """Return the option keys the JSON adapter consumes."""
27 return ["flat", "no_attrs"]
29 def add_arguments(self, parser: object) -> None:
30 """Register JSON-specific CLI flags."""
31 p: argparse.ArgumentParser = parser # type: ignore[assignment]
32 p.add_argument("--flat", action="store_true", dest="adapter_flat")
33 p.add_argument("--no-attrs", action="store_true", dest="adapter_no_attrs")
35 def _build_subflow_edges(
36 self,
37 node_id: str,
38 child_prefix: str,
39 child_flow: Flow,
40 state: object,
41 ) -> list[dict]:
42 """Build entry and exit edges for an inlined subflow."""
43 from flowr.domain.flow_definition import State
45 s: State = state # type: ignore[assignment]
46 edges: list[dict] = []
47 for trigger, transition in s.next.items():
48 for entry_state in child_flow.states:
49 has_incoming = any(
50 t.target == entry_state.id
51 for st in child_flow.states
52 for t in st.next.values()
53 )
54 if not has_incoming:
55 edges.append(
56 {
57 "from": node_id,
58 "to": f"{child_prefix}{entry_state.id}",
59 "trigger": trigger,
60 }
61 )
62 for exit_name in child_flow.exits:
63 if transition.target != exit_name:
64 edges.append(
65 {
66 "from": f"{child_prefix}__exit_{exit_name}",
67 "to": transition.target,
68 "trigger": exit_name,
69 }
70 )
71 return edges
73 def _inline_subflows(
74 self,
75 flow: Flow,
76 subflows: dict[str, Flow],
77 prefix: str = "",
78 ) -> tuple[list[dict], list[dict], set[str]]:
79 """Recursively inline subflow states with prefixed IDs."""
80 include_attrs = True
81 nodes: list[dict] = []
82 edges: list[dict] = []
83 exit_ids: set[str] = set()
84 for s in flow.states:
85 node_id = f"{prefix}{s.id}" if prefix else s.id
86 if s.flow and s.flow in subflows:
87 child_flow = subflows[s.flow]
88 child_prefix = f"{node_id}::"
89 child_nodes, child_edges, _child_exits = self._inline_subflows(
90 child_flow, subflows, child_prefix
91 )
92 nodes.extend(child_nodes)
93 edges.extend(child_edges)
94 edges.extend(
95 self._build_subflow_edges(node_id, child_prefix, child_flow, s)
96 )
97 else:
98 node: dict[str, Any] = {"id": node_id, "type": "state"}
99 if include_attrs and s.attrs:
100 node["attrs"] = s.attrs
101 nodes.append(node)
102 exit_ids.update(flow.exits)
103 for trigger, transition in s.next.items():
104 target_id = (
105 f"{prefix}{transition.target}" if prefix else transition.target
106 )
107 edge: dict = {
108 "from": node_id,
109 "to": target_id,
110 "trigger": trigger,
111 }
112 if transition.conditions:
113 edge["conditions"] = dict(transition.conditions.conditions)
114 edges.append(edge)
115 return nodes, edges, exit_ids
117 def _flow_to_dict(
118 self,
119 flow: Flow,
120 options: dict,
121 subflows: dict[str, Flow] | None = None,
122 ) -> dict:
123 """Convert a Flow domain object to a JSON-serializable dict."""
124 include_attrs = not options.get("no_attrs")
125 flat = options.get("flat", False)
126 if flat and subflows:
127 nodes, edges, _ = self._inline_subflows(flow, subflows)
128 result: dict = {
129 "flow": flow.flow,
130 "nodes": nodes,
131 "edges": edges,
132 "flat": True,
133 }
134 else:
135 nodes = []
136 for s in flow.states:
137 node: dict[str, Any] = {
138 "id": s.id,
139 "type": "subflow" if s.flow else "state",
140 }
141 if s.flow:
142 node["subflow"] = s.flow
143 if s.flow_version:
144 node["subflowVersion"] = s.flow_version
145 if include_attrs and s.attrs:
146 node["attrs"] = s.attrs
147 nodes.append(node)
148 edges = []
149 for state in flow.states:
150 for trigger, transition in state.next.items():
151 edge: dict = {
152 "from": state.id,
153 "to": transition.target,
154 "trigger": trigger,
155 }
156 if transition.conditions:
157 edge["conditions"] = dict(transition.conditions.conditions)
158 edges.append(edge)
159 result = {
160 "flow": flow.flow,
161 "version": flow.version,
162 "exits": flow.exits,
163 "nodes": nodes,
164 "edges": edges,
165 }
166 return result
168 def export(
169 self,
170 flow: Flow,
171 options: dict,
172 subflows: dict[str, Flow] | None = None,
173 ) -> str:
174 """Export a single flow definition as JSON."""
175 result = self._flow_to_dict(flow, options, subflows)
176 result["defaultFlow"] = flow.flow
177 return json.dumps(result)
179 def export_directory(self, flows: list[tuple[str, Flow]], options: dict) -> str:
180 """Export a collection of flows as a JSON array."""
181 entries = []
182 for _name, flow in flows:
183 entry = self._flow_to_dict(flow, options)
184 entries.append(entry)
185 flow_names = {e["flow"] for e in entries}
186 default_flow = "main-flow" if "main-flow" in flow_names else min(flow_names)
187 return json.dumps({"defaultFlow": default_flow, "flows": entries})