fix: resolve cross-module import violations with lazy import pattern
- Convert core→optional imports to lazy imports with try/except fallbacks
- cms/media_service: use TYPE_CHECKING for ProductMedia type hints
- customers/customer_service: wrap Order imports in try/except
- tenancy/admin_platform_users: wrap stats_service import in try/except
- Enhance validate_architecture.py to recognize lazy import patterns
- Add module_dependency_graph.py script for dependency visualization

The lazy import pattern allows optional modules to be truly optional
while maintaining type safety through TYPE_CHECKING blocks.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
463
scripts/module_dependency_graph.py
Executable file
463
scripts/module_dependency_graph.py
Executable file
@@ -0,0 +1,463 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Module Dependency Graph Generator
|
||||
|
||||
Analyzes the codebase and outputs a dependency graph showing how modules
|
||||
depend on each other. Useful for understanding module coupling and
|
||||
identifying architectural issues.
|
||||
|
||||
Usage:
|
||||
python scripts/module_dependency_graph.py [--format FORMAT] [--output FILE]
|
||||
|
||||
Options:
|
||||
--format FORMAT Output format: text (default), dot, json, mermaid
|
||||
--output FILE Output file (default: stdout)
|
||||
--show-details Show file-level dependencies
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
class ModuleDependency:
    """A directed dependency edge: one module importing from another.

    Collected during analysis; `source_files` and `import_count` are
    filled in incrementally as import statements are discovered.
    """

    source_module: str
    target_module: str
    # Relative paths (from the project root) of files in source_module
    # that import target_module.
    source_files: list[str] = field(default_factory=list)
    # Total number of import statements contributing to this edge.
    import_count: int = 0
|
||||
|
||||
|
||||
@dataclass
class ModuleInfo:
    """Per-module summary produced by the analyzer.

    `dependencies` lists modules this one imports; `dependents` lists
    modules that import this one (the reverse edges).
    """

    name: str
    # True when the module belongs to the always-enabled core set.
    is_core: bool
    dependencies: list[str] = field(default_factory=list)
    dependents: list[str] = field(default_factory=list)
    # Number of analyzed .py files in the module.
    file_count: int = 0
|
||||
|
||||
|
||||
class ModuleDependencyAnalyzer:
    """Analyzes module dependencies in the codebase.

    Scans each package under ``<project_root>/app/modules`` for imports of
    the form ``from app.modules.X import ...`` / ``import app.modules.X``
    and records one dependency edge per (source, target) module pair.

    Imports inside ``if TYPE_CHECKING:`` blocks are ignored: they are
    type-only and create no runtime dependency (the lazy-import pattern
    used to keep optional modules truly optional).
    """

    # Core modules (always enabled)
    CORE_MODULES = {
        "contracts",
        "core",
        "tenancy",
        "cms",
        "customers",
        "billing",
        "payments",
        "messaging",
    }

    # Optional modules (can be enabled/disabled per platform)
    OPTIONAL_MODULES = {
        "analytics",
        "cart",
        "catalog",
        "checkout",
        "inventory",
        "loyalty",
        "marketplace",
        "orders",
        "monitoring",
    }

    def __init__(self, project_root: Path):
        self.project_root = project_root
        self.modules_path = project_root / "app" / "modules"
        # source module name -> target module name -> ModuleDependency edge
        self.dependencies: dict[str, dict[str, ModuleDependency]] = defaultdict(dict)
        # module name -> ModuleInfo summary, populated by analyze()
        self.modules: dict[str, ModuleInfo] = {}

    def analyze(self) -> dict[str, ModuleInfo]:
        """Analyze all module dependencies.

        Returns the discovered modules (name -> ModuleInfo) with their
        dependency/dependent lists filled in, or an empty dict when the
        modules directory does not exist.
        """
        if not self.modules_path.exists():
            print(f"Error: Modules path not found: {self.modules_path}", file=sys.stderr)
            return {}

        # First pass: discover modules (skip caches and hidden dirs)
        module_dirs = [
            d for d in self.modules_path.iterdir()
            if d.is_dir() and d.name != "__pycache__" and not d.name.startswith(".")
        ]

        for module_dir in module_dirs:
            module_name = module_dir.name
            is_core = module_name in self.CORE_MODULES
            self.modules[module_name] = ModuleInfo(
                name=module_name,
                is_core=is_core,
            )

        # Second pass: analyze imports
        for module_dir in module_dirs:
            module_name = module_dir.name
            self._analyze_module(module_dir, module_name)

        # Calculate dependents (reverse edges) from the collected edges
        for module_name, module_info in self.modules.items():
            for dep in self.dependencies.get(module_name, {}).values():
                module_info.dependencies.append(dep.target_module)
                if dep.target_module in self.modules:
                    self.modules[dep.target_module].dependents.append(module_name)

        return self.modules

    def _analyze_module(self, module_dir: Path, module_name: str):
        """Analyze a single module's dependencies.

        Walks every .py file in the module, feeding each file's text to
        `_analyze_file`; unreadable files are reported but skipped.
        """
        file_count = 0
        for py_file in module_dir.rglob("*.py"):
            if "__pycache__" in str(py_file):
                continue

            file_count += 1
            try:
                content = py_file.read_text()
                self._analyze_file(py_file, content, module_name)
            except Exception as e:
                print(f"Warning: Could not analyze {py_file}: {e}", file=sys.stderr)

        if module_name in self.modules:
            self.modules[module_name].file_count = file_count

    def _analyze_file(self, file_path: Path, content: str, source_module: str):
        """Analyze imports in a single file.

        Line-based heuristic parser: skips comments, (best-effort)
        docstrings, and ``if TYPE_CHECKING:`` blocks, then records every
        ``app.modules.<target>`` import as a dependency edge.
        """
        lines = content.split("\n")
        in_docstring = False
        docstring_delimiter = None
        in_type_checking = False
        type_checking_indent = 0

        for line in lines:
            stripped = line.strip()
            if stripped.startswith("#") or not stripped:
                continue

            # Track docstrings. Heuristic: a line containing exactly one
            # triple-quote toggles docstring state; a same-line open/close
            # pair (count == 2) leaves the state unchanged.
            if '"""' in line or "'''" in line:
                for delim in ['"""', "'''"]:
                    count = line.count(delim)
                    if count == 1:
                        if not in_docstring:
                            in_docstring = True
                            docstring_delimiter = delim
                        elif docstring_delimiter == delim:
                            in_docstring = False
                            docstring_delimiter = None

            if in_docstring:
                continue

            # Track TYPE_CHECKING blocks so type-only imports are not
            # counted as runtime dependencies. The startswith check also
            # matches "if typing.TYPE_CHECKING:".
            current_indent = len(line) - len(line.lstrip())
            if stripped.startswith("if ") and "TYPE_CHECKING" in stripped:
                in_type_checking = True
                type_checking_indent = current_indent
                continue
            if in_type_checking and current_indent <= type_checking_indent and stripped:
                in_type_checking = False

            # Fix: in_type_checking was previously tracked but never
            # consulted, so TYPE_CHECKING-guarded imports were counted.
            if in_type_checking:
                continue

            # Look for import statements targeting app.modules.<name>
            import_match = re.match(
                r'^\s*(?:from\s+(app\.modules\.(\w+))|import\s+(app\.modules\.(\w+)))',
                line
            )

            if not import_match:
                continue

            target_module = import_match.group(2) or import_match.group(4)
            if not target_module or target_module == source_module:
                continue

            # Skip imports explicitly waived with a noqa comment
            if "noqa:" in line.lower() and "import" in line.lower():
                continue

            # Record the dependency (create the edge on first sighting)
            if target_module not in self.dependencies[source_module]:
                self.dependencies[source_module][target_module] = ModuleDependency(
                    source_module=source_module,
                    target_module=target_module,
                )

            dep = self.dependencies[source_module][target_module]
            dep.import_count += 1
            rel_path = str(file_path.relative_to(self.project_root))
            if rel_path not in dep.source_files:
                dep.source_files.append(rel_path)

    def get_dependency_matrix(self) -> dict[str, set[str]]:
        """Return a simple dependency matrix (module -> set of dependencies)."""
        result = {}
        for module_name in self.modules:
            deps = set()
            for dep in self.dependencies.get(module_name, {}).values():
                deps.add(dep.target_module)
            result[module_name] = deps
        return result

    def find_circular_dependencies(self) -> list[tuple[str, str]]:
        """Find bidirectional dependencies (A imports B and B imports A).

        Each pair is reported once; `checked` de-duplicates the reversed
        orientation.
        """
        circular = []
        checked = set()
        for source in self.dependencies:
            for target in self.dependencies.get(source, {}):
                if (target, source) not in checked:
                    # .get avoids materializing new defaultdict entries
                    if source in self.dependencies.get(target, {}):
                        circular.append((source, target))
                checked.add((source, target))
        return circular
|
||||
|
||||
|
||||
def _append_module_section(lines: list[str], modules, analyzer, show_details: bool) -> None:
    """Append per-module entries (name, file count, dependencies) to *lines*.

    Shared by the core and optional sections of format_text, which were
    previously duplicated verbatim. Ends with a blank separator line.
    """
    for module in modules:
        deps = analyzer.dependencies.get(module.name, {})
        deps_str = ", ".join(sorted(deps.keys())) if deps else "(none)"
        lines.append(f" {module.name} ({module.file_count} files)")
        lines.append(f" → depends on: {deps_str}")
        if show_details and deps:
            for dep_name, dep in sorted(deps.items()):
                lines.append(f" - {dep_name}: {dep.import_count} imports from {len(dep.source_files)} files")
    lines.append("")


def format_text(analyzer: ModuleDependencyAnalyzer, show_details: bool = False) -> str:
    """Format output as plain text.

    Sections: core modules, optional modules, bidirectional dependencies,
    and core→optional architecture violations. `show_details` adds
    file-level breakdowns.
    """
    lines = []
    lines.append("=" * 70)
    lines.append("MODULE DEPENDENCY GRAPH")
    lines.append("=" * 70)
    lines.append("")

    # Separate core and optional modules, each sorted by name
    core_modules = sorted([m for m in analyzer.modules.values() if m.is_core], key=lambda x: x.name)
    optional_modules = sorted([m for m in analyzer.modules.values() if not m.is_core], key=lambda x: x.name)

    lines.append("CORE MODULES (always enabled):")
    lines.append("-" * 40)
    _append_module_section(lines, core_modules, analyzer, show_details)

    lines.append("OPTIONAL MODULES (can be disabled):")
    lines.append("-" * 40)
    _append_module_section(lines, optional_modules, analyzer, show_details)

    # Bidirectional (A <-> B) dependencies indicate possible design issues
    circular = analyzer.find_circular_dependencies()
    if circular:
        lines.append("⚠️ BIDIRECTIONAL DEPENDENCIES (potential design issues):")
        lines.append("-" * 40)
        for a, b in circular:
            lines.append(f" {a} ⟷ {b}")
        lines.append("")

    # Core modules must never depend on optional ones
    violations = []
    for source_module in analyzer.CORE_MODULES:
        deps = analyzer.dependencies.get(source_module, {})
        for target_module in deps:
            if target_module in analyzer.OPTIONAL_MODULES:
                violations.append((source_module, target_module, deps[target_module]))

    if violations:
        lines.append("❌ ARCHITECTURE VIOLATIONS (core → optional):")
        lines.append("-" * 40)
        for source, target, dep in violations:
            lines.append(f" {source} → {target} ({dep.import_count} imports)")
            if show_details:
                # Cap the file listing at 5 entries to keep output short
                for f in dep.source_files[:5]:
                    lines.append(f" - {f}")
                if len(dep.source_files) > 5:
                    lines.append(f" ... and {len(dep.source_files) - 5} more files")
        lines.append("")

    return "\n".join(lines)
|
||||
|
||||
|
||||
def format_mermaid(analyzer: ModuleDependencyAnalyzer) -> str:
    """Render the dependency graph as a Mermaid flowchart (in a code fence)."""
    out = ["```mermaid", "flowchart TD"]

    # Core and optional modules each get their own subgraph; only
    # modules actually discovered by the analyzer are drawn.
    out.append(" subgraph core[\"Core Modules\"]")
    out.extend(
        f" {name}[{name}]"
        for name in sorted(analyzer.CORE_MODULES)
        if name in analyzer.modules
    )
    out.append(" end")
    out.append("")
    out.append(" subgraph optional[\"Optional Modules\"]")
    out.extend(
        f" {name}[{name}]"
        for name in sorted(analyzer.OPTIONAL_MODULES)
        if name in analyzer.modules
    )
    out.append(" end")
    out.append("")

    # Edges; a core→optional edge is a violation and gets a ❌ label
    for src, targets in analyzer.dependencies.items():
        for tgt in targets:
            if src in analyzer.CORE_MODULES and tgt in analyzer.OPTIONAL_MODULES:
                out.append(f" {src} -->|❌| {tgt}")
            else:
                out.append(f" {src} --> {tgt}")

    out.append("```")
    return "\n".join(out)
|
||||
|
||||
|
||||
def format_dot(analyzer: ModuleDependencyAnalyzer) -> str:
    """Render the dependency graph in GraphViz DOT syntax."""
    # Header plus the core-modules cluster
    out = [
        "digraph ModuleDependencies {",
        ' rankdir=LR;',
        ' node [shape=box];',
        '',
        ' subgraph cluster_core {',
        ' label="Core Modules";',
        ' style=filled;',
        ' color=lightblue;',
    ]
    out.extend(f' {name};' for name in sorted(analyzer.CORE_MODULES) if name in analyzer.modules)
    out.append(' }')
    out.append('')

    # Optional-modules cluster
    out.append(' subgraph cluster_optional {')
    out.append(' label="Optional Modules";')
    out.append(' style=filled;')
    out.append(' color=lightyellow;')
    out.extend(f' {name};' for name in sorted(analyzer.OPTIONAL_MODULES) if name in analyzer.modules)
    out.append(' }')
    out.append('')

    # Edges; core→optional edges are styled as violations
    for src, targets in analyzer.dependencies.items():
        for tgt in targets:
            if src in analyzer.CORE_MODULES and tgt in analyzer.OPTIONAL_MODULES:
                out.append(f' {src} -> {tgt} [color=red, style=dashed, label="violation"];')
            else:
                out.append(f' {src} -> {tgt};')

    out.append('}')
    return "\n".join(out)
|
||||
|
||||
|
||||
def format_json(analyzer: ModuleDependencyAnalyzer) -> str:
    """Serialize modules, edges, cycles and violations as indented JSON."""
    # Per-module summaries keyed by module name
    modules = {
        name: {
            "name": name,
            "is_core": info.is_core,
            "file_count": info.file_count,
            "dependencies": list(analyzer.dependencies.get(name, {}).keys()),
        }
        for name, info in analyzer.modules.items()
    }

    # Flat list of dependency edges with their supporting files
    edges = [
        {
            "source": src,
            "target": tgt,
            "import_count": dep.import_count,
            "source_files": dep.source_files,
        }
        for src, targets in analyzer.dependencies.items()
        for tgt, dep in targets.items()
    ]

    cycles = [
        {"modules": list(pair)} for pair in analyzer.find_circular_dependencies()
    ]

    # Core modules must not depend on optional ones
    violations = []
    for src in analyzer.CORE_MODULES:
        deps = analyzer.dependencies.get(src, {})
        violations.extend(
            {"source": src, "target": tgt, "files": deps[tgt].source_files}
            for tgt in deps
            if tgt in analyzer.OPTIONAL_MODULES
        )

    data = {
        "modules": modules,
        "dependencies": edges,
        "circular_dependencies": cycles,
        "violations": violations,
    }
    return json.dumps(data, indent=2)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments, run the analyzer, emit the graph."""
    parser = argparse.ArgumentParser(
        description="Generate module dependency graph"
    )
    parser.add_argument(
        "--format",
        choices=["text", "dot", "json", "mermaid"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--output",
        type=str,
        help="Output file (default: stdout)",
    )
    parser.add_argument(
        "--show-details",
        action="store_true",
        help="Show file-level dependency details",
    )
    parser.add_argument(
        "--project-root",
        type=str,
        default=".",
        help="Project root directory (default: current directory)",
    )

    args = parser.parse_args()

    root = Path(args.project_root).resolve()
    analyzer = ModuleDependencyAnalyzer(root)
    analyzer.analyze()

    # Dispatch table replaces the if/elif chain; argparse `choices`
    # guarantees args.format is valid, and "text" is also the fallback.
    renderers = {
        "mermaid": lambda: format_mermaid(analyzer),
        "dot": lambda: format_dot(analyzer),
        "json": lambda: format_json(analyzer),
    }
    render = renderers.get(
        args.format,
        lambda: format_text(analyzer, show_details=args.show_details),
    )
    output = render()

    # Write to the requested file, or stdout by default
    if args.output:
        Path(args.output).write_text(output)
        print(f"Output written to {args.output}")
    else:
        print(output)
|
||||
|
||||
|
||||
# Script entry point: python scripts/module_dependency_graph.py [options]
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user