Coverage for smith / infrastructure / template_source.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 18:48 +0000

1"""Template source adapters — resolve files from bundled, local, or URL sources.""" 

2 

3import importlib.resources 

4import os 

5import tarfile 

6import tempfile 

7import zipfile 

8from pathlib import Path 

9 

10import requests 

11 

12from smith.domain.ports import TemplateSourceError 

13from smith.domain.value_objects import FileSpec, TemplateSource 

14 

15AGENTIC_FILE_PATTERNS = [ 

16 "AGENTS.md", 

17 ".opencode/agents/", 

18 ".opencode/knowledge/", 

19 ".opencode/skills/", 

20 ".opencode/tools/", 

21 ".templates/", 

22 ".flowr/", 

23] 

24 

25GITIGNORE_PATTERNS = ["AGENTS.md", ".opencode/", ".templates/", ".flowr/"] 

26 

27 

28def _is_agentic_path(path: Path) -> bool: 

29 """Return whether *path* belongs to an agentic file pattern.""" 

30 path_str = str(path) 

31 if path_str == "AGENTS.md": 

32 return True 

33 return any(path_str.startswith(pattern) for pattern in AGENTIC_FILE_PATTERNS[1:]) 

34 

35 

36def _collect_specs_from_directory(directory: Path) -> list[FileSpec]: 

37 """Walk *directory* and collect FileSpec for every file found.""" 

38 specs: list[FileSpec] = [] 

39 for root, _dirs, files in os.walk(directory): 

40 for f in files: 

41 full = Path(root) / f 

42 rel = full.relative_to(directory) 

43 specs.append(FileSpec(relative_path=rel, content=full.read_bytes())) 

44 return specs 

45 

46 

47class BundledTemplateSource: 

48 """Resolve templates shipped inside the ``smith.data`` package.""" 

49 

50 def resolve(self) -> list[FileSpec]: 

51 """Return FileSpec list for every agentic file in the bundled data.""" 

52 try: 

53 data_dir = importlib.resources.files("smith.data") 

54 except Exception as exc: 

55 raise TemplateSourceError( 

56 f"Failed to locate bundled template data: {exc}" 

57 ) from exc 

58 

59 if not hasattr(data_dir, "joinpath"): 

60 raise TemplateSourceError("Bundled template data directory not found") 

61 

62 data_path = Path(str(data_dir.joinpath())) 

63 if not data_path.is_dir(): 

64 raise TemplateSourceError( 

65 f"Bundled template data directory not found: {data_path}" 

66 ) 

67 

68 all_specs = _collect_specs_from_directory(data_path) 

69 specs = [s for s in all_specs if _is_agentic_path(s.relative_path)] 

70 

71 if not specs: 

72 raise TemplateSourceError("Bundled template data contains no agentic files") 

73 

74 return specs 

75 

76 def gitignore_patterns(self) -> list[str]: 

77 """Return the standard gitignore patterns for bundled templates.""" 

78 return list(GITIGNORE_PATTERNS) 

79 

80 

81class LocalTemplateSource: 

82 """Resolve templates from a local directory on disk.""" 

83 

84 def __init__(self, path: Path) -> None: 

85 """Initialise with the local template directory path.""" 

86 self._path = path 

87 

88 def resolve(self) -> list[FileSpec]: 

89 """Return FileSpec list for every file in the local directory.""" 

90 if not self._path.is_dir(): 

91 raise TemplateSourceError(f"Template directory not found: {self._path}") 

92 return _collect_specs_from_directory(self._path) 

93 

94 def gitignore_patterns(self) -> list[str]: 

95 """Return gitignore patterns derived from the local directory contents.""" 

96 patterns: list[str] = [] 

97 for item in sorted(self._path.iterdir()): 

98 name = item.name 

99 if item.is_dir(): 

100 patterns.append(f"{name}/") 

101 else: 

102 patterns.append(name) 

103 return patterns 

104 

105 

106class UrlTemplateSource: 

107 """Resolve templates from a remote tar.gz or zip archive.""" 

108 

109 def __init__(self, url: str) -> None: 

110 """Initialise with the URL of the template archive.""" 

111 self._url = url 

112 

113 def resolve(self) -> list[FileSpec]: 

114 """Download, extract, and return FileSpec list for agentic files.""" 

115 try: 

116 response = requests.get(self._url, timeout=30) 

117 response.raise_for_status() 

118 except requests.RequestException as exc: 

119 raise TemplateSourceError( 

120 f"Failed to download template from {self._url}: {exc}" 

121 ) from exc 

122 

123 tmp_dir = tempfile.mkdtemp(prefix="smith_url_") 

124 try: 

125 try: 

126 if self._url.endswith(".zip"): 

127 self._extract_zip(response.content, tmp_dir) 

128 else: 

129 self._extract_tar(response.content, tmp_dir) 

130 except (tarfile.TarError, zipfile.BadZipFile, OSError) as exc: 

131 raise TemplateSourceError( 

132 f"Failed to extract template archive from {self._url}: {exc}" 

133 ) from exc 

134 

135 all_specs = _collect_specs_from_directory(Path(tmp_dir)) 

136 finally: 

137 import shutil 

138 

139 shutil.rmtree(tmp_dir, ignore_errors=True) 

140 

141 specs = [s for s in all_specs if _is_agentic_path(s.relative_path)] 

142 

143 if not specs: 

144 raise TemplateSourceError( 

145 f"Template archive from {self._url} contains no agentic files" 

146 ) 

147 

148 return specs 

149 

150 def gitignore_patterns(self) -> list[str]: 

151 """Return the standard gitignore patterns for URL-sourced templates.""" 

152 return list(GITIGNORE_PATTERNS) 

153 

154 @staticmethod 

155 def _extract_tar(content: bytes, target_dir: str) -> None: 

156 """Extract a tar.gz archive, stripping the top-level directory.""" 

157 import io 

158 

159 with tarfile.open(fileobj=io.BytesIO(content), mode="r:gz") as tar: 

160 members = tar.getmembers() 

161 if not members: 

162 raise TemplateSourceError("Tar archive is empty") 

163 

164 root_dir = members[0].name.split("/")[0] 

165 for member in members: 

166 member_path = Path(member.name) 

167 if member_path.parts[0] == root_dir: 

168 member.name = str(Path(*member_path.parts[1:])) 

169 if member.name: 

170 tar.extract(member, target_dir, filter="data") 

171 

172 @staticmethod 

173 def _extract_zip(content: bytes, target_dir: str) -> None: 

174 """Extract a zip archive, stripping the top-level directory.""" 

175 import io 

176 

177 with zipfile.ZipFile(io.BytesIO(content)) as zf: 

178 names = zf.namelist() 

179 if not names: 

180 raise TemplateSourceError("Zip archive is empty") 

181 

182 root_dir = names[0].split("/")[0] 

183 for name in names: 

184 if name == root_dir + "/": 

185 continue 

186 if name.startswith(root_dir + "/"): 

187 zf.extract(name, target_dir) 

188 

189 

190class TemplateSourceAdapter: 

191 """Dispatch to the correct TemplateSourcePort implementation based on kind.""" 

192 

193 def __init__(self, source: TemplateSource) -> None: 

194 """Initialise with the TemplateSource value object.""" 

195 self._source = source 

196 

197 def resolve(self) -> list[FileSpec]: 

198 """Resolve template files via the appropriate source strategy.""" 

199 if self._source.kind == "bundled": 

200 return BundledTemplateSource().resolve() 

201 if self._source.kind == "local": 

202 return LocalTemplateSource(Path(self._source.location)).resolve() 

203 if self._source.kind == "url": 

204 return UrlTemplateSource(self._source.location).resolve() 

205 raise TemplateSourceError(f"Unknown template source kind: {self._source.kind}") 

206 

207 def gitignore_patterns(self) -> list[str]: 

208 """Return gitignore patterns via the appropriate source strategy.""" 

209 if self._source.kind == "bundled": 

210 return BundledTemplateSource().gitignore_patterns() 

211 if self._source.kind == "local": 

212 return LocalTemplateSource(Path(self._source.location)).gitignore_patterns() 

213 if self._source.kind == "url": 

214 return UrlTemplateSource(self._source.location).gitignore_patterns() 

215 raise TemplateSourceError(f"Unknown template source kind: {self._source.kind}")