Coverage for src/_griffe/finder.py: 94.92%

250 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-15 16:47 +0200

1# This module contains the code allowing to find modules. 

2# 

3# Note: It might be possible to replace a good part of this module's logic 

4# with utilities from `importlib` (however the util in question is private): 

5# 

6# ```pycon 

7# >>> from importlib.util import _find_spec 

8# >>> _find_spec("griffe.agents", _find_spec("griffe", None).submodule_search_locations) 

9# ModuleSpec( 

10# name='griffe.agents', 

11# loader=<_frozen_importlib_external.SourceFileLoader object at 0x7fa5f34e8110>, 

12# origin='/media/data/dev/griffe/src/griffe/agents/__init__.py', 

13# submodule_search_locations=['/media/data/dev/griffe/src/griffe/agents'], 

14# ) 

15# ``` 

16 

17from __future__ import annotations 

18 

19import ast 

20import os 

21import re 

22import sys 

23from collections import defaultdict 

24from contextlib import suppress 

25from dataclasses import dataclass 

26from itertools import chain 

27from pathlib import Path 

28from typing import TYPE_CHECKING, ClassVar, Iterator, Sequence, Tuple 

29 

30from _griffe.exceptions import UnhandledEditableModuleError 

31from _griffe.logger import logger 

32 

33if TYPE_CHECKING: 

34 from typing import Pattern 

35 

36 from _griffe.models import Module 

37 

38 

39_editable_editables_patterns = [re.compile(pat) for pat in (r"^__editables_\w+\.py$", r"^_editable_impl_\w+\.py$")] 

40_editable_setuptools_patterns = [re.compile(pat) for pat in (r"^__editable__\w+\.py$",)] 

41_editable_scikit_build_core_patterns = [re.compile(pat) for pat in (r"^_\w+_editable.py$",)] 

42_editable_meson_python_patterns = [re.compile(pat) for pat in (r"^_\w+_editable_loader.py$",)] 

43 

44NamePartsType = Tuple[str, ...] 

45"""Type alias for the parts of a module name.""" 

46NamePartsAndPathType = Tuple[NamePartsType, Path] 

47"""Type alias for the parts of a module name and its path.""" 

48 

49 

50def _match_pattern(string: str, patterns: Sequence[Pattern]) -> bool: 

51 return any(pattern.match(string) for pattern in patterns) 

52 

53 

@dataclass
class Package:
    """Lightweight record describing a regular package (or single-file module) located by the finder.

    Parameters:
        name: The package name.
        path: The path stored for the package (an `__init__` module or a module file).
        stubs: An optional path to the related stubs file (.pyi).
    """

    name: str
    """Name of the package."""
    path: Path
    """Path of the package."""
    stubs: Path | None = None
    """Stubs file of the package, if any."""

70 

71 

@dataclass
class NamespacePackage:
    """Lightweight record describing a namespace package located by the finder.

    Parameters:
        name: The package name.
        path: The package paths (one folder per portion of the namespace).
    """

    name: str
    """Name of the namespace package."""
    path: list[Path]
    """Folders that make up the namespace package."""

85 

86 

class ModuleFinder:
    """The Griffe finder, allowing to find modules on the file system.

    The module finder is generally not used directly.
    Each [`GriffeLoader`][griffe.GriffeLoader] instance creates its own module finder instance.
    The finder can be configured when instantiating the loader
    thanks to the [loader][griffe.GriffeLoader]'s `search_paths` parameter.
    """

    accepted_py_module_extensions: ClassVar[list[str]] = [".py", ".pyc", ".pyo", ".pyd", ".pyi", ".so"]
    """List of extensions supported by the finder."""
    extensions_set: ClassVar[set[str]] = set(accepted_py_module_extensions)
    """Set of extensions supported by the finder."""

    def __init__(self, search_paths: Sequence[str | Path] | None = None) -> None:
        """Initialize the finder.

        Parameters:
            search_paths: Optional paths to search into.
                When not given (or empty), `sys.path` is used.
        """
        # Cache of directory listings, filled lazily by `_contents`.
        self._paths_contents: dict[Path, list[Path]] = {}
        self.search_paths: list[Path] = []
        """The finder search paths."""

        # Optimization: pre-compute Paths to relieve CPU when joining paths.
        for path in search_paths or sys.path:
            self.append_search_path(Path(path))

        # Extra folders to scan for a given module name (filled from `.pth` files).
        self._always_scan_for: dict[str, list[Path]] = defaultdict(list)
        self._extend_from_pth_files()

    def append_search_path(self, path: Path) -> None:
        """Append a search path.

        The path will be resolved (absolute, normalized).
        The path won't be appended if it is already in the search paths list.

        Parameters:
            path: The path to append.
        """
        path = path.resolve()
        if path not in self.search_paths:
            self.search_paths.append(path)

    def insert_search_path(self, position: int, path: Path) -> None:
        """Insert a search path at the given position.

        The path will be resolved (absolute, normalized).
        The path won't be inserted if it is already in the search paths list.

        Parameters:
            position: The insert position in the list.
            path: The path to insert.
        """
        path = path.resolve()
        if path not in self.search_paths:
            self.search_paths.insert(position, path)

    def find_spec(
        self,
        module: str | Path,
        *,
        try_relative_path: bool = True,
        find_stubs_package: bool = False,
    ) -> tuple[str, Package | NamespacePackage]:
        """Find the top-level parent module of a module.

        If a Path is passed, only try to find the module as a file path.
        If a string is passed, first try to find the module as a file path,
        then look into the search paths.

        Parameters:
            module: The module name or path.
            try_relative_path: Whether to try finding the module as a relative path,
                when the given module is not already a path.
            find_stubs_package: Whether to search for stubs-only package.
                If both the package and its stubs are found, they'll be merged together.
                If only the stubs are found, they'll be used as the package itself.

        Raises:
            FileNotFoundError: When a Path was passed and the module could not be found:

                - the directory has no `__init__.py` file in it
                - the path does not exist

            ModuleNotFoundError: When a string was passed and the module could not be found:

                - no `module/__init__.py`
                - no `module.py`
                - no `module.pth`
                - no `module` directory (namespace packages)
                - or unsupported .pth file

        Returns:
            The name of the module, and an instance representing its (namespace) package.
        """
        module_path: Path | list[Path]
        if isinstance(module, Path):
            module_name, module_path = self._module_name_path(module)
            top_module_name = self._top_module_name(module_path)
        elif try_relative_path:
            try:
                module_name, module_path = self._module_name_path(Path(module))
            except FileNotFoundError:
                # Not a file path: treat the string as a dotted module name.
                module_name = module
                top_module_name = module.split(".", 1)[0]
            else:
                top_module_name = self._top_module_name(module_path)
        else:
            module_name = module
            top_module_name = module.split(".", 1)[0]

        # Only search for actual package, let exceptions bubble up.
        if not find_stubs_package:
            return module_name, self.find_package(top_module_name)

        # Search for both package and stubs-only package.
        try:
            package = self.find_package(top_module_name)
        except ModuleNotFoundError:
            package = None
        try:
            stubs = self.find_package(top_module_name + "-stubs")
        except ModuleNotFoundError:
            stubs = None

        # None found, raise error.
        if package is None and stubs is None:
            raise ModuleNotFoundError(top_module_name)

        # Both found, assemble them to be merged later.
        if package and stubs:
            if isinstance(package, Package) and isinstance(stubs, Package):
                package.stubs = stubs.path
            elif isinstance(package, NamespacePackage) and isinstance(stubs, NamespacePackage):
                package.path += stubs.path
            # Mixed kinds (regular package + namespace stubs, or the reverse)
            # are not merged: the package is returned as found.
            return module_name, package

        # Return either one.
        return module_name, package or stubs  # type: ignore[return-value]

    def find_package(self, module_name: str) -> Package | NamespacePackage:
        """Find a package or namespace package.

        Parameters:
            module_name: The module name.

        Raises:
            ModuleNotFoundError: When the module cannot be found.

        Returns:
            A package or namespace package wrapper.
        """
        filepaths = [
            Path(module_name),
            # TODO: Handle .py[cod] and .so files?
            # This would be needed for package that are composed
            # solely of a file with such an extension.
            Path(f"{module_name}.py"),
        ]

        # Regular packages found under a `<name>-stubs` folder are reported as `<name>`.
        stubs_suffix = "-stubs"
        real_module_name = module_name
        if real_module_name.endswith(stubs_suffix):
            real_module_name = real_module_name[: -len(stubs_suffix)]
        namespace_dirs = []
        for path in self.search_paths:
            path_contents = self._contents(path)
            if path_contents:
                for choice in filepaths:
                    abs_path = path / choice
                    if abs_path in path_contents:
                        if abs_path.suffix:
                            # Single-file module: look for a sibling `.pyi` file.
                            stubs = abs_path.with_suffix(".pyi")
                            return Package(real_module_name, abs_path, stubs if stubs.exists() else None)
                        init_module = abs_path / "__init__.py"
                        if init_module.exists() and not _is_pkg_style_namespace(init_module):
                            stubs = init_module.with_suffix(".pyi")
                            return Package(real_module_name, init_module, stubs if stubs.exists() else None)
                        init_module = abs_path / "__init__.pyi"
                        if init_module.exists():
                            # Stubs package
                            return Package(real_module_name, init_module, None)
                        # No usable `__init__` module: this folder is a namespace portion,
                        # keep looking in the remaining search paths.
                        namespace_dirs.append(abs_path)

        if namespace_dirs:
            # NOTE(review): the original (possibly `-stubs`-suffixed) name is used here,
            # unlike regular packages above -- confirm this is intended.
            return NamespacePackage(module_name, namespace_dirs)

        raise ModuleNotFoundError(module_name)

    def iter_submodules(
        self,
        path: Path | list[Path],
        seen: set | None = None,
    ) -> Iterator[NamePartsAndPathType]:
        """Iterate on a module's submodules, if any.

        Parameters:
            path: The module path.
            seen: If not none, this set is used to skip some files.
                The goal is to replicate the behavior of Python by
                only using the first packages (with `__init__` modules)
                of the same name found in different namespace packages.
                As soon as we find an `__init__` module, we add its parent
                path to the `seen` set, which will be reused when scanning
                the next namespace packages.

        Yields:
            name_parts (tuple[str, ...]): The parts of a submodule name.
            filepath (Path): A submodule filepath.
        """
        if isinstance(path, list):
            # We never enter this condition again in recursive calls,
            # so we just have to set `seen` once regardless of its value.
            seen = set()
            for path_elem in path:
                yield from self.iter_submodules(path_elem, seen)
            return

        if path.stem == "__init__":
            path = path.parent
        # Optimization: just check if the file name ends with .py[icod]/.so
        # (to distinguish it from a directory), not if it's an actual file.
        elif path.suffix in self.extensions_set:
            return

        # `seen` is only set when we scan a list of paths (namespace package).
        # `skip` is used to prevent yielding modules
        # of a regular subpackage that we already yielded
        # from another part of the namespace.
        skip = set(seen or ())

        for subpath in self._filter_py_modules(path):
            rel_subpath = subpath.relative_to(path)
            if rel_subpath.parent in skip:
                logger.debug(f"Skip {subpath}, another module took precedence")
                continue
            py_file = rel_subpath.suffix == ".py"
            stem = rel_subpath.stem
            if not py_file:
                # .py[cod] and .so files look like `name.cpython-38-x86_64-linux-gnu.ext`
                stem = stem.split(".", 1)[0]
            if stem == "__init__":
                # Optimization: since it's a relative path, if it has only one part
                # and is named __init__, it means it's the starting path
                # (no need to compare it against starting path).
                if len(rel_subpath.parts) == 1:
                    continue
                yield rel_subpath.parts[:-1], subpath
                if seen is not None:
                    seen.add(rel_subpath.parent)
            elif py_file:
                yield rel_subpath.with_suffix("").parts, subpath
            else:
                yield rel_subpath.with_name(stem).parts, subpath

    def submodules(self, module: Module) -> list[NamePartsAndPathType]:
        """Return the list of a module's submodules.

        Parameters:
            module: The parent module.

        Returns:
            A list of tuples containing the parts of the submodule name and its path,
            sorted by depth (number of name parts).
        """
        return sorted(
            chain(
                self.iter_submodules(module.filepath),
                self.iter_submodules(self._always_scan_for[module.name]),
            ),
            key=_module_depth,
        )

    def _module_name_path(self, path: Path) -> tuple[str, Path]:
        """Return the module name and module file path for a file-system path.

        Raises:
            FileNotFoundError: When the path does not exist.
        """
        # Always return absolute paths to avoid working-directory-dependent issues.
        path = path.absolute()
        if path.is_dir():
            # Prefer the first `__init__` module found, in extension order.
            for ext in self.accepted_py_module_extensions:
                module_path = path / f"__init__{ext}"
                if module_path.exists():
                    return path.name, module_path
            # No `__init__` module: return the folder itself.
            return path.name, path
        if path.exists():
            if path.stem == "__init__":
                return path.parent.name, path
            return path.stem, path
        # Name the missing path so that the error is actionable when it surfaces.
        raise FileNotFoundError(f"No such file or directory: {path}")

    def _contents(self, path: Path) -> list[Path]:
        """Return the (cached) direct children of `path`, or an empty list if it cannot be listed."""
        if path not in self._paths_contents:
            try:
                self._paths_contents[path] = list(path.iterdir())
            except (FileNotFoundError, NotADirectoryError):
                self._paths_contents[path] = []
        return self._paths_contents[path]

    def _append_search_path(self, path: Path) -> None:
        """Append `path` as-is (no resolution) unless it is already a search path."""
        if path not in self.search_paths:
            self.search_paths.append(path)

    def _extend_from_pth_files(self) -> None:
        """Add the directories referenced by `.pth` files found in the search paths."""
        # The list is extended while being iterated: directories appended below
        # are visited by this same loop, so their own `.pth` files are handled too.
        for path in self.search_paths:
            for item in self._contents(path):
                if item.suffix == ".pth":
                    for directory in _handle_pth_file(item):
                        if scan := directory.always_scan_for:
                            self._always_scan_for[scan].append(directory.path.joinpath(scan))
                        self.append_search_path(directory.path)

    def _filter_py_modules(self, path: Path) -> Iterator[Path]:
        """Walk `path` recursively and yield the files having a supported module extension."""
        for root, dirs, files in os.walk(path, topdown=True):
            # Optimization: modify dirs in-place to exclude `__pycache__` directories.
            dirs[:] = [dirname for dirname in dirs if dirname != "__pycache__"]
            for relfile in files:
                if os.path.splitext(relfile)[1] in self.extensions_set:
                    yield Path(root, relfile)

    def _top_module_name(self, path: Path) -> str:
        """Return the name of the top-level module containing `path`.

        When no search path contains it, the parent of the top-most package folder
        is inserted at the front of the search paths.
        """
        # First find if a parent is in search paths.
        parent_path = path if path.is_dir() else path.parent
        # Always resolve parent path to compare for relativeness against resolved search paths.
        parent_path = parent_path.resolve()
        for search_path in self.search_paths:
            # `ValueError`: not relative to this search path.
            # `IndexError`: the path *is* the search path (no parts left).
            with suppress(ValueError, IndexError):
                rel_path = parent_path.relative_to(search_path.resolve())
                return rel_path.parts[0]
        # If not, get the highest directory with an `__init__` module,
        # add its parent to search paths and return it.
        while parent_path.parent != parent_path and (parent_path.parent / "__init__.py").exists():
            parent_path = parent_path.parent
        self.insert_search_path(0, parent_path.parent)
        return parent_path.name

418 

419 

420_re_pkgresources = re.compile(r"(?:__import__\([\"']pkg_resources[\"']\).declare_namespace\(__name__\))") 

421_re_pkgutil = re.compile(r"(?:__path__ = __import__\([\"']pkgutil[\"']\).extend_path\(__path__, __name__\))") 

422_re_import_line = re.compile(r"^import[ \t]+\w+$") 

423 

424 

# TODO: For more robustness, we should load and minify the AST
# to search for particular call statements.
def _is_pkg_style_namespace(init_module: Path) -> bool:
    """Tell whether an `__init__` module declares a pkg_resources- or pkgutil-style namespace.

    The module is read as UTF-8 text and searched with the two module-level patterns.
    """
    source = init_module.read_text(encoding="utf8")
    for marker in (_re_pkgresources, _re_pkgutil):
        if marker.search(source):
            return True
    return False

430 

431 

432def _module_depth(name_parts_and_path: NamePartsAndPathType) -> int: 

433 return len(name_parts_and_path[0]) 

434 

435 

436@dataclass 

437class _SP: 

438 path: Path 

439 always_scan_for: str = "" 

440 

441 

def _handle_pth_file(path: Path) -> list[_SP]:
    """Return the search paths declared by a `.pth` file.

    Parameters:
        path: The path of the `.pth` file.

    Returns:
        The existing directories listed in the file, or, when the file contains an
        `import <module>` line pointing to a supported editable-install module,
        the search paths extracted from that module (the other lines are then ignored).
        An empty list is returned when the file cannot be decoded as UTF-8.
    """
    # Support for .pth files pointing to directories.
    # From https://docs.python.org/3/library/site.html:
    # A path configuration file is a file whose name has the form name.pth
    # and exists in one of the four directories mentioned above;
    # its contents are additional items (one per line) to be added to sys.path.
    # Non-existing items are never added to sys.path,
    # and no check is made that the item refers to a directory rather than a file.
    # No item is added to sys.path more than once.
    # Blank lines and lines beginning with # are skipped.
    # Lines starting with import (followed by space or tab) are executed.
    directories: list[_SP] = []
    try:
        # It turns out PyTorch recommends its users to use `.pth` as the extension
        # when saving models on the disk. These model files are not encoded in UTF8.
        # If UTF8 decoding fails, we skip the .pth file.
        text = path.read_text(encoding="utf8")
    except UnicodeDecodeError:
        return directories
    # Semicolons separate statements on `import` lines: split them into their own lines.
    for line in text.strip().replace(";", "\n").splitlines(keepends=False):
        line = line.strip()  # noqa: PLW2901
        if _re_import_line.match(line):
            editable_module = path.parent / f"{line[len('import'):].lstrip()}.py"
            # Unsupported editable modules are ignored: fall through to the path check.
            with suppress(UnhandledEditableModuleError):
                return _handle_editable_module(editable_module)
        if not line or line.startswith("#"):
            continue
        # Like the `site` module, interpret relative entries against the directory
        # containing the `.pth` file, not against the current working directory.
        # Joining leaves absolute entries untouched.
        directory = path.parent / line
        if os.path.exists(directory):
            directories.append(_SP(directory))
    return directories

470 

471 

def _read_editable_module(path: Path) -> str:
    # Read an editable-install module as UTF-8, reporting a missing file as "unhandled".
    try:
        return path.read_text(encoding="utf8")
    except FileNotFoundError as error:
        raise UnhandledEditableModuleError(path) from error


def _handle_editable_module(path: Path) -> list[_SP]:
    """Extract search paths from the module written by an editable install.

    The build backend is recognized from the module file name
    ('editables', 'scikit-build-core', 'setuptools' or 'meson-python').

    Parameters:
        path: The path of the editable module (the one imported by the `.pth` file).

    Raises:
        UnhandledEditableModuleError: When the file name matches no known backend,
            when the file is missing, or when the expected data cannot be found in it.

    Returns:
        The search paths declared by the editable module.
    """
    if _match_pattern(path.name, (*_editable_editables_patterns, *_editable_scikit_build_core_patterns)):
        # Support for how 'editables' write these files:
        # example line: `F.map_module('griffe', '/media/data/dev/griffe/src/griffe/__init__.py')`.
        # And how 'scikit-build-core' writes these files:
        # example line: `install({'griffe': '/media/data/dev/griffe/src/griffe/__init__.py'}, {'cmake_example': ...}, None, False, True)`.
        editable_lines = _read_editable_module(path).strip().splitlines(keepends=False)
        try:
            # The path is the second single-quoted string of the last line.
            new_path = Path(editable_lines[-1].split("'")[3])
        except IndexError as error:
            # Empty file, or last line without two quoted strings.
            raise UnhandledEditableModuleError(path) from error
        if new_path.name.startswith("__init__"):
            # The mapping points to the package's `__init__` module:
            # the search path is the folder containing the package.
            return [_SP(new_path.parent.parent)]
        return [_SP(new_path)]
    if _match_pattern(path.name, _editable_setuptools_patterns):
        # Support for how 'setuptools' writes these files:
        # example line: `MAPPING = {'griffe': '/media/data/dev/griffe/src/griffe', 'briffe': '/media/data/dev/griffe/src/briffe'}`.
        # with annotation: `MAPPING: dict[str, str] = {...}`.
        parsed_module = ast.parse(_read_editable_module(path))
        for node in parsed_module.body:
            if isinstance(node, ast.Assign):
                target = node.targets[0]
            elif isinstance(node, ast.AnnAssign):
                target = node.target
            else:
                continue
            if isinstance(target, ast.Name) and target.id == "MAPPING" and isinstance(node.value, ast.Dict):  # type: ignore[attr-defined]
                return [
                    _SP(Path(cst.value).parent)
                    for cst in node.value.values  # type: ignore[attr-defined]
                    if isinstance(cst, ast.Constant) and isinstance(cst.value, str)
                ]
    if _match_pattern(path.name, _editable_meson_python_patterns):
        # Support for how 'meson-python' writes these files:
        # example line: `install({'package', 'module1'}, '/media/data/dev/griffe/build/cp311', ["path"], False)`.
        # Compiled modules then found in the cp311 folder, under src/package.
        parsed_module = ast.parse(_read_editable_module(path))
        for node in parsed_module.body:
            if (
                isinstance(node, ast.Expr)
                and isinstance(node.value, ast.Call)
                and isinstance(node.value.func, ast.Name)
                and node.value.func.id == "install"
                and len(node.value.args) > 1
                and isinstance(node.value.args[1], ast.Constant)
                and isinstance(node.value.args[1].value, str)
            ):
                build_path = Path(node.value.args[1].value, "src")
                # NOTE: What if there are multiple packages?
                try:
                    pkg_name = next(build_path.iterdir()).name
                except (StopIteration, FileNotFoundError, NotADirectoryError) as error:
                    # Build folder missing or empty: nothing to scan.
                    raise UnhandledEditableModuleError(path) from error
                return [_SP(build_path, always_scan_for=pkg_name)]
    raise UnhandledEditableModuleError(path)