#!/usr/bin/env python3
"""
Module Dependency Graph Generator

Analyzes the codebase and outputs a dependency graph showing how modules
depend on each other. Useful for understanding module coupling and
identifying architectural issues.

Usage:
    python scripts/module_dependency_graph.py [--format FORMAT] [--output FILE]

Options:
    --format FORMAT     Output format: text (default), dot, json, mermaid
    --output FILE       Output file (default: stdout)
    --show-details      Show file-level dependencies
    --project-root DIR  Project root directory (default: current directory)
"""

import argparse
import json
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ModuleDependency:
    """Represents a dependency from one module to another."""

    # Module whose files contain the import statements.
    source_module: str
    # Module being imported.
    target_module: str
    # Project-root-relative paths of the importing files (no duplicates).
    source_files: list[str] = field(default_factory=list)
    # Number of matching import lines found across all source files.
    import_count: int = 0


@dataclass
class ModuleInfo:
    """Information about a module."""

    name: str
    # True when the module name is listed in CORE_MODULES.
    is_core: bool
    # Names of modules this module imports from.
    dependencies: list[str] = field(default_factory=list)
    # Names of discovered modules that import from this module.
    dependents: list[str] = field(default_factory=list)
    # Number of ``*.py`` files found under the module directory.
    file_count: int = 0


class ModuleDependencyAnalyzer:
    """Analyzes module dependencies in the codebase.

    Modules are the sub-directories of ``<project_root>/app/modules``.
    A dependency is recorded for every line that starts with
    ``from app.modules.<name>`` or ``import app.modules.<name>`` where
    ``<name>`` differs from the module that contains the file.
    """

    # Core modules (always enabled)
    CORE_MODULES = {
        "contracts",
        "core",
        "tenancy",
        "cms",
        "customers",
        "billing",
        "payments",
        "messaging",
    }

    # Optional modules (can be enabled/disabled per platform)
    OPTIONAL_MODULES = {
        "analytics",
        "cart",
        "catalog",
        "checkout",
        "inventory",
        "loyalty",
        "marketplace",
        "orders",
        "monitoring",
    }

    # Compiled once instead of once per scanned line. Group 1 is the name
    # of the imported module (the path segment right after ``app.modules.``).
    _IMPORT_RE = re.compile(r"^\s*(?:from|import)\s+app\.modules\.(\w+)")

    def __init__(self, project_root: Path):
        self.project_root = project_root
        self.modules_path = project_root / "app" / "modules"
        # source module -> target module -> dependency record
        self.dependencies: dict[str, dict[str, ModuleDependency]] = defaultdict(dict)
        self.modules: dict[str, ModuleInfo] = {}

    def analyze(self) -> dict[str, ModuleInfo]:
        """Analyze all module dependencies.

        Returns:
            Mapping of module name to ``ModuleInfo`` (also stored on
            ``self.modules``). An empty dict is returned, and an error is
            printed to stderr, when the modules directory does not exist.
        """
        # Start from a clean slate so that calling analyze() more than once
        # does not double-count imports from the previous run.
        self.dependencies = defaultdict(dict)
        self.modules = {}

        if not self.modules_path.exists():
            print(f"Error: Modules path not found: {self.modules_path}", file=sys.stderr)
            return {}

        # First pass: discover modules. Sorted so that results do not depend
        # on the order in which the filesystem lists directories.
        module_dirs = sorted(
            (
                d
                for d in self.modules_path.iterdir()
                if d.is_dir() and d.name != "__pycache__" and not d.name.startswith(".")
            ),
            key=lambda d: d.name,
        )
        for module_dir in module_dirs:
            name = module_dir.name
            self.modules[name] = ModuleInfo(name=name, is_core=name in self.CORE_MODULES)

        # Second pass: analyze imports
        for module_dir in module_dirs:
            self._analyze_module(module_dir, module_dir.name)

        # Calculate dependencies / dependents
        for module_name, module_info in self.modules.items():
            for target in sorted(self.dependencies.get(module_name, {})):
                module_info.dependencies.append(target)
                # Import targets are not necessarily discovered module
                # directories, so only known modules get a dependent entry.
                if target in self.modules:
                    self.modules[target].dependents.append(module_name)

        return self.modules

    def _analyze_module(self, module_dir: Path, module_name: str) -> None:
        """Analyze a single module's dependencies and record its file count."""
        file_count = 0
        for py_file in sorted(module_dir.rglob("*.py")):
            if "__pycache__" in py_file.parts:
                continue
            file_count += 1
            try:
                # Explicit encoding: the platform default is locale-dependent.
                # Undecodable bytes are replaced rather than aborting the file,
                # since only the ASCII import statements matter here.
                content = py_file.read_text(encoding="utf-8", errors="replace")
            except OSError as e:
                print(f"Warning: Could not analyze {py_file}: {e}", file=sys.stderr)
                continue
            self._analyze_file(py_file, content, module_name)

        if module_name in self.modules:
            self.modules[module_name].file_count = file_count

    def _analyze_file(self, file_path: Path, content: str, source_module: str) -> None:
        """Analyze imports in a single file.

        Lines that are blank, start with ``#``, or fall inside a multi-line
        triple-quoted string are ignored. Imports nested inside an
        ``if TYPE_CHECKING:`` block are counted like any other import.
        """
        in_docstring = False
        docstring_delimiter = None

        for line in content.split("\n"):
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue

            # Track docstrings: a delimiter that appears exactly once on a
            # line opens a block, or closes the block it previously opened.
            for delim in ('"""', "'''"):
                if line.count(delim) == 1:
                    if not in_docstring:
                        in_docstring = True
                        docstring_delimiter = delim
                    elif docstring_delimiter == delim:
                        in_docstring = False
                        docstring_delimiter = None

            if in_docstring:
                continue

            # Look for import statements
            import_match = self._IMPORT_RE.match(line)
            if not import_match:
                continue

            target_module = import_match.group(1)
            if not target_module or target_module == source_module:
                continue

            # Skip imports explicitly suppressed with a noqa comment
            lowered = line.lower()
            if "noqa:" in lowered and "import" in lowered:
                continue

            # Record the dependency
            module_deps = self.dependencies[source_module]
            dep = module_deps.get(target_module)
            if dep is None:
                dep = ModuleDependency(
                    source_module=source_module,
                    target_module=target_module,
                )
                module_deps[target_module] = dep

            dep.import_count += 1
            rel_path = str(file_path.relative_to(self.project_root))
            if rel_path not in dep.source_files:
                dep.source_files.append(rel_path)

    def get_dependency_matrix(self) -> dict[str, set[str]]:
        """Return a simple dependency matrix (module -> set of dependencies)."""
        return {
            module_name: set(self.dependencies.get(module_name, {}))
            for module_name in self.modules
        }

    def find_circular_dependencies(self) -> list[tuple[str, str]]:
        """Find bidirectional dependencies (A imports B and B imports A).

        Returns:
            Sorted list of ``(a, b)`` pairs with ``a < b``; each
            bidirectional pair is reported exactly once.
        """
        return sorted(
            (source, target)
            for source, targets in self.dependencies.items()
            for target in targets
            # ``source < target`` keeps one orientation of every pair.
            if source < target and source in self.dependencies.get(target, {})
        )

    def find_core_optional_violations(self) -> list[tuple[str, str, ModuleDependency]]:
        """Find dependencies from a core module onto an optional module.

        Returns:
            ``(source, target, dependency)`` tuples sorted by source and
            then by target module name.
        """
        return [
            (source, target, dep)
            for source in sorted(self.CORE_MODULES)
            for target, dep in sorted(self.dependencies.get(source, {}).items())
            if target in self.OPTIONAL_MODULES
        ]


def _format_module_section(
    title: str,
    modules: list[ModuleInfo],
    analyzer: ModuleDependencyAnalyzer,
    show_details: bool,
) -> list[str]:
    """Render one titled list of modules for the plain-text report."""
    lines = [title, "-" * 40]
    for module in modules:
        deps = analyzer.dependencies.get(module.name, {})
        deps_str = ", ".join(sorted(deps)) if deps else "(none)"
        lines.append(f" {module.name} ({module.file_count} files)")
        lines.append(f" → depends on: {deps_str}")
        if show_details and deps:
            for dep_name, dep in sorted(deps.items()):
                lines.append(
                    f" - {dep_name}: {dep.import_count} imports from {len(dep.source_files)} files"
                )
    lines.append("")
    return lines


def _sorted_edges(analyzer: ModuleDependencyAnalyzer) -> list[tuple[str, str, bool]]:
    """Return ``(source, target, is_violation)`` for every edge, sorted.

    ``is_violation`` is True when a core module depends on an optional one.
    """
    return [
        (
            source,
            target,
            source in analyzer.CORE_MODULES and target in analyzer.OPTIONAL_MODULES,
        )
        for source in sorted(analyzer.dependencies)
        for target in sorted(analyzer.dependencies[source])
    ]


def format_text(analyzer: ModuleDependencyAnalyzer, show_details: bool = False) -> str:
    """Format output as plain text.

    Args:
        analyzer: Analyzer on which ``analyze()`` has already been called.
        show_details: Also list per-dependency import counts and, for
            violations, up to five of the importing files.
    """
    lines = ["=" * 70, "MODULE DEPENDENCY GRAPH", "=" * 70, ""]

    # Separate core and optional modules. Every module that is not core is
    # listed as optional, whether or not it appears in OPTIONAL_MODULES.
    by_name = sorted(analyzer.modules.values(), key=lambda m: m.name)
    core_modules = [m for m in by_name if m.is_core]
    optional_modules = [m for m in by_name if not m.is_core]

    lines.extend(
        _format_module_section(
            "CORE MODULES (always enabled):", core_modules, analyzer, show_details
        )
    )
    lines.extend(
        _format_module_section(
            "OPTIONAL MODULES (can be disabled):", optional_modules, analyzer, show_details
        )
    )

    # Find circular dependencies
    circular = analyzer.find_circular_dependencies()
    if circular:
        lines.append("⚠️ BIDIRECTIONAL DEPENDENCIES (potential design issues):")
        lines.append("-" * 40)
        lines.extend(f" {a} ⟷ {b}" for a, b in circular)
        lines.append("")

    # Check for core→optional violations
    violations = analyzer.find_core_optional_violations()
    if violations:
        lines.append("❌ ARCHITECTURE VIOLATIONS (core → optional):")
        lines.append("-" * 40)
        for source, target, dep in violations:
            lines.append(f" {source} → {target} ({dep.import_count} imports)")
            if show_details:
                # Cap the file list at five entries to keep the report short.
                lines.extend(f" - {path}" for path in dep.source_files[:5])
                if len(dep.source_files) > 5:
                    lines.append(f" ... and {len(dep.source_files) - 5} more files")
        lines.append("")

    return "\n".join(lines)


def format_mermaid(analyzer: ModuleDependencyAnalyzer) -> str:
    """Format output as a Mermaid flowchart wrapped in a fenced code block."""
    lines = ["```mermaid", "flowchart TD"]

    lines.append(' subgraph core["Core Modules"]')
    for module in sorted(analyzer.CORE_MODULES):
        if module in analyzer.modules:
            lines.append(f" {module}[{module}]")
    lines.append(" end")
    lines.append("")

    lines.append(' subgraph optional["Optional Modules"]')
    for module in sorted(analyzer.OPTIONAL_MODULES):
        if module in analyzer.modules:
            lines.append(f" {module}[{module}]")
    lines.append(" end")
    lines.append("")

    # Add edges; core→optional violations are labelled differently
    for source, target, is_violation in _sorted_edges(analyzer):
        if is_violation:
            lines.append(f" {source} -->|❌| {target}")
        else:
            lines.append(f" {source} --> {target}")

    lines.append("```")
    return "\n".join(lines)


def format_dot(analyzer: ModuleDependencyAnalyzer) -> str:
    """Format output as GraphViz DOT."""
    lines = [
        "digraph ModuleDependencies {",
        " rankdir=LR;",
        " node [shape=box];",
        "",
    ]

    # Core modules cluster
    lines.append(" subgraph cluster_core {")
    lines.append(' label="Core Modules";')
    lines.append(" style=filled;")
    lines.append(" color=lightblue;")
    for module in sorted(analyzer.CORE_MODULES):
        if module in analyzer.modules:
            lines.append(f" {module};")
    lines.append(" }")
    lines.append("")

    # Optional modules cluster
    lines.append(" subgraph cluster_optional {")
    lines.append(' label="Optional Modules";')
    lines.append(" style=filled;")
    lines.append(" color=lightyellow;")
    for module in sorted(analyzer.OPTIONAL_MODULES):
        if module in analyzer.modules:
            lines.append(f" {module};")
    lines.append(" }")
    lines.append("")

    # Edges; core→optional violations are drawn red and dashed
    for source, target, is_violation in _sorted_edges(analyzer):
        if is_violation:
            lines.append(
                f' {source} -> {target} [color=red, style=dashed, label="violation"];'
            )
        else:
            lines.append(f" {source} -> {target};")

    lines.append("}")
    return "\n".join(lines)


def format_json(analyzer: ModuleDependencyAnalyzer) -> str:
    """Format output as JSON.

    The document has four top-level keys: ``modules``, ``dependencies``,
    ``circular_dependencies`` and ``violations``.
    """
    modules = {
        name: {
            "name": name,
            "is_core": module.is_core,
            "file_count": module.file_count,
            "dependencies": sorted(analyzer.dependencies.get(name, {})),
        }
        for name, module in analyzer.modules.items()
    }

    dependencies = [
        {
            "source": source,
            "target": target,
            "import_count": dep.import_count,
            "source_files": dep.source_files,
        }
        for source in sorted(analyzer.dependencies)
        for target, dep in sorted(analyzer.dependencies[source].items())
    ]

    circular = [
        {"modules": list(pair)} for pair in analyzer.find_circular_dependencies()
    ]

    violations = [
        {"source": source, "target": target, "files": dep.source_files}
        for source, target, dep in analyzer.find_core_optional_violations()
    ]

    data = {
        "modules": modules,
        "dependencies": dependencies,
        "circular_dependencies": circular,
        "violations": violations,
    }
    return json.dumps(data, indent=2)


def main() -> int:
    """Command-line entry point.

    Returns:
        Process exit status: 0 on success, 1 when the modules directory
        does not exist under the project root.
    """
    parser = argparse.ArgumentParser(
        description="Generate module dependency graph"
    )
    parser.add_argument(
        "--format",
        choices=["text", "dot", "json", "mermaid"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--output",
        type=str,
        help="Output file (default: stdout)",
    )
    parser.add_argument(
        "--show-details",
        action="store_true",
        help="Show file-level dependency details",
    )
    parser.add_argument(
        "--project-root",
        type=str,
        default=".",
        help="Project root directory (default: current directory)",
    )

    args = parser.parse_args()

    project_root = Path(args.project_root).resolve()
    analyzer = ModuleDependencyAnalyzer(project_root)
    analyzer.analyze()
    if not analyzer.modules_path.exists():
        # analyze() has already reported the problem on stderr; signal the
        # failure through the exit status instead of emitting an empty graph.
        return 1

    # Format output; anything without a dedicated formatter falls back to text
    simple_formatters = {
        "mermaid": format_mermaid,
        "dot": format_dot,
        "json": format_json,
    }
    formatter = simple_formatters.get(args.format)
    if formatter is None:
        output = format_text(analyzer, show_details=args.show_details)
    else:
        output = formatter(analyzer)

    # Write output
    if args.output:
        # The report contains non-ASCII symbols, so do not rely on the
        # locale's default encoding when writing the file.
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Output written to {args.output}")
    else:
        print(output)
    return 0


if __name__ == "__main__":
    sys.exit(main())