Coverage for flowr / exporters / json_exporter.py: 99%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 18:42 +0000

1"""JSON export adapter for flowr.""" 

2 

3import argparse 

4import json 

5from typing import Any 

6 

7from flowr.domain.flow_definition import Flow 

8 

9 

class JsonExporter:
    """Export adapter that serializes flow definitions as JSON.

    Two output shapes are produced:

    * hierarchical (default): one node per state; a state that references a
      subflow is emitted as a ``"type": "subflow"`` node.
    * flat (``flat`` option set and a non-empty ``subflows`` mapping given):
      referenced subflows are inlined recursively and their state IDs are
      prefixed with ``"<parent-state-id>::"``.
    """

    def format_name(self) -> str:
        """Return the canonical format name."""
        return "json"

    def description(self) -> str:
        """Return a short human-readable description."""
        return "Export flow definitions as JSON"

    def supports_directory(self) -> bool:
        """Return True — JSON adapter supports directory-mode export."""
        return True

    def accepted_options(self) -> list[str]:
        """Return the option keys the JSON adapter consumes."""
        return ["flat", "no_attrs"]

    def add_arguments(self, parser: object) -> None:
        """Register JSON-specific CLI flags.

        Args:
            parser: The ``argparse.ArgumentParser`` to extend. Typed as
                ``object`` in the signature; it is used as an argparse parser.
        """
        p: argparse.ArgumentParser = parser  # type: ignore[assignment]
        p.add_argument(
            "--flat",
            action="store_true",
            dest="adapter_flat",
            help="inline referenced subflows into a single flat graph",
        )
        p.add_argument(
            "--no-attrs",
            action="store_true",
            dest="adapter_no_attrs",
            help="omit state attrs from the exported nodes",
        )

    @staticmethod
    def _edge(
        source_id: Any, target_id: Any, trigger: Any, transition: Any
    ) -> dict[str, Any]:
        """Build one edge dict, attaching conditions when the transition has any."""
        edge: dict[str, Any] = {
            "from": source_id,
            "to": target_id,
            "trigger": trigger,
        }
        if transition.conditions:
            # Copy so the exported structure never aliases the domain object.
            edge["conditions"] = dict(transition.conditions.conditions)
        return edge

    def _build_subflow_edges(
        self,
        node_id: str,
        child_prefix: str,
        child_flow: Flow,
        state: object,
    ) -> list[dict]:
        """Build entry and exit edges for an inlined subflow.

        Args:
            node_id: Prefixed ID of the parent state referencing the subflow.
            child_prefix: Prefix applied to every inlined child state ID.
            child_flow: The subflow being inlined.
            state: The parent state; only its ``next`` mapping is read.

        Returns:
            For each outgoing transition of the parent state, in order: one
            edge from ``node_id`` to every child state that no child
            transition targets, followed by one edge from
            ``"<child_prefix>__exit_<exit>"`` to the transition target for
            every child exit whose name differs from that target.
        """
        parent: Any = state  # duck-typed: only ``.next`` is accessed

        # Entry states do not depend on the parent's transitions, so work
        # them out once instead of rescanning the child flow per trigger.
        targeted = {
            t.target for child in child_flow.states for t in child.next.values()
        }
        entry_ids = [
            child.id for child in child_flow.states if child.id not in targeted
        ]

        edges: list[dict] = []
        for trigger, transition in parent.next.items():
            edges.extend(
                {
                    "from": node_id,
                    "to": f"{child_prefix}{entry_id}",
                    "trigger": trigger,
                }
                for entry_id in entry_ids
            )
            edges.extend(
                {
                    "from": f"{child_prefix}__exit_{exit_name}",
                    "to": transition.target,
                    "trigger": exit_name,
                }
                for exit_name in child_flow.exits
                if transition.target != exit_name
            )
        return edges

    def _inline_subflows(
        self,
        flow: Flow,
        subflows: dict[str, Flow],
        prefix: str = "",
        include_attrs: bool = True,
    ) -> tuple[list[dict], list[dict], set[str]]:
        """Recursively inline subflow states with prefixed IDs.

        Args:
            flow: The flow whose states are being flattened.
            subflows: Mapping used to resolve ``state.flow`` references.
            prefix: Prefix for every state ID and transition target of
                ``flow``; empty for the top-level flow.
            include_attrs: When False, state ``attrs`` are left out of the
                nodes, at every nesting level.

        Returns:
            A ``(nodes, edges, exit_ids)`` tuple. ``exit_ids`` holds the
            (unprefixed) exit names of ``flow``.
        """
        nodes: list[dict] = []
        edges: list[dict] = []
        exit_ids: set[str] = set()
        for s in flow.states:
            node_id = f"{prefix}{s.id}" if prefix else s.id
            if s.flow and s.flow in subflows:
                # The referencing state is replaced by the child's states.
                child_flow = subflows[s.flow]
                child_prefix = f"{node_id}::"
                child_nodes, child_edges, _child_exits = self._inline_subflows(
                    child_flow,
                    subflows,
                    child_prefix,
                    include_attrs=include_attrs,
                )
                nodes.extend(child_nodes)
                edges.extend(child_edges)
                edges.extend(
                    self._build_subflow_edges(node_id, child_prefix, child_flow, s)
                )
            else:
                node: dict[str, Any] = {"id": node_id, "type": "state"}
                if include_attrs and s.attrs:
                    node["attrs"] = s.attrs
                nodes.append(node)
                # NOTE(review): the two statements below are assumed to belong
                # to this ``else`` branch (the subflow branch handles the
                # state's transitions itself) — confirm against the module.
                exit_ids.update(flow.exits)
                for trigger, transition in s.next.items():
                    target_id = (
                        f"{prefix}{transition.target}" if prefix else transition.target
                    )
                    edges.append(self._edge(node_id, target_id, trigger, transition))
        return nodes, edges, exit_ids

    def _flow_to_dict(
        self,
        flow: Flow,
        options: dict,
        subflows: dict[str, Flow] | None = None,
    ) -> dict:
        """Convert a Flow domain object to a JSON-serializable dict.

        Args:
            flow: The flow to convert.
            options: Adapter options; ``"no_attrs"`` (truthy) drops state
                attrs and ``"flat"`` (truthy) requests inlined subflows.
            subflows: Flows available for inlining. Flat output is produced
                only when ``"flat"`` is set and this mapping is non-empty.

        Returns:
            Flat mode: ``flow``, ``nodes``, ``edges`` and ``flat: True``.
            Otherwise: ``flow``, ``version``, ``exits``, ``nodes``, ``edges``.
        """
        include_attrs = not options.get("no_attrs")
        if options.get("flat", False) and subflows:
            nodes, edges, _ = self._inline_subflows(
                flow, subflows, include_attrs=include_attrs
            )
            return {
                "flow": flow.flow,
                "nodes": nodes,
                "edges": edges,
                "flat": True,
            }

        nodes = []
        edges = []
        for s in flow.states:
            node: dict[str, Any] = {
                "id": s.id,
                "type": "subflow" if s.flow else "state",
            }
            if s.flow:
                node["subflow"] = s.flow
            if s.flow_version:
                node["subflowVersion"] = s.flow_version
            if include_attrs and s.attrs:
                node["attrs"] = s.attrs
            nodes.append(node)
            edges.extend(
                self._edge(s.id, transition.target, trigger, transition)
                for trigger, transition in s.next.items()
            )
        return {
            "flow": flow.flow,
            "version": flow.version,
            "exits": flow.exits,
            "nodes": nodes,
            "edges": edges,
        }

    def export(
        self,
        flow: Flow,
        options: dict,
        subflows: dict[str, Flow] | None = None,
    ) -> str:
        """Export a single flow definition as JSON.

        The flow's dict form is extended with a ``"defaultFlow"`` key holding
        the flow's own name before being serialized.
        """
        result = self._flow_to_dict(flow, options, subflows)
        result["defaultFlow"] = flow.flow
        return json.dumps(result)

    def export_directory(self, flows: list[tuple[str, Flow]], options: dict) -> str:
        """Export a collection of flows as one JSON document.

        Args:
            flows: ``(name, flow)`` pairs; only the flow object is used.
            options: Adapter options, forwarded for every flow.

        Returns:
            A JSON object with ``"flows"`` (one entry per flow, in input
            order) and ``"defaultFlow"``: ``"main-flow"`` when a flow of that
            name is present, otherwise the smallest flow name, or ``null``
            when ``flows`` is empty.
        """
        entries = [self._flow_to_dict(flow, options) for _name, flow in flows]
        flow_names = {entry["flow"] for entry in entries}
        if "main-flow" in flow_names:
            default_flow = "main-flow"
        else:
            # ``default=None`` keeps an empty collection from raising.
            default_flow = min(flow_names, default=None)
        return json.dumps({"defaultFlow": default_flow, "flows": entries})

186 default_flow = "main-flow" if "main-flow" in flow_names else min(flow_names) 

187 return json.dumps({"defaultFlow": default_flow, "flows": entries})