Coverage for flowr / server / scanner.py: 79%
112 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
1"""Flow file discovery and scanning for the viz server."""
3import os
4import tempfile
5from dataclasses import dataclass
6from pathlib import Path
8import yaml
10from flowr.domain.loader import load_flow_from_file
13@dataclass(frozen=True, slots=True)
14class FlowFile:
15 """A discovered flow file on disk.
17 Attributes:
18 name: The stem of the file (without extension).
19 relative_path: The path relative to the project root.
20 """
22 name: str
23 relative_path: str
class FlowRegistry:
    """Registry of all YAML flow files in a project directory.

    Scans the given root recursively for ``*.yaml`` files and exposes CRUD
    operations with structural validation. File content is always read and
    written as UTF-8 so that every operation agrees on the encoding.
    """

    def __init__(self, root: Path) -> None:
        """Initialise the registry and perform the initial scan.

        Args:
            root: The project directory to scan for flow files.
        """
        self._root = root
        self._files: "list[FlowFile]" = []
        self._refresh()

    def _refresh(self) -> None:
        """Re-scan the root directory for ``*.yaml`` files."""
        # ``rglob`` also yields directories whose name ends in ".yaml";
        # only regular files can be read, written or unlinked as flows.
        self._files = [
            FlowFile(name=yf.stem, relative_path=str(yf.relative_to(self._root)))
            for yf in sorted(self._root.rglob("*.yaml"))
            if yf.is_file()
        ]

    def list_flows(self) -> "list[FlowFile]":
        """Return a snapshot of all discovered flow files.

        Returns:
            A new list of :class:`FlowFile` instances; mutating it does not
            affect the registry.
        """
        return list(self._files)

    def get_flow(self, flow_id: str) -> "FlowFile | None":
        """Look up a flow file by its relative path or stem name.

        An exact relative-path match always wins. Only when no file has
        that exact path is ``flow_id`` treated as a stem, in which case the
        first file (in scan order) with that stem is returned.

        Args:
            flow_id: The flow name (stem, with or without a ``.yaml`` /
                ``.yml`` suffix) or relative path
                (e.g. ``workflows/deploy.yaml``).

        Returns:
            The matching :class:`FlowFile`, or ``None`` if not found.
        """
        # Pass 1: exact path. Done separately so that an earlier-sorted file
        # sharing the same stem cannot shadow the file that was asked for.
        for f in self._files:
            if f.relative_path == flow_id:
                return f
        # Pass 2: stem match.
        clean = flow_id.removesuffix(".yaml").removesuffix(".yml")
        for f in self._files:
            if f.name == clean:
                return f
        return None

    def refresh(self) -> None:
        """Re-scan the root directory for updated flow files."""
        self._refresh()

    def read_flow(self, flow_id: str) -> "str | None":
        """Read the raw YAML content of a flow file.

        Args:
            flow_id: The flow name (stem) or relative path, as accepted by
                :meth:`get_flow`.

        Returns:
            The file contents decoded as UTF-8, or ``None`` if the flow is
            unknown or its file no longer exists on disk.
        """
        f = self.get_flow(flow_id)
        if f is None:
            return None
        try:
            # EAFP: the file may have been removed since the last scan.
            return (self._root / f.relative_path).read_text(encoding="utf-8")
        except FileNotFoundError:
            return None

    def read_flow_model(self, flow_id: str) -> dict:
        """Load and parse a flow file into a domain model dict.

        Args:
            flow_id: The flow name (stem) or relative path, as accepted by
                :meth:`get_flow`.

        Returns:
            A dict with ``success`` and either ``data`` or ``error``.
            ``error`` is ``"not found"`` when the flow is unknown or missing
            on disk and ``"parse error"`` when the loader raises.
        """
        f = self.get_flow(flow_id)
        if f is None:
            return {"success": False, "error": "not found"}
        fp = self._root / f.relative_path
        if not fp.exists():
            return {"success": False, "error": "not found"}
        try:
            model = load_flow_from_file(fp)
        except Exception:
            # Result-dict boundary: any loader failure is reported to the
            # caller as a parse error instead of propagating.
            return {"success": False, "error": "parse error"}
        return {"success": True, "data": model}

    def _validate_yaml(self, content: str) -> "tuple[bool, str, list[dict]]":
        """Parse raw YAML and check its structure.

        Args:
            content: Raw YAML string.

        Returns:
            A 3-tuple of ``(is_valid, message, violations)``. On a YAML
            syntax error the message is the parser error text and the
            violations list is empty; otherwise the message is ``"ok"`` or
            ``"invalid"``.
        """
        try:
            data = yaml.safe_load(content)
        except yaml.YAMLError as e:
            return False, str(e), []
        if data is None:
            # An empty document parses to None; validate it as an empty
            # mapping so the missing-key violations are reported.
            data = {}
        violations = self._check_structure(data)
        return not violations, "ok" if not violations else "invalid", violations

    @staticmethod
    def _check_structure(data: dict) -> "list[dict]":
        """Validate that the parsed data has the required top-level keys.

        Args:
            data: The parsed YAML structure (expected to be a dict).

        Returns:
            A list of violation dicts, each with a ``message`` key
            (empty if valid).
        """
        violations: "list[dict]" = []
        if not isinstance(data, dict):
            violations.append({"message": "top-level must be a mapping"})
            return violations
        if "flow" not in data:
            violations.append({"message": "missing 'flow' key"})
        if "states" not in data:
            violations.append({"message": "missing 'states' key"})
        elif not isinstance(data["states"], list):
            violations.append({"message": "'states' must be a list"})
        elif not data["states"]:
            violations.append({"message": "'states' must have at least one entry"})
        return violations

    def write_flow(self, flow_id: str, content: str) -> dict:
        """Atomically overwrite an existing flow file after validation.

        The content is written to a temporary file in the same directory,
        flushed to disk, and then renamed over the target. If anything
        fails the temporary file is removed and the exception propagates.

        Args:
            flow_id: The flow name (stem) or relative path, as accepted by
                :meth:`get_flow`.
            content: The new raw YAML content.

        Returns:
            ``{"success": False, "error": "not found"}`` for an unknown
            flow; otherwise a result dict with ``success``, ``valid``, and
            ``violations`` keys.
        """
        f = self.get_flow(flow_id)
        if f is None:
            return {"success": False, "error": "not found"}
        valid, _message, violations = self._validate_yaml(content)
        if not valid:
            return {
                "success": False,
                "valid": False,
                "violations": violations,
            }
        fp = self._root / f.relative_path
        fd, tmp = tempfile.mkstemp(dir=str(fp.parent), suffix=".tmp")
        try:
            # A buffered file object writes the whole payload (raw os.write
            # may write only part of it) and closes the descriptor for us.
            with os.fdopen(fd, "wb") as handle:
                handle.write(content.encode("utf-8"))
                handle.flush()
                os.fsync(handle.fileno())
            try:
                # mkstemp creates the file private to the current user;
                # carry over the target's permission bits so saving does
                # not silently change who can read the flow.
                os.chmod(tmp, fp.stat().st_mode & 0o7777)
            except OSError:
                # Best effort only: the target may be gone or the platform
                # may refuse; that must not block the save itself.
                pass
            os.replace(tmp, fp)
        except BaseException:
            # Do not leave stray "*.tmp" files next to the flow on failure.
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise
        return {"success": True, "valid": True, "violations": []}

    def create_flow(self, filename: str, content: str) -> dict:
        """Create a new flow file in the root directory after validation.

        An existing file with the same name is overwritten.

        Args:
            filename: The new file stem (no ``.yaml`` extension). Must not
                contain ``..``, ``/`` or ``\\``.
            content: The raw YAML content.

        Returns:
            ``{"success": False, "error": "path traversal"}`` for a
            rejected filename; otherwise a result dict with ``success``,
            ``valid``, and ``violations`` keys, plus ``path`` (relative to
            the root) on success.
        """
        if ".." in filename or "/" in filename or "\\" in filename:
            return {"success": False, "error": "path traversal"}
        valid, _message, violations = self._validate_yaml(content)
        if not valid:
            return {"success": False, "valid": False, "violations": violations}
        fp = self._root / f"{filename}.yaml"
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content, encoding="utf-8")
        self._refresh()
        return {
            "success": True,
            "valid": True,
            "violations": [],
            "path": str(fp.relative_to(self._root)),
        }

    def delete_flow(self, flow_id: str) -> bool:
        """Delete a flow file from disk and re-scan the root.

        Args:
            flow_id: The flow name (stem) or relative path, as accepted by
                :meth:`get_flow`.

        Returns:
            ``True`` if the file was deleted, ``False`` if the flow is
            unknown or its file no longer exists.
        """
        f = self.get_flow(flow_id)
        if f is None:
            return False
        try:
            # EAFP: the file may have been removed since the last scan.
            (self._root / f.relative_path).unlink()
        except FileNotFoundError:
            return False
        self._refresh()
        return True
def discover_flows(path: Path) -> FlowRegistry:
    """Build a :class:`FlowRegistry` rooted at ``path``.

    Args:
        path: The project root directory to scan.

    Returns:
        The :class:`FlowRegistry` constructed for ``path``; its constructor
        performs the initial scan.
    """
    registry = FlowRegistry(path)
    return registry