- Convert core→optional imports to lazy imports with try/except fallbacks
- cms/media_service: use TYPE_CHECKING for ProductMedia type hints
- customers/customer_service: wrap Order imports in try/except
- tenancy/admin_platform_users: wrap stats_service import in try/except
- Enhance validate_architecture.py to recognize lazy import patterns
- Add module_dependency_graph.py script for dependency visualization

The lazy import pattern allows optional modules to be truly optional while
maintaining type safety through TYPE_CHECKING blocks.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
464 lines
16 KiB
Python
Executable File
464 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Module Dependency Graph Generator
|
|
|
|
Analyzes the codebase and outputs a dependency graph showing how modules
|
|
depend on each other. Useful for understanding module coupling and
|
|
identifying architectural issues.
|
|
|
|
Usage:
|
|
python scripts/module_dependency_graph.py [--format FORMAT] [--output FILE]
|
|
|
|
Options:
|
|
--format FORMAT Output format: text (default), dot, json, mermaid
|
|
--output FILE Output file (default: stdout)
|
|
--show-details Show file-level dependencies
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
@dataclass
class ModuleDependency:
    """A directed import edge between two modules.

    One instance aggregates every import of ``target_module`` that was found
    in the files of ``source_module``.
    """

    # Module whose files contain the import statements.
    source_module: str
    # Module being imported.
    target_module: str
    # Importing files; the analyzer stores each path once, relative to the
    # project root.
    source_files: list[str] = field(default_factory=list)
    # Number of matching import statements (may exceed len(source_files)).
    import_count: int = 0
|
|
|
|
|
|
@dataclass
class ModuleInfo:
    """Summary record for one discovered module."""

    # Module name (the analyzer uses the module's directory name).
    name: str
    # True when the analyzer found the name in its CORE_MODULES set.
    is_core: bool
    # Names of the modules this module imports from.
    dependencies: list[str] = field(default_factory=list)
    # Names of the modules that import from this module.
    dependents: list[str] = field(default_factory=list)
    # Number of Python files counted for the module.
    file_count: int = 0
|
|
|
|
|
|
class ModuleDependencyAnalyzer:
    """Analyzes module dependencies in the codebase.

    Every directory directly under ``<project_root>/app/modules`` is treated
    as a module. Each ``*.py`` file below it is scanned line by line for
    ``from app.modules.<name>`` / ``import app.modules.<name>`` statements,
    and imports that point at a *different* module are recorded as
    dependencies of the scanning module.

    The scan is textual (no AST): docstrings are skipped with a simple
    triple-quote toggle, and imports guarded by ``if TYPE_CHECKING`` are
    ignored because they never execute at runtime.
    """

    # Core modules (always enabled)
    CORE_MODULES = {
        "contracts",
        "core",
        "tenancy",
        "cms",
        "customers",
        "billing",
        "payments",
        "messaging",
    }

    # Optional modules (can be enabled/disabled per platform)
    OPTIONAL_MODULES = {
        "analytics",
        "cart",
        "catalog",
        "checkout",
        "inventory",
        "loyalty",
        "marketplace",
        "orders",
        "monitoring",
    }

    # Matches an ``app.modules.<name>`` import at the start of a line.
    # Group 2 (``from`` form) or group 4 (``import`` form) holds <name>.
    _IMPORT_RE = re.compile(
        r'^\s*(?:from\s+(app\.modules\.(\w+))|import\s+(app\.modules\.(\w+)))'
    )

    def __init__(self, project_root: Path):
        """Remember the project root and prepare empty result containers.

        Args:
            project_root: Directory that contains ``app/modules``.
        """
        self.project_root = project_root
        self.modules_path = project_root / "app" / "modules"
        # source module -> target module -> aggregated dependency record
        self.dependencies: dict[str, dict[str, ModuleDependency]] = defaultdict(dict)
        self.modules: dict[str, ModuleInfo] = {}

    def analyze(self) -> dict[str, ModuleInfo]:
        """Analyze all module dependencies.

        Returns:
            ``self.modules`` (module name -> ``ModuleInfo``), or an empty dict
            when the modules directory does not exist (an error is printed to
            stderr in that case).
        """
        if not self.modules_path.exists():
            print(f"Error: Modules path not found: {self.modules_path}", file=sys.stderr)
            return {}

        # Reset previous results so a repeated call does not double the
        # import counts or keep entries for modules that no longer exist.
        self.dependencies.clear()
        self.modules.clear()

        # First pass: discover modules. Sorted because iterdir() order is
        # filesystem-dependent and would make the report order unstable.
        module_dirs = sorted(
            (
                d for d in self.modules_path.iterdir()
                if d.is_dir() and d.name != "__pycache__" and not d.name.startswith(".")
            ),
            key=lambda d: d.name,
        )

        for module_dir in module_dirs:
            module_name = module_dir.name
            self.modules[module_name] = ModuleInfo(
                name=module_name,
                is_core=module_name in self.CORE_MODULES,
            )

        # Second pass: analyze imports
        for module_dir in module_dirs:
            self._analyze_module(module_dir, module_dir.name)

        # Calculate dependents
        for module_name, module_info in self.modules.items():
            for dep in self.dependencies.get(module_name, {}).values():
                module_info.dependencies.append(dep.target_module)
                # The target may be a name with no directory of its own.
                if dep.target_module in self.modules:
                    self.modules[dep.target_module].dependents.append(module_name)

        return self.modules

    def _analyze_module(self, module_dir: Path, module_name: str) -> None:
        """Analyze a single module's dependencies.

        Scans every ``*.py`` file under ``module_dir`` and stores the number
        of files visited on the module's ``ModuleInfo``. A file that cannot be
        read or analyzed is reported on stderr and still counted.
        """
        file_count = 0
        # Sorted so the order of ``source_files`` is reproducible.
        for py_file in sorted(module_dir.rglob("*.py")):
            if "__pycache__" in str(py_file):
                continue

            file_count += 1
            try:
                # Explicit encoding: the platform default is not always UTF-8.
                content = py_file.read_text(encoding="utf-8")
                self._analyze_file(py_file, content, module_name)
            except Exception as e:
                # Best effort: one bad file must not abort the whole scan.
                print(f"Warning: Could not analyze {py_file}: {e}", file=sys.stderr)

        if module_name in self.modules:
            self.modules[module_name].file_count = file_count

    def _analyze_file(self, file_path: Path, content: str, source_module: str) -> None:
        """Analyze imports in a single file.

        Args:
            file_path: Path of the file; must be located under ``project_root``.
            content: Full text of the file.
            source_module: Name of the module the file belongs to.
        """
        in_docstring = False
        docstring_delimiter = None
        in_type_checking = False
        type_checking_indent = 0
        rel_path = None  # computed lazily, only when an import is recorded

        for line in content.split("\n"):
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue

            # Track docstrings: a line holding exactly one triple-quote
            # delimiter opens a docstring or closes the one opened with the
            # same delimiter.
            for delim in ('"""', "'''"):
                if line.count(delim) != 1:
                    continue
                if not in_docstring:
                    in_docstring = True
                    docstring_delimiter = delim
                elif docstring_delimiter == delim:
                    in_docstring = False
                    docstring_delimiter = None

            if in_docstring:
                continue

            # Track TYPE_CHECKING blocks
            current_indent = len(line) - len(line.lstrip())
            if "if TYPE_CHECKING" in stripped:
                in_type_checking = True
                type_checking_indent = current_indent
                continue
            if in_type_checking:
                if current_indent > type_checking_indent:
                    # Still inside the guarded block: these imports exist for
                    # type checkers only and create no runtime dependency.
                    continue
                # Dedented back to (or past) the ``if``: the block is over.
                in_type_checking = False

            # Look for import statements
            import_match = self._IMPORT_RE.match(line)
            if not import_match:
                continue

            target_module = import_match.group(2) or import_match.group(4)
            if not target_module or target_module == source_module:
                continue

            # Skip noqa comments
            lowered = line.lower()
            if "noqa:" in lowered and "import" in lowered:
                continue

            # Record the dependency
            module_deps = self.dependencies[source_module]
            if target_module not in module_deps:
                module_deps[target_module] = ModuleDependency(
                    source_module=source_module,
                    target_module=target_module,
                )

            dep = module_deps[target_module]
            dep.import_count += 1
            if rel_path is None:
                rel_path = str(file_path.relative_to(self.project_root))
            if rel_path not in dep.source_files:
                dep.source_files.append(rel_path)

    def get_dependency_matrix(self) -> dict[str, set[str]]:
        """Return a simple dependency matrix (module -> set of dependencies)."""
        return {
            module_name: {
                dep.target_module
                for dep in self.dependencies.get(module_name, {}).values()
            }
            for module_name in self.modules
        }

    def find_circular_dependencies(self) -> list[tuple[str, str]]:
        """Find bidirectional dependencies (A imports B and B imports A).

        Only direct two-module cycles are detected. Each pair is reported
        once, as ``(source, target)`` in the order it is first encountered.
        """
        circular = []
        checked = set()
        for source, targets in self.dependencies.items():
            for target in targets:
                if (target, source) in checked:
                    # The reverse edge was already examined.
                    continue
                if source in self.dependencies.get(target, {}):
                    circular.append((source, target))
                checked.add((source, target))
        return circular
|
|
|
|
|
|
def _append_module_section(lines, heading, modules, analyzer, show_details):
    """Append one titled module listing (heading, rule, per-module lines, blank line)."""
    lines.append(heading)
    lines.append("-" * 40)
    for module in modules:
        deps = analyzer.dependencies.get(module.name, {})
        deps_str = ", ".join(sorted(deps)) if deps else "(none)"
        lines.append(f" {module.name} ({module.file_count} files)")
        lines.append(f" → depends on: {deps_str}")
        if show_details and deps:
            for dep_name, dep in sorted(deps.items()):
                lines.append(f" - {dep_name}: {dep.import_count} imports from {len(dep.source_files)} files")
    lines.append("")


def format_text(analyzer: ModuleDependencyAnalyzer, show_details: bool = False) -> str:
    """Format output as plain text.

    The report lists core modules, then all remaining modules (headed
    "optional"), then bidirectional dependencies and core → optional
    violations when any exist.

    Args:
        analyzer: Analyzer whose ``analyze()`` results should be rendered.
        show_details: Also print per-dependency import/file counts and, for
            violations, up to five importing files.

    Returns:
        The report as one newline-joined string.
    """
    lines = []
    lines.append("=" * 70)
    lines.append("MODULE DEPENDENCY GRAPH")
    lines.append("=" * 70)
    lines.append("")

    # Separate core and optional modules
    by_name = sorted(analyzer.modules.values(), key=lambda m: m.name)
    core_modules = [m for m in by_name if m.is_core]
    optional_modules = [m for m in by_name if not m.is_core]

    _append_module_section(
        lines, "CORE MODULES (always enabled):", core_modules, analyzer, show_details
    )
    _append_module_section(
        lines, "OPTIONAL MODULES (can be disabled):", optional_modules, analyzer, show_details
    )

    # Find circular dependencies
    circular = analyzer.find_circular_dependencies()
    if circular:
        lines.append("⚠️ BIDIRECTIONAL DEPENDENCIES (potential design issues):")
        lines.append("-" * 40)
        for a, b in circular:
            lines.append(f" {a} ⟷ {b}")
        lines.append("")

    # Check for core→optional violations. Both levels are sorted: iterating
    # the CORE_MODULES set directly yields a different order from run to run
    # (string hash randomization), which made the report unstable.
    violations = []
    for source_module in sorted(analyzer.CORE_MODULES):
        deps = analyzer.dependencies.get(source_module, {})
        for target_module in sorted(deps):
            if target_module in analyzer.OPTIONAL_MODULES:
                violations.append((source_module, target_module, deps[target_module]))

    if violations:
        lines.append("❌ ARCHITECTURE VIOLATIONS (core → optional):")
        lines.append("-" * 40)
        for source, target, dep in violations:
            lines.append(f" {source} → {target} ({dep.import_count} imports)")
            if show_details:
                for f in dep.source_files[:5]:
                    lines.append(f" - {f}")
                if len(dep.source_files) > 5:
                    lines.append(f" ... and {len(dep.source_files) - 5} more files")
        lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
def format_mermaid(analyzer: ModuleDependencyAnalyzer) -> str:
    """Render the dependency graph as a fenced Mermaid ``flowchart TD`` block.

    Discovered modules are grouped into a "Core Modules" and an "Optional
    Modules" subgraph; edges from a core module to an optional module carry
    a ❌ label.
    """
    known = analyzer.modules
    core_names = analyzer.CORE_MODULES
    optional_names = analyzer.OPTIONAL_MODULES

    out = ["```mermaid", "flowchart TD"]

    # One subgraph per category, containing only modules that were discovered.
    out.append(" subgraph core[\"Core Modules\"]")
    out.extend(f" {name}[{name}]" for name in sorted(core_names) if name in known)
    out.extend([" end", ""])

    out.append(" subgraph optional[\"Optional Modules\"]")
    out.extend(f" {name}[{name}]" for name in sorted(optional_names) if name in known)
    out.extend([" end", ""])

    # Edges, in the order the analyzer recorded them.
    for source, targets in analyzer.dependencies.items():
        for target in targets:
            is_violation = source in core_names and target in optional_names
            arrow = "-->|❌|" if is_violation else "-->"
            out.append(f" {source} {arrow} {target}")

    out.append("```")
    return "\n".join(out)
|
|
|
|
|
|
def format_dot(analyzer: ModuleDependencyAnalyzer) -> str:
    """Render the dependency graph as a GraphViz DOT digraph.

    Discovered modules are placed in a "Core Modules" and an "Optional
    Modules" cluster; edges from a core module to an optional module are
    drawn red, dashed and labelled "violation".
    """
    known = analyzer.modules
    core_names = analyzer.CORE_MODULES
    optional_names = analyzer.OPTIONAL_MODULES

    out = [
        "digraph ModuleDependencies {",
        ' rankdir=LR;',
        ' node [shape=box];',
        '',
    ]

    # Core modules cluster
    out += [
        ' subgraph cluster_core {',
        ' label="Core Modules";',
        ' style=filled;',
        ' color=lightblue;',
    ]
    out += [f' {name};' for name in sorted(core_names) if name in known]
    out += [' }', '']

    # Optional modules cluster
    out += [
        ' subgraph cluster_optional {',
        ' label="Optional Modules";',
        ' style=filled;',
        ' color=lightyellow;',
    ]
    out += [f' {name};' for name in sorted(optional_names) if name in known]
    out += [' }', '']

    # Edges, in the order the analyzer recorded them.
    for source, targets in analyzer.dependencies.items():
        for target in targets:
            if source in core_names and target in optional_names:
                out.append(f' {source} -> {target} [color=red, style=dashed, label="violation"];')
            else:
                out.append(f' {source} -> {target};')

    out.append('}')
    return "\n".join(out)
|
|
|
|
|
|
def format_json(analyzer: ModuleDependencyAnalyzer) -> str:
    """Format output as JSON.

    The document has four top-level keys: ``modules`` (per-module summary),
    ``dependencies`` (one entry per source/target edge),
    ``circular_dependencies`` (bidirectional pairs) and ``violations``
    (core → optional edges with the importing files).

    Returns:
        The JSON text, indented by two spaces.
    """
    data = {
        "modules": {},
        "dependencies": [],
        "circular_dependencies": [],
        "violations": [],
    }

    for name, module in analyzer.modules.items():
        data["modules"][name] = {
            "name": name,
            "is_core": module.is_core,
            "file_count": module.file_count,
            "dependencies": list(analyzer.dependencies.get(name, {})),
        }

    for source, deps in analyzer.dependencies.items():
        for target, dep in deps.items():
            data["dependencies"].append({
                "source": source,
                "target": target,
                "import_count": dep.import_count,
                "source_files": dep.source_files,
            })

    data["circular_dependencies"] = [
        {"modules": list(pair)} for pair in analyzer.find_circular_dependencies()
    ]

    # Sorted on both levels: iterating the CORE_MODULES set directly yields a
    # different order from run to run (string hash randomization), which made
    # the "violations" array unstable and noisy to diff.
    for source_module in sorted(analyzer.CORE_MODULES):
        deps = analyzer.dependencies.get(source_module, {})
        for target_module in sorted(deps):
            if target_module in analyzer.OPTIONAL_MODULES:
                data["violations"].append({
                    "source": source_module,
                    "target": target_module,
                    "files": deps[target_module].source_files,
                })

    return json.dumps(data, indent=2)
|
|
|
|
|
|
def main():
    """Parse the command line, run the analysis and emit the chosen report.

    The report goes to ``--output`` when given (followed by a confirmation
    message on stdout), otherwise to stdout.
    """
    parser = argparse.ArgumentParser(
        description="Generate module dependency graph"
    )
    parser.add_argument(
        "--format",
        choices=["text", "dot", "json", "mermaid"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--output",
        type=str,
        help="Output file (default: stdout)",
    )
    parser.add_argument(
        "--show-details",
        action="store_true",
        help="Show file-level dependency details",
    )
    parser.add_argument(
        "--project-root",
        type=str,
        default=".",
        help="Project root directory (default: current directory)",
    )

    args = parser.parse_args()

    project_root = Path(args.project_root).resolve()
    analyzer = ModuleDependencyAnalyzer(project_root)
    analyzer.analyze()

    # Format output. Formats that take only the analyzer are dispatched by
    # name; "text" (and any unexpected value) goes through format_text,
    # which also needs the --show-details flag.
    formatters = {
        "mermaid": format_mermaid,
        "dot": format_dot,
        "json": format_json,
    }
    formatter = formatters.get(args.format)
    if formatter is None:
        output = format_text(analyzer, show_details=args.show_details)
    else:
        output = formatter(analyzer)

    # Write output
    if args.output:
        # The report contains non-ASCII symbols (→, ⟷, ❌, ⚠️); writing with
        # the locale's default encoding raises UnicodeEncodeError on
        # platforms where that default is not UTF-8.
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Output written to {args.output}")
    else:
        print(output)


if __name__ == "__main__":
    main()
|