Coverage for flowr / server / scanner.py: 79%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 18:42 +0000

1"""Flow file discovery and scanning for the viz server.""" 

2 

3import os 

4import tempfile 

5from dataclasses import dataclass 

6from pathlib import Path 

7 

8import yaml 

9 

10from flowr.domain.loader import load_flow_from_file 

11 

12 

13@dataclass(frozen=True, slots=True) 

14class FlowFile: 

15 """A discovered flow file on disk. 

16 

17 Attributes: 

18 name: The stem of the file (without extension). 

19 relative_path: The path relative to the project root. 

20 """ 

21 

22 name: str 

23 relative_path: str 

24 

25 

26class FlowRegistry: 

27 """Registry of all YAML flow files in a project directory. 

28 

29 Scans the given root for ``*.yaml`` files and exposes CRUD operations 

30 with structural validation. 

31 """ 

32 

33 def __init__(self, root: Path) -> None: 

34 """Initialise the registry and perform the initial scan. 

35 

36 Args: 

37 root: The project directory to scan for flow files. 

38 """ 

39 self._root = root 

40 self._files: list[FlowFile] = [] 

41 self._refresh() 

42 

43 def _refresh(self) -> None: 

44 """Re-scan the root directory for ``*.yaml`` files.""" 

45 self._files = [] 

46 for yf in sorted(self._root.rglob("*.yaml")): 

47 rp = yf.relative_to(self._root) 

48 self._files.append(FlowFile(name=yf.stem, relative_path=str(rp))) 

49 

50 def list_flows(self) -> list[FlowFile]: 

51 """Return a snapshot of all discovered flow files. 

52 

53 Returns: 

54 A new list of :class:`FlowFile` instances. 

55 """ 

56 return list(self._files) 

57 

58 def get_flow(self, flow_id: str) -> FlowFile | None: 

59 """Look up a flow file by its stem name or relative path. 

60 

61 Args: 

62 flow_id: The flow name (stem, no ``.yaml`` extension) or 

63 relative path (e.g. ``workflows/deploy.yaml``). 

64 

65 Returns: 

66 The matching :class:`FlowFile`, or ``None`` if not found. 

67 """ 

68 clean = flow_id.removesuffix(".yaml").removesuffix(".yml") 

69 for f in self._files: 

70 if f.name == clean or f.relative_path == flow_id: 

71 return f 

72 return None 

73 

74 def refresh(self) -> None: 

75 """Re-scan the root directory for updated flow files.""" 

76 self._refresh() 

77 

78 def read_flow(self, flow_id: str) -> str | None: 

79 """Read the raw YAML content of a flow file. 

80 

81 Args: 

82 flow_id: The flow name (stem, no ``.yaml`` extension). 

83 

84 Returns: 

85 The file contents as a string, or ``None`` if not found. 

86 """ 

87 f = self.get_flow(flow_id) 

88 if f is None: 

89 return None 

90 fp = self._root / f.relative_path 

91 if not fp.exists(): 

92 return None 

93 return fp.read_text() 

94 

95 def read_flow_model(self, flow_id: str) -> dict: 

96 """Load and parse a flow file into a domain model dict. 

97 

98 Args: 

99 flow_id: The flow name (stem, no ``.yaml`` extension). 

100 

101 Returns: 

102 A dict with ``success`` and either ``data`` or ``error``. 

103 """ 

104 f = self.get_flow(flow_id) 

105 if f is None: 

106 return {"success": False, "error": "not found"} 

107 fp = self._root / f.relative_path 

108 if not fp.exists(): 

109 return {"success": False, "error": "not found"} 

110 try: 

111 model = load_flow_from_file(fp) 

112 return {"success": True, "data": model} 

113 except Exception: 

114 return {"success": False, "error": "parse error"} 

115 

116 def _validate_yaml(self, content: str) -> tuple[bool, str, list[dict]]: 

117 """Parse raw YAML and check its structure. 

118 

119 Args: 

120 content: Raw YAML string. 

121 

122 Returns: 

123 A 3-tuple of ``(is_valid, message, violations)``. 

124 """ 

125 try: 

126 data = yaml.safe_load(content) 

127 except yaml.YAMLError as e: 

128 return False, str(e), [] 

129 if data is None: 

130 data = {} 

131 violations = self._check_structure(data) 

132 return len(violations) == 0, "ok" if not violations else "invalid", violations 

133 

134 @staticmethod 

135 def _check_structure(data: dict) -> list[dict]: 

136 """Validate that the parsed data has the required top-level keys. 

137 

138 Args: 

139 data: The parsed YAML structure as a dict. 

140 

141 Returns: 

142 A list of violation dicts (empty if valid). 

143 """ 

144 violations: list[dict] = [] 

145 if not isinstance(data, dict): 

146 violations.append({"message": "top-level must be a mapping"}) 

147 return violations 

148 if "flow" not in data: 

149 violations.append({"message": "missing 'flow' key"}) 

150 if "states" not in data: 

151 violations.append({"message": "missing 'states' key"}) 

152 elif not isinstance(data["states"], list): 

153 violations.append({"message": "'states' must be a list"}) 

154 elif len(data["states"]) == 0: 

155 violations.append({"message": "'states' must have at least one entry"}) 

156 return violations 

157 

158 def write_flow(self, flow_id: str, content: str) -> dict: 

159 """Atomically overwrite an existing flow file after validation. 

160 

161 Args: 

162 flow_id: The flow name (stem). 

163 content: The new raw YAML content. 

164 

165 Returns: 

166 A result dict with ``success``, ``valid``, and ``violations`` keys. 

167 """ 

168 f = self.get_flow(flow_id) 

169 if f is None: 

170 return {"success": False, "error": "not found"} 

171 valid, _message, violations = self._validate_yaml(content) 

172 if not valid: 

173 return { 

174 "success": False, 

175 "valid": False, 

176 "violations": violations, 

177 } 

178 fp = self._root / f.relative_path 

179 fd, tmp = tempfile.mkstemp(dir=str(fp.parent), suffix=".tmp") 

180 try: 

181 os.write(fd, content.encode()) 

182 os.fsync(fd) 

183 finally: 

184 os.close(fd) 

185 Path(tmp).replace(fp) 

186 return {"success": True, "valid": True, "violations": []} 

187 

188 def create_flow(self, filename: str, content: str) -> dict: 

189 """Create a new flow file after validation. 

190 

191 Args: 

192 filename: The new file stem (no ``.yaml`` extension). 

193 content: The raw YAML content. 

194 

195 Returns: 

196 A result dict with ``success``, ``valid``, and ``violations`` keys. 

197 """ 

198 if ".." in filename or "/" in filename or "\\" in filename: 

199 return {"success": False, "error": "path traversal"} 

200 valid, _message, violations = self._validate_yaml(content) 

201 if not valid: 

202 return {"success": False, "valid": False, "violations": violations} 

203 fp = self._root / f"{filename}.yaml" 

204 fp.parent.mkdir(parents=True, exist_ok=True) 

205 fp.write_text(content) 

206 self._refresh() 

207 return { 

208 "success": True, 

209 "valid": True, 

210 "violations": [], 

211 "path": str(fp.relative_to(self._root)), 

212 } 

213 

214 def delete_flow(self, flow_id: str) -> bool: 

215 """Delete a flow file from disk. 

216 

217 Args: 

218 flow_id: The flow name (stem). 

219 

220 Returns: 

221 ``True`` if the file was successfully deleted, ``False`` otherwise. 

222 """ 

223 f = self.get_flow(flow_id) 

224 if f is None: 

225 return False 

226 fp = self._root / f.relative_path 

227 if not fp.exists(): 

228 return False 

229 fp.unlink() 

230 self._refresh() 

231 return True 

232 

233 

def discover_flows(path: Path) -> FlowRegistry:
    """Scan *path* for flow files and hand back the resulting registry.

    A thin convenience factory: the directory scan itself is performed by
    the :class:`FlowRegistry` constructor.

    Args:
        path: Root directory of the project whose flows should be indexed.

    Returns:
        A :class:`FlowRegistry` whose initial scan has already completed.
    """
    registry = FlowRegistry(root=path)
    return registry