Coverage for flowr / server / app.py: 93%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 18:42 +0000

1"""FastAPI application factory for the viz server.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import threading 

7import time 

8from pathlib import Path 

9from typing import Any 

10 

11from fastapi import FastAPI, Request 

12from fastapi.responses import HTMLResponse, JSONResponse 

13from fastapi.staticfiles import StaticFiles 

14 

15from flowr.server.config import ServerConfig 

16from flowr.server.scanner import FlowRegistry 

17 

18 

19def _build_scope(environ: dict[str, Any]) -> tuple[dict[str, Any], bytes]: 

20 content_length = int(environ.get("CONTENT_LENGTH", 0) or 0) 

21 body = environ["wsgi.input"].read(content_length) 

22 

23 raw_headers: list[tuple[bytes, bytes]] = [] 

24 for key, value in environ.items(): 

25 if key.startswith("HTTP_"): 

26 name = key[5:].replace("_", "-").lower().encode() 

27 raw_headers.append((name, value.encode())) 

28 if "CONTENT_TYPE" in environ: 

29 raw_headers.append((b"content-type", environ["CONTENT_TYPE"].encode())) 

30 if "CONTENT_LENGTH" in environ: 

31 raw_headers.append((b"content-length", environ["CONTENT_LENGTH"].encode())) 

32 

33 scheme = environ.get("wsgi.url_scheme", "http") 

34 server_name = environ.get("SERVER_NAME", "testserver") 

35 server_port = int(environ.get("SERVER_PORT", "80")) 

36 path = environ.get("PATH_INFO", "/") 

37 query_string = environ.get("QUERY_STRING", "").encode() 

38 

39 scope: dict[str, Any] = { 

40 "type": "http", 

41 "http_version": "1.1", 

42 "method": environ["REQUEST_METHOD"], 

43 "path": path, 

44 "raw_path": path.encode(), 

45 "root_path": environ.get("SCRIPT_NAME", ""), 

46 "scheme": scheme, 

47 "query_string": query_string, 

48 "headers": raw_headers, 

49 "client": (environ.get("REMOTE_ADDR", "localhost"), 0), 

50 "server": (server_name, server_port), 

51 "state": {}, 

52 } 

53 return scope, body 

54 

55 

56async def _wsgi_to_asgi( 

57 asgi_app: Any, # noqa: ANN401 

58 scope: dict[str, Any], 

59 body: bytes, 

60) -> dict[str, Any]: 

61 response_status: int = 500 

62 response_headers: list[tuple[bytes, bytes]] = [] 

63 response_body_chunks: list[bytes] = [] 

64 

65 body_sent = False 

66 

67 async def receive() -> dict[str, Any]: # noqa: RUF029 

68 nonlocal body_sent 

69 if body_sent: 

70 return {"type": "http.disconnect"} 

71 body_sent = True 

72 return { 

73 "type": "http.request", 

74 "body": body, 

75 "more_body": False, 

76 } 

77 

78 async def send(message: dict[str, Any]) -> None: # noqa: RUF029 

79 nonlocal response_status, response_headers 

80 if message["type"] == "http.response.start": 

81 response_status = message["status"] 

82 response_headers = [(k.lower(), v) for k, v in message["headers"]] 

83 elif message["type"] == "http.response.body": 

84 response_body_chunks.append(message.get("body", b"")) 

85 

86 await asgi_app(scope, receive, send) 

87 

88 return { 

89 "status": response_status, 

90 "headers": response_headers, 

91 "body": b"".join(response_body_chunks), 

92 } 

93 

94 

95class _ASGIWSGIBridge: 

96 def __init__(self, asgi_app: Any) -> None: # noqa: ANN401 

97 self._asgi_app = asgi_app 

98 

99 def __call__(self, environ: dict[str, Any], start_response: Any) -> list[bytes]: # noqa: ANN401 

100 scope, body = _build_scope(environ) 

101 response = asyncio.run(_wsgi_to_asgi(self._asgi_app, scope, body)) 

102 

103 status_str = str(response["status"]) 

104 wsgi_headers: list[tuple[str, str]] = [ 

105 (k.decode(), v.decode()) for k, v in response["headers"] 

106 ] 

107 

108 start_response(status_str + " OK", wsgi_headers) 

109 return [response["body"]] 

110 

111 

112def _make_app(config: ServerConfig) -> FastAPI: # noqa: C901 

113 """Create the raw FastAPI application (no WSGI bridge).""" 

114 app = FastAPI() 

115 registry = FlowRegistry(config.path) 

116 

117 static_dir = Path(__file__).resolve().parent.parent / "static" 

118 if static_dir.is_dir(): 

119 app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") 

120 

121 index_html = (static_dir / "index.html").read_text() 

122 

123 @app.get("/") 

124 async def root() -> HTMLResponse: 

125 return HTMLResponse(content=index_html) 

126 

127 @app.get("/api/flows") 

128 async def list_flows(refresh: str = "false") -> dict[str, Any]: 

129 if refresh == "true": 

130 registry.refresh() 

131 flows: list[dict[str, Any]] = [] 

132 for f in registry.list_flows(): 

133 flows.append( 

134 { 

135 "name": f.name, 

136 "relativePath": f.relative_path, 

137 "status": "ok", 

138 "error": None, 

139 } 

140 ) 

141 return { 

142 "flows": flows, 

143 "directory": str(config.path), 

144 "edit_mode": config.edit_mode, 

145 } 

146 

147 @app.get("/api/flows/{flow_id:path}") 

148 async def get_flow(flow_id: str) -> Any: # noqa: ANN401 

149 result = registry.read_flow_model(flow_id) 

150 if not result["success"]: 

151 return JSONResponse(status_code=404, content={"error": result["error"]}) 

152 model = result["data"] 

153 return { 

154 "flow": model.flow, 

155 "version": model.version, 

156 "exits": list(model.exits), 

157 "params": [{"name": p.name, "default": p.default} for p in model.params], 

158 "attrs": model.attrs or {}, 

159 "states": [ 

160 { 

161 "id": s.id, 

162 "attrs": s.attrs or {}, 

163 "conditions": s.conditions or {}, 

164 "flow": s.flow, 

165 "flow_version": s.flow_version, 

166 "next": { 

167 trigger: { 

168 "target": t.target, 

169 "conditions": ( 

170 t.conditions.conditions if t.conditions else None 

171 ), 

172 } 

173 for trigger, t in s.next.items() 

174 }, 

175 } 

176 for s in model.states 

177 ], 

178 } 

179 

180 @app.put("/api/flows/{flow_id:path}") 

181 async def put_flow(flow_id: str, request: Request) -> Any: # noqa: ANN401 

182 if not config.edit_mode: 

183 return JSONResponse( 

184 status_code=405, content={"error": "method not allowed"} 

185 ) 

186 content = (await request.body()).decode() 

187 result = registry.write_flow(flow_id, content) 

188 if result["success"]: 

189 return JSONResponse(status_code=200, content=result) 

190 if result.get("error"): 

191 return JSONResponse(status_code=404, content=result) 

192 return JSONResponse(status_code=422, content=result) 

193 

194 @app.post("/api/flows") 

195 async def post_flow(request: Request) -> Any: # noqa: ANN401 

196 if not config.edit_mode: 

197 return JSONResponse( 

198 status_code=405, content={"error": "method not allowed"} 

199 ) 

200 body = await request.json() 

201 filename = body.get("filename") 

202 content = body.get("content", "") 

203 if not filename: 

204 return JSONResponse(status_code=422, content={"error": "filename required"}) 

205 result = registry.create_flow(filename, content) 

206 if result.get("error") == "path traversal": 

207 return JSONResponse(status_code=422, content=result) 

208 if result["success"]: 

209 return JSONResponse(status_code=201, content=result) 

210 return JSONResponse(status_code=422, content=result) 

211 

212 @app.delete("/api/flows/{flow_id:path}") 

213 async def delete_flow(flow_id: str) -> Any: # noqa: ANN401 

214 if not config.edit_mode: 

215 return JSONResponse( 

216 status_code=405, content={"error": "method not allowed"} 

217 ) 

218 if registry.delete_flow(flow_id): 

219 return JSONResponse(status_code=200, content={"status": "ok"}) 

220 return JSONResponse(status_code=404, content={"error": "not found"}) 

221 

222 return app 

223 

224 

225def create_app(config: ServerConfig) -> _ASGIWSGIBridge: 

226 """Create the application, wrapped for WSGI compatibility.""" 

227 return _ASGIWSGIBridge(_make_app(config)) 

228 

229 

230def start_server(config: ServerConfig) -> tuple[threading.Thread, int]: 

231 """Start the uvicorn server in a background thread. 

232 

233 Returns: 

234 (thread, actual_port) — the port uvicorn bound to. 

235 """ 

236 import uvicorn 

237 

238 app = _make_app(config) 

239 srv = uvicorn.Server( 

240 uvicorn.Config(app, host=config.host, port=config.port, log_level="error") 

241 ) 

242 

243 # Give uvicorn a moment to bind, then capture the actual port 

244 def run_and_store() -> None: 

245 srv.run() 

246 

247 t = threading.Thread(target=run_and_store, daemon=True) 

248 t.start() 

249 # Wait briefly for the server to bind 

250 timeout = 2.0 

251 start = time.monotonic() 

252 while not srv.started and (time.monotonic() - start) < timeout: 

253 time.sleep(0.05) 

254 if not srv.started: 

255 raise RuntimeError("server did not start within timeout") 

256 

257 if srv.servers: 

258 actual_port = srv.servers[0].sockets[0].getsockname()[1] # type: ignore[union-attr] 

259 else: 

260 actual_port = config.port 

261 return t, actual_port 

262 

263 

264def run_server(config: ServerConfig) -> None: 

265 """Start the uvicorn server and block until stopped.""" 

266 import uvicorn 

267 

268 app = _make_app(config) 

269 print(f"http://{config.host}:{config.port}", flush=True) # noqa: T201 

270 uvicorn.run(app, host=config.host, port=config.port, log_level="error")