"""
Workflow Orchestrator - Integrates all phases into complete pipeline.
Phase 5: Complete end-to-end workflow with all MCP integrations.
"""
import os
import logging
import asyncio
from typing import Callable, Dict, List, Optional
from pathlib import Path
# Phase 1-2: Classification
from src.agents.classifier import CodeClassifier
from src.agents.pattern_integration import PatternMatcherIntegration
from src.utils.file_handler import FileHandler
# Phase 3: Search
from src.search.vector_store import CodeSearchEngine
# Phase 4: Analysis & Transformation
from src.agents.analyzer import CodeAnalyzer
from src.agents.transformer import CodeTransformer
# Phase 5: Testing & GitHub
from src.agents.test_generator import CodeTestGenerator
from src.sandbox.validator import ModalSandboxValidator
# GitHubMCPClient is imported lazily inside modernize_repository() to avoid a circular dependency
GitHubMCPClient = None
logger = logging.getLogger(__name__)
class ModernizationOrchestrator:
"""
Orchestrates the complete code modernization workflow.
Integrates all 5 phases into a seamless pipeline.
"""
def __init__(self, use_intelligent_matcher: bool = True):
"""Initialize orchestrator with all components."""
logger.info("Initializing ModernizationOrchestrator")
# Phase 1-2 components
self.use_intelligent_matcher = use_intelligent_matcher
if use_intelligent_matcher:
self.pattern_integration = PatternMatcherIntegration(
use_intelligent_matcher=True,
cache_dir=".pattern_cache"
)
logger.info("Using IntelligentPatternMatcher")
else:
self.classifier = CodeClassifier()
logger.info("Using legacy CodeClassifier")
self.file_handler = FileHandler()
# Phase 3 components
self.search_engine = None # Initialized per repo
# Phase 4 components
self.analyzer = CodeAnalyzer()
self.transformer = CodeTransformer()
# Phase 5 components
self.test_generator = CodeTestGenerator()
self.validator = ModalSandboxValidator()
# Lazy load GitHub client to avoid circular import
self.github_client = None
logger.info("ModernizationOrchestrator initialized successfully")
async def modernize_repository(
self,
repo_path: str,
target_version: str = "Python 3.14",
create_pr: bool = False,
repo_url: Optional[str] = None,
github_token: Optional[str] = None,
        progress_callback: Optional[Callable[[str, str], None]] = None
) -> Dict:
"""
Complete modernization workflow for a repository.
Args:
repo_path: Path to repository (ZIP or directory)
target_version: Target language/framework version
create_pr: Whether to create GitHub PR
repo_url: GitHub repository URL (required if create_pr=True)
github_token: GitHub personal access token (optional, uses .env if not provided)
progress_callback: Optional callback function for progress updates
Returns:
Dictionary with complete modernization results
"""
logger.info(f"Starting modernization for {repo_path}")
def update_progress(phase: str, message: str):
"""Helper to call progress callback if provided."""
if progress_callback:
progress_callback(phase, message)
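        # Shared results structure, filled in phase by phase and returned at the end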
results = {
"success": False,
"phases": {},
"statistics": {},
"errors": []
}
try:
# Phase 1: Extract and discover files
logger.info("Phase 1: File discovery")
update_progress("Phase 1", "Extracting and discovering files...")
if repo_path.endswith('.zip'):
extract_path = self.file_handler.extract_repo(repo_path)
else:
extract_path = repo_path
files = self.file_handler.list_code_files(extract_path)
logger.info(f"Discovered {len(files)} code files")
update_progress("Phase 1", f"Discovered {len(files)} code files")
results['phases']['discovery'] = {
"files_found": len(files),
"repo_path": extract_path
}
# Phase 2: Classify files
logger.info("Phase 2: File classification")
update_progress("Phase 2", "Classifying files with AI pattern detection...")
# Read file contents for intelligent matching
file_contents = {}
if self.use_intelligent_matcher:
logger.info("Reading file contents for intelligent pattern matching...")
for file_path in files[:50]: # Limit to 50 files for demo
try:
full_path = os.path.join(extract_path, file_path)
content = self.file_handler.read_file(full_path)
if content:
file_contents[file_path] = content
except Exception as e:
logger.warning(f"Could not read {file_path}: {e}")
classifications = self.pattern_integration.classify_files(
list(file_contents.keys()),
file_contents
)
# Get detailed statistics
analyses = self.pattern_integration.pattern_matcher.analyze_batch(file_contents)
stats = self.pattern_integration.generate_statistics(analyses)
logger.info(f"Intelligent classification: {stats['modernize_high']} high, "
f"{stats['modernize_low']} low, {stats['skip']} skip")
logger.info(f"Detected {stats['patterns_detected']} patterns across {stats['total_files']} files")
else:
classifications = self.classifier.classify_files(files)
stats = None
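            # Bucket files by classification label for the downstream phases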
modernize_high = [f for f, c in classifications.items() if c == 'modernize_high']
modernize_low = [f for f, c in classifications.items() if c == 'modernize_low']
skip_files = [f for f, c in classifications.items() if c == 'skip']
logger.info(f"Classification: {len(modernize_high)} high, {len(modernize_low)} low, {len(skip_files)} skip")
results['phases']['classification'] = {
"modernize_high": len(modernize_high),
"modernize_low": len(modernize_low),
"skip": len(skip_files),
"classifications": classifications,
"intelligent_stats": stats if self.use_intelligent_matcher else None
}
# Phase 3: Semantic search and pattern grouping
logger.info("Phase 3: Semantic search")
update_progress("Phase 3", "Building semantic index with LlamaIndex...")
self.search_engine = CodeSearchEngine(persist_dir=None)
# Build index for high-priority files
files_to_modernize = modernize_high + modernize_low
if files_to_modernize:
self.search_engine.build_index(extract_path) # Build index from repo
# Find pattern groups
pattern_groups = self._find_pattern_groups(files_to_modernize[:20])
logger.info(f"Found {len(pattern_groups)} pattern groups")
results['phases']['search'] = {
"indexed_files": min(len(files_to_modernize), 100),
"pattern_groups": len(pattern_groups)
}
else:
pattern_groups = []
results['phases']['search'] = {"message": "No files to modernize"}
# Phase 4: Analysis and transformation
logger.info("Phase 4: Code transformation")
update_progress("Phase 4", "Analyzing and transforming code...")
transformations = []
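            # Two transformation paths: detailed pattern analysis when the intelligent
            # matcher ran, otherwise the legacy semantic pattern groups from Phase 3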
# Use intelligent pattern data if available
if self.use_intelligent_matcher and file_contents:
logger.info("Using intelligent pattern analysis for transformation")
# Get prioritized files from intelligent matcher
prioritized = self.pattern_integration.pattern_matcher.prioritize_files(analyses)
# Process top priority files
files_to_transform = [
(fp, analysis) for fp, analysis in prioritized
if analysis.requires_modernization
][:10] # Limit to 10 files for demo
logger.info(f"Processing {len(files_to_transform)} high-priority files with detailed pattern data")
total_files = len(files_to_transform)
for idx, (file_path, file_analysis) in enumerate(files_to_transform, 1):
try:
update_progress("Phase 4", f"Transforming file {idx}/{total_files}: {Path(file_path).name}")
original_code = file_contents.get(file_path, "")
if not original_code:
continue
# Convert intelligent pattern analysis to transformation plan
transformation_plan = self.pattern_integration.get_transformation_plan(file_analysis)
# Transform using detailed pattern information
modernized_code = await self.transformer.transform_code(
file_path,
original_code,
transformation_plan
)
transformations.append({
"file_path": file_path,
"original_code": original_code,
"modernized_code": modernized_code,
"analysis": transformation_plan,
"patterns_addressed": [p['pattern'] for p in transformation_plan['steps']],
"pattern_details": file_analysis.patterns # Include detailed pattern info
})
except Exception as e:
logger.error(f"Error transforming {file_path}: {e}")
results['errors'].append(f"Transformation error for {file_path}: {e}")
else:
# Fallback to legacy pattern grouping
logger.info("Using legacy pattern grouping for transformation")
file_to_patterns = {}
for group in pattern_groups[:5]: # Limit to 5 groups for demo
for file_path in group['files'][:3]:
if file_path not in file_to_patterns:
file_to_patterns[file_path] = []
file_to_patterns[file_path].append(group['pattern_name'])
logger.info(f"Processing {len(file_to_patterns)} unique files")
total_files = len(file_to_patterns)
for idx, (file_path, patterns) in enumerate(file_to_patterns.items(), 1):
try:
update_progress("Phase 4", f"Transforming file {idx}/{total_files}: {Path(file_path).name}")
full_path = os.path.join(extract_path, file_path)
original_code = self.file_handler.read_file(full_path)
if not original_code:
continue
# Analyze patterns
combined_pattern = " AND ".join(patterns)
analysis = await self.analyzer.analyze_pattern(
[file_path],
combined_pattern,
{file_path: original_code}
)
# Transform file
modernized_code = await self.transformer.transform_code(
file_path,
original_code,
analysis
)
transformations.append({
"file_path": file_path,
"original_code": original_code,
"modernized_code": modernized_code,
"analysis": analysis,
"patterns_addressed": patterns
})
except Exception as e:
logger.error(f"Error transforming {file_path}: {e}")
results['errors'].append(f"Transformation error for {file_path}: {e}")
logger.info(f"Transformed {len(transformations)} files")
# Save transformed files to output directory
output_dir = Path("modernized_output")
output_dir.mkdir(exist_ok=True)
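            # Transformed files keep their relative paths under modernized_output/,
            # with the untouched sources mirrored under modernized_output/original/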
for t in transformations:
try:
# Create subdirectories if needed
output_file = output_dir / t['file_path']
output_file.parent.mkdir(parents=True, exist_ok=True)
# Save modernized code
output_file.write_text(t['modernized_code'])
logger.info(f"Saved: {output_file}")
# Also save original for comparison
original_file = output_dir / "original" / t['file_path']
original_file.parent.mkdir(parents=True, exist_ok=True)
original_file.write_text(t['original_code'])
except Exception as e:
logger.error(f"Error saving {t['file_path']}: {e}")
logger.info(f"Output saved to: {output_dir.absolute()}")
results['phases']['transformation'] = {
"files_transformed": len(transformations),
"output_directory": str(output_dir.absolute())
}
# Store transformations for zip file creation
results['transformations'] = transformations
# Phase 5: Test generation and validation
logger.info("Phase 5: Test generation and validation")
update_progress("Phase 5", "Generating tests and validating in Modal sandbox...")
validation_results = []
# Create tests directory
tests_dir = output_dir / "tests"
tests_dir.mkdir(exist_ok=True)
            total_files_to_test = min(len(transformations), 10)
for idx, t in enumerate(transformations[:10], 1): # Limit to 10 for demo
try:
# Update progress
update_progress("Phase 5", f"Testing file {idx}/{total_tests}: {Path(t['file_path']).name}")
# Generate tests
tests = self.test_generator.generate_tests(
t['original_code'],
t['modernized_code'],
t['file_path']
)
# Validate and auto-fix export issues
if tests:
from src.agents.code_validator import validate_and_fix_code
# Detect language from file extension
file_ext = Path(t['file_path']).suffix.lower()
language_map = {
'.ts': 'typescript',
'.js': 'javascript',
'.py': 'python',
'.java': 'java'
}
language = language_map.get(file_ext, 'unknown')
# Validate and fix
fixed_code, is_valid, issues = validate_and_fix_code(
t['modernized_code'],
tests,
language
)
if not is_valid:
logger.warning(f"Code validation issues for {t['file_path']}: {issues}")
if fixed_code != t['modernized_code']:
logger.info(f"Auto-fixed export issues in {t['file_path']}")
t['modernized_code'] = fixed_code
                            # Re-save the fixed source file at its original output path
                            output_file = output_dir / t['file_path']
                            output_file.write_text(fixed_code)
# Save test file
if tests:
test_file = tests_dir / f"test_{Path(t['file_path']).name}"
test_file.write_text(tests)
logger.info(f"Saved test: {test_file}")
# Validate in sandbox
validation = self.validator.validate_transformation(
t['original_code'],
t['modernized_code'],
tests,
file_path=t['file_path']
)
validation['file_path'] = t['file_path']
validation_results.append(validation)
except Exception as e:
logger.error(f"Error validating {t['file_path']}: {e}")
results['errors'].append(f"Validation error: {e}")
# Calculate aggregate test results
total_tests = sum(v.get('tests_run', 0) for v in validation_results)
total_passed = sum(v.get('tests_passed', 0) for v in validation_results)
            # Average coverage only over files that report coverage data
coverage_values = [v.get('coverage_percent', 0) for v in validation_results if v.get('coverage_percent', 0) > 0]
avg_coverage = sum(coverage_values) / len(coverage_values) if coverage_values else 0.0
logger.info(f"Validation: {total_passed}/{total_tests} tests passed, {avg_coverage:.1f}% coverage")
results['phases']['validation'] = {
"files_validated": len(validation_results),
"total_tests": total_tests,
"tests_passed": total_passed,
"tests_failed": total_tests - total_passed,
"average_coverage": round(avg_coverage, 2),
"pass_rate": round(total_passed / max(total_tests, 1) * 100, 2)
}
# Phase 5b: GitHub PR creation (optional)
if create_pr and repo_url:
logger.info("Phase 5b: Creating GitHub PR")
# Lazy load GitHub client
if self.github_client is None:
from src.mcp.github_client import GitHubMCPClient
self.github_client = GitHubMCPClient(github_token=github_token)
# Prepare changed files
changed_files = {
t['file_path']: t['modernized_code']
for t in transformations
}
# Generate PR summary
pr_summary = self._generate_pr_summary(results, target_version)
# Create PR
pr_result = await self.github_client.create_pr(
repo_url=repo_url,
changed_files=changed_files,
pr_summary=pr_summary,
test_results=results['phases']['validation']
)
results['phases']['github_pr'] = pr_result
logger.info(f"PR creation: {pr_result.get('success', False)}")
# Calculate final statistics
results['statistics'] = {
"total_files": len(files),
"files_modernized": len(transformations),
"tests_generated": total_tests,
"test_pass_rate": round(total_passed / max(total_tests, 1) * 100, 2),
"average_coverage": round(avg_coverage, 2)
}
# Add output locations
results['output'] = {
"modernized_files": str(output_dir.absolute()),
"original_files": str((output_dir / "original").absolute()),
"test_files": str((output_dir / "tests").absolute())
}
results['success'] = True
logger.info("Modernization workflow completed successfully")
logger.info(f"📁 Modernized files: {output_dir.absolute()}")
logger.info(f"📁 Test files: {output_dir / 'tests'}")
except Exception as e:
logger.error(f"Workflow error: {e}")
results['errors'].append(f"Workflow error: {e}")
results['success'] = False
return results
def _find_pattern_groups(self, files: List[str]) -> List[Dict]:
"""
Find groups of files with similar legacy patterns.
Detects file languages and uses appropriate pattern queries.
Args:
files: List of file paths
Returns:
List of pattern group dictionaries
"""
# Detect languages present in the files
languages = self._detect_languages_in_files(files)
# Build language-specific pattern queries
pattern_queries = self._get_pattern_queries_for_languages(languages)
groups = []
for query in pattern_queries:
try:
similar_files = self.search_engine.find_similar_patterns(query, top_k=10)
if similar_files:
groups.append({
"pattern_name": query,
"files": [f['file_path'] for f in similar_files],
"similarity_scores": [f['score'] for f in similar_files]
})
except Exception as e:
logger.error(f"Error searching for pattern '{query}': {e}")
return groups
def _detect_languages_in_files(self, files: List[str]) -> set:
"""Detect programming languages from file extensions."""
extension_to_language = {
'.py': 'python',
'.java': 'java',
'.js': 'javascript',
'.ts': 'typescript',
'.jsx': 'javascript',
'.tsx': 'typescript',
'.cpp': 'cpp',
'.c': 'c',
'.h': 'c',
'.cs': 'csharp',
'.go': 'go',
'.rb': 'ruby',
'.php': 'php',
'.kt': 'kotlin',
'.scala': 'scala',
'.rs': 'rust',
'.swift': 'swift'
}
languages = set()
for file_path in files:
ext = Path(file_path).suffix.lower()
if ext in extension_to_language:
languages.add(extension_to_language[ext])
return languages if languages else {'python'} # Default to Python if no recognized extensions
def _get_pattern_queries_for_languages(self, languages: set) -> List[str]:
"""Get pattern queries appropriate for the detected languages."""
# Common patterns for all languages
common_patterns = [
"Files with SQL injection vulnerabilities",
"Files with hardcoded credentials or secrets",
"Files with security vulnerabilities",
"Files with deprecated API usage"
]
# Language-specific patterns
language_patterns = {
'python': [
"Files using deprecated database libraries like MySQLdb",
"Files using Python 2 print statements",
"Files using deprecated urllib2 library",
"Files missing type hints",
"Files using old-style string formatting"
],
'java': [
"Files using deprecated Java APIs like Vector or Hashtable",
"Files using raw JDBC without prepared statements",
"Files missing try-with-resources for AutoCloseable",
"Files using pre-Java 8 patterns without lambdas or streams",
"Files using deprecated Date and Calendar APIs",
"Files with missing null checks or Optional usage"
],
'javascript': [
"Files using var instead of let or const",
"Files using callback patterns instead of Promises or async/await",
"Files using jQuery for DOM manipulation",
"Files with eval() usage",
"Files using prototype-based inheritance"
],
'typescript': [
"Files with excessive any type usage",
"Files missing strict null checks",
"Files using old module syntax"
],
'cpp': [
"Files using raw pointers instead of smart pointers",
"Files with manual memory management",
"Files using C-style casts",
"Files missing RAII patterns"
],
'csharp': [
"Files using deprecated .NET APIs",
"Files missing async/await patterns",
"Files using old collection types"
],
'go': [
"Files missing error handling",
"Files with goroutine leaks",
"Files missing context usage"
],
'ruby': [
"Files using deprecated Ruby syntax",
"Files missing proper error handling"
],
'php': [
"Files using deprecated mysql_* functions",
"Files missing prepared statements",
"Files with register_globals usage"
]
}
queries = common_patterns.copy()
for lang in languages:
if lang in language_patterns:
queries.extend(language_patterns[lang])
return queries
def _generate_pr_summary(self, results: Dict, target_version: str) -> str:
"""Generate PR summary from results."""
stats = results['statistics']
# Build coverage line only if coverage > 0
coverage_line = ""
if stats.get('average_coverage', 0) > 0:
coverage_line = f"**Code Coverage**: {stats['average_coverage']:.1f}%\n"
summary = f"""Automated migration to {target_version} with security fixes and performance improvements.
**Files Modernized**: {stats['files_modernized']} / {stats['total_files']}
**Tests Generated**: {stats['tests_generated']}
**Test Pass Rate**: {stats['test_pass_rate']:.1f}%
{coverage_line}
This PR includes:
- Syntax modernization to {target_version}
- Security vulnerability fixes
- Deprecated library replacements
- Comprehensive test suite
- Performance optimizations
All changes have been validated in an isolated sandbox environment.
"""
return summary
def generate_report(self, results: Dict) -> str:
"""
Generate human-readable report from results.
Args:
results: Workflow results dictionary
Returns:
Formatted report string
"""
report = []
report.append("=" * 60)
report.append("LEGACY CODE MODERNIZATION REPORT")
report.append("=" * 60)
report.append("")
if results['success']:
report.append("✅ Status: SUCCESS")
else:
report.append("❌ Status: FAILED")
report.append("")
report.append("STATISTICS:")
report.append("-" * 60)
stats = results.get('statistics', {})
for key, value in stats.items():
# Skip average_coverage if it's 0
if key == 'average_coverage' and value == 0:
continue
report.append(f" {key.replace('_', ' ').title()}: {value}")
# Add intelligent pattern statistics if available
classification_data = results.get('phases', {}).get('classification', {})
intelligent_stats = classification_data.get('intelligent_stats')
if intelligent_stats:
report.append("")
report.append("INTELLIGENT PATTERN ANALYSIS:")
report.append("-" * 60)
report.append(f" Patterns Detected: {intelligent_stats.get('patterns_detected', 0)}")
report.append(f" Average Modernization Score: {intelligent_stats.get('average_modernization_score', 0)}/100")
report.append(f" Total Estimated Effort: {intelligent_stats.get('total_estimated_effort_hours', 0)}h")
severity_counts = intelligent_stats.get('severity_counts', {})
if severity_counts:
report.append(" Severity Breakdown:")
for severity, count in severity_counts.items():
if count > 0:
report.append(f" {severity.upper()}: {count}")
report.append("")
report.append("PHASE RESULTS:")
report.append("-" * 60)
for phase, data in results.get('phases', {}).items():
report.append(f"\n {phase.upper()}:")
if isinstance(data, dict):
for k, v in data.items():
if k not in ['classifications', 'intelligent_stats']: # Skip large data
report.append(f" {k}: {v}")
# Add output locations
if results.get('output'):
report.append("")
report.append("OUTPUT LOCATIONS:")
report.append("-" * 60)
for key, path in results['output'].items():
report.append(f" 📁 {key.replace('_', ' ').title()}: {path}")
if results.get('errors'):
report.append("")
report.append("ERRORS:")
report.append("-" * 60)
for error in results['errors']:
report.append(f" ⚠️ {error}")
report.append("")
report.append("=" * 60)
return "\n".join(report)