"""
Workflow Orchestrator - Integrates all phases into a complete pipeline.

Phase 5: Complete end-to-end workflow with all MCP integrations.
"""

import os
import logging
import asyncio
from typing import Callable, Dict, List, Optional
from pathlib import Path

from src.agents.classifier import CodeClassifier
from src.agents.pattern_integration import PatternMatcherIntegration
from src.utils.file_handler import FileHandler
from src.search.vector_store import CodeSearchEngine
from src.agents.analyzer import CodeAnalyzer
from src.agents.transformer import CodeTransformer
from src.agents.test_generator import CodeTestGenerator
from src.sandbox.validator import ModalSandboxValidator

# Imported lazily inside modernize_repository so the orchestrator can run
# without GitHub credentials when no PR is requested.
GitHubMCPClient = None

logger = logging.getLogger(__name__)


class ModernizationOrchestrator:
    """
    Orchestrates the complete code modernization workflow.

    Integrates all 5 phases into a seamless pipeline.
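
    Example (illustrative; the repo path is a placeholder):
        orchestrator = ModernizationOrchestrator(use_intelligent_matcher=True)
        results = await orchestrator.modernize_repository("legacy_repo.zip")
        print(orchestrator.generate_report(results))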
    """

    def __init__(self, use_intelligent_matcher: bool = True):
        """Initialize orchestrator with all components."""
        logger.info("Initializing ModernizationOrchestrator")

        self.use_intelligent_matcher = use_intelligent_matcher
        if use_intelligent_matcher:
            self.pattern_integration = PatternMatcherIntegration(
                use_intelligent_matcher=True,
                cache_dir=".pattern_cache"
            )
            logger.info("Using IntelligentPatternMatcher")
        else:
            self.classifier = CodeClassifier()
            logger.info("Using legacy CodeClassifier")

        self.file_handler = FileHandler()

        # Built lazily in Phase 3, once there are files worth indexing.
        self.search_engine = None

        self.analyzer = CodeAnalyzer()
        self.transformer = CodeTransformer()

        self.test_generator = CodeTestGenerator()
        self.validator = ModalSandboxValidator()

        # Created on demand in modernize_repository when create_pr=True.
        self.github_client = None

        logger.info("ModernizationOrchestrator initialized successfully")

    async def modernize_repository(
        self,
        repo_path: str,
        target_version: str = "Python 3.14",
        create_pr: bool = False,
        repo_url: Optional[str] = None,
        github_token: Optional[str] = None,
        progress_callback: Optional[Callable[[str, str], None]] = None
    ) -> Dict:
        """
        Complete modernization workflow for a repository.

        Args:
            repo_path: Path to repository (ZIP archive or directory)
            target_version: Target language/framework version
            create_pr: Whether to create a GitHub PR
            repo_url: GitHub repository URL (required if create_pr=True)
            github_token: GitHub personal access token (falls back to .env if omitted)
            progress_callback: Optional callable invoked as callback(phase, message)
                for progress updates

        Returns:
            Dictionary with complete modernization results
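
        Example (illustrative; the ZIP path is a placeholder):
            results = await orchestrator.modernize_repository(
                "legacy_repo.zip",
                target_version="Python 3.14",
                progress_callback=lambda phase, msg: print(f"{phase}: {msg}")
            )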
        """
        logger.info(f"Starting modernization for {repo_path}")

        def update_progress(phase: str, message: str):
            """Helper to call progress callback if provided."""
            if progress_callback:
                progress_callback(phase, message)

        results = {
            "success": False,
            "phases": {},
            "statistics": {},
            "errors": []
        }

        try:
            logger.info("Phase 1: File discovery")
            update_progress("Phase 1", "Extracting and discovering files...")

            if repo_path.endswith('.zip'):
                extract_path = self.file_handler.extract_repo(repo_path)
            else:
                extract_path = repo_path

            files = self.file_handler.list_code_files(extract_path)
            logger.info(f"Discovered {len(files)} code files")
            update_progress("Phase 1", f"Discovered {len(files)} code files")

            results['phases']['discovery'] = {
                "files_found": len(files),
                "repo_path": extract_path
            }

            logger.info("Phase 2: File classification")
            update_progress("Phase 2", "Classifying files with AI pattern detection...")

            file_contents = {}
            if self.use_intelligent_matcher:
                logger.info("Reading file contents for intelligent pattern matching...")
                # Cap reads at 50 files to keep pattern analysis bounded.
                for file_path in files[:50]:
                    try:
                        full_path = os.path.join(extract_path, file_path)
                        content = self.file_handler.read_file(full_path)
                        if content:
                            file_contents[file_path] = content
                    except Exception as e:
                        logger.warning(f"Could not read {file_path}: {e}")

                classifications = self.pattern_integration.classify_files(
                    list(file_contents.keys()),
                    file_contents
                )

                analyses = self.pattern_integration.pattern_matcher.analyze_batch(file_contents)
                stats = self.pattern_integration.generate_statistics(analyses)

                logger.info(f"Intelligent classification: {stats['modernize_high']} high, "
                            f"{stats['modernize_low']} low, {stats['skip']} skip")
                logger.info(f"Detected {stats['patterns_detected']} patterns across {stats['total_files']} files")
            else:
                classifications = self.classifier.classify_files(files)
                stats = None

            modernize_high = [f for f, c in classifications.items() if c == 'modernize_high']
            modernize_low = [f for f, c in classifications.items() if c == 'modernize_low']
            skip_files = [f for f, c in classifications.items() if c == 'skip']

            logger.info(f"Classification: {len(modernize_high)} high, {len(modernize_low)} low, {len(skip_files)} skip")

            results['phases']['classification'] = {
                "modernize_high": len(modernize_high),
                "modernize_low": len(modernize_low),
                "skip": len(skip_files),
                "classifications": classifications,
                "intelligent_stats": stats if self.use_intelligent_matcher else None
            }

            logger.info("Phase 3: Semantic search")
            update_progress("Phase 3", "Building semantic index with LlamaIndex...")

            self.search_engine = CodeSearchEngine(persist_dir=None)

            files_to_modernize = modernize_high + modernize_low
            if files_to_modernize:
                self.search_engine.build_index(extract_path)

                # Group only the first 20 candidates to keep search cost bounded.
                pattern_groups = self._find_pattern_groups(files_to_modernize[:20])
                logger.info(f"Found {len(pattern_groups)} pattern groups")

                results['phases']['search'] = {
                    "indexed_files": min(len(files_to_modernize), 100),
                    "pattern_groups": len(pattern_groups)
                }
            else:
                pattern_groups = []
                results['phases']['search'] = {"message": "No files to modernize"}

            logger.info("Phase 4: Code transformation")
            update_progress("Phase 4", "Analyzing and transforming code...")

            transformations = []

            if self.use_intelligent_matcher and file_contents:
                logger.info("Using intelligent pattern analysis for transformation")

                # Rank files so the highest-priority candidates are transformed first.
                prioritized = self.pattern_integration.pattern_matcher.prioritize_files(analyses)

                # Transform at most the 10 highest-priority files that need work.
                files_to_transform = [
                    (fp, analysis) for fp, analysis in prioritized
                    if analysis.requires_modernization
                ][:10]

                logger.info(f"Processing {len(files_to_transform)} high-priority files with detailed pattern data")

                total_files = len(files_to_transform)
                for idx, (file_path, file_analysis) in enumerate(files_to_transform, 1):
                    try:
                        update_progress("Phase 4", f"Transforming file {idx}/{total_files}: {Path(file_path).name}")

                        original_code = file_contents.get(file_path, "")
                        if not original_code:
                            continue

                        transformation_plan = self.pattern_integration.get_transformation_plan(file_analysis)

                        modernized_code = await self.transformer.transform_code(
                            file_path,
                            original_code,
                            transformation_plan
                        )

                        transformations.append({
                            "file_path": file_path,
                            "original_code": original_code,
                            "modernized_code": modernized_code,
                            "analysis": transformation_plan,
                            "patterns_addressed": [p['pattern'] for p in transformation_plan['steps']],
                            "pattern_details": file_analysis.patterns
                        })

                    except Exception as e:
                        logger.error(f"Error transforming {file_path}: {e}")
                        results['errors'].append(f"Transformation error for {file_path}: {e}")
            else:
                logger.info("Using legacy pattern grouping for transformation")

                # Invert the pattern groups into a file -> patterns map so each
                # file is transformed once with all of its patterns combined.
                file_to_patterns = {}
                for group in pattern_groups[:5]:
                    for file_path in group['files'][:3]:
                        if file_path not in file_to_patterns:
                            file_to_patterns[file_path] = []
                        file_to_patterns[file_path].append(group['pattern_name'])

                logger.info(f"Processing {len(file_to_patterns)} unique files")

                total_files = len(file_to_patterns)
                for idx, (file_path, patterns) in enumerate(file_to_patterns.items(), 1):
                    try:
                        update_progress("Phase 4", f"Transforming file {idx}/{total_files}: {Path(file_path).name}")

                        full_path = os.path.join(extract_path, file_path)
                        original_code = self.file_handler.read_file(full_path)

                        if not original_code:
                            continue

                        combined_pattern = " AND ".join(patterns)
                        analysis = await self.analyzer.analyze_pattern(
                            [file_path],
                            combined_pattern,
                            {file_path: original_code}
                        )

                        modernized_code = await self.transformer.transform_code(
                            file_path,
                            original_code,
                            analysis
                        )

                        transformations.append({
                            "file_path": file_path,
                            "original_code": original_code,
                            "modernized_code": modernized_code,
                            "analysis": analysis,
                            "patterns_addressed": patterns
                        })

                    except Exception as e:
                        logger.error(f"Error transforming {file_path}: {e}")
                        results['errors'].append(f"Transformation error for {file_path}: {e}")

            logger.info(f"Transformed {len(transformations)} files")

            # Persist modernized and original copies side by side for review.
            output_dir = Path("modernized_output")
            output_dir.mkdir(exist_ok=True)

            for t in transformations:
                try:
                    output_file = output_dir / t['file_path']
                    output_file.parent.mkdir(parents=True, exist_ok=True)

                    output_file.write_text(t['modernized_code'])
                    logger.info(f"Saved: {output_file}")

                    original_file = output_dir / "original" / t['file_path']
                    original_file.parent.mkdir(parents=True, exist_ok=True)
                    original_file.write_text(t['original_code'])

                except Exception as e:
                    logger.error(f"Error saving {t['file_path']}: {e}")

            logger.info(f"Output saved to: {output_dir.absolute()}")

            results['phases']['transformation'] = {
                "files_transformed": len(transformations),
                "output_directory": str(output_dir.absolute())
            }

            results['transformations'] = transformations

            logger.info("Phase 5: Test generation and validation")
            update_progress("Phase 5", "Generating tests and validating in Modal sandbox...")

            validation_results = []

            tests_dir = output_dir / "tests"
            tests_dir.mkdir(exist_ok=True)

            # Validate at most 10 transformations to keep sandbox time bounded.
            files_to_validate = min(len(transformations), 10)
            for idx, t in enumerate(transformations[:10], 1):
                try:
                    update_progress("Phase 5", f"Testing file {idx}/{files_to_validate}: {Path(t['file_path']).name}")

                    tests = self.test_generator.generate_tests(
                        t['original_code'],
                        t['modernized_code'],
                        t['file_path']
                    )

                    if tests:
                        # Imported lazily; only needed when tests were generated.
                        from src.agents.code_validator import validate_and_fix_code

                        file_ext = Path(t['file_path']).suffix.lower()
                        language_map = {
                            '.ts': 'typescript',
                            '.js': 'javascript',
                            '.py': 'python',
                            '.java': 'java'
                        }
                        language = language_map.get(file_ext, 'unknown')

                        fixed_code, is_valid, issues = validate_and_fix_code(
                            t['modernized_code'],
                            tests,
                            language
                        )

                        if not is_valid:
                            logger.warning(f"Code validation issues for {t['file_path']}: {issues}")

                        if fixed_code != t['modernized_code']:
                            logger.info(f"Auto-fixed export issues in {t['file_path']}")
                            t['modernized_code'] = fixed_code

                            # Re-save to the same nested path used in Phase 4.
                            output_file = output_dir / t['file_path']
                            output_file.write_text(fixed_code)

                        test_file = tests_dir / f"test_{Path(t['file_path']).name}"
                        test_file.write_text(tests)
                        logger.info(f"Saved test: {test_file}")

                    validation = self.validator.validate_transformation(
                        t['original_code'],
                        t['modernized_code'],
                        tests,
                        file_path=t['file_path']
                    )

                    validation['file_path'] = t['file_path']
                    validation_results.append(validation)

                except Exception as e:
                    logger.error(f"Error validating {t['file_path']}: {e}")
                    results['errors'].append(f"Validation error: {e}")

            total_tests = sum(v.get('tests_run', 0) for v in validation_results)
            total_passed = sum(v.get('tests_passed', 0) for v in validation_results)

            # Average coverage only over files that actually reported coverage.
            coverage_values = [v.get('coverage_percent', 0) for v in validation_results if v.get('coverage_percent', 0) > 0]
            avg_coverage = sum(coverage_values) / len(coverage_values) if coverage_values else 0.0

            logger.info(f"Validation: {total_passed}/{total_tests} tests passed, {avg_coverage:.1f}% coverage")

            results['phases']['validation'] = {
                "files_validated": len(validation_results),
                "total_tests": total_tests,
                "tests_passed": total_passed,
                "tests_failed": total_tests - total_passed,
                "average_coverage": round(avg_coverage, 2),
                "pass_rate": round(total_passed / max(total_tests, 1) * 100, 2)
            }

            # Statistics are assembled before PR creation because
            # _generate_pr_summary reads results['statistics'].
            results['statistics'] = {
                "total_files": len(files),
                "files_modernized": len(transformations),
                "tests_generated": total_tests,
                "test_pass_rate": round(total_passed / max(total_tests, 1) * 100, 2),
                "average_coverage": round(avg_coverage, 2)
            }

            if create_pr and repo_url:
                logger.info("Phase 5b: Creating GitHub PR")

                # Lazy import: GitHub credentials are only needed when a PR is requested.
                if self.github_client is None:
                    from src.mcp.github_client import GitHubMCPClient
                    self.github_client = GitHubMCPClient(github_token=github_token)

                changed_files = {
                    t['file_path']: t['modernized_code']
                    for t in transformations
                }

                pr_summary = self._generate_pr_summary(results, target_version)

                pr_result = await self.github_client.create_pr(
                    repo_url=repo_url,
                    changed_files=changed_files,
                    pr_summary=pr_summary,
                    test_results=results['phases']['validation']
                )

                results['phases']['github_pr'] = pr_result
                logger.info(f"PR creation: {pr_result.get('success', False)}")
            elif create_pr:
                logger.warning("create_pr=True but no repo_url was provided; skipping PR creation")

            results['output'] = {
                "modernized_files": str(output_dir.absolute()),
                "original_files": str((output_dir / "original").absolute()),
                "test_files": str((output_dir / "tests").absolute())
            }

            results['success'] = True
            logger.info("Modernization workflow completed successfully")
            logger.info(f"📁 Modernized files: {output_dir.absolute()}")
            logger.info(f"📁 Test files: {output_dir / 'tests'}")

        except Exception as e:
            logger.error(f"Workflow error: {e}")
            results['errors'].append(f"Workflow error: {e}")
            results['success'] = False

        return results

    def _find_pattern_groups(self, files: List[str]) -> List[Dict]:
        """
        Find groups of files with similar legacy patterns.

        Detects file languages and uses appropriate pattern queries.

        Args:
            files: List of file paths

        Returns:
            List of pattern group dictionaries
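
            Each group is shaped like (illustrative values):
                {
                    "pattern_name": "<pattern query>",
                    "files": ["src/db.py", ...],
                    "similarity_scores": [0.91, ...]
                }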
        """
        languages = self._detect_languages_in_files(files)

        pattern_queries = self._get_pattern_queries_for_languages(languages)

        groups = []

        for query in pattern_queries:
            try:
                similar_files = self.search_engine.find_similar_patterns(query, top_k=10)

                if similar_files:
                    groups.append({
                        "pattern_name": query,
                        "files": [f['file_path'] for f in similar_files],
                        "similarity_scores": [f['score'] for f in similar_files]
                    })
            except Exception as e:
                logger.error(f"Error searching for pattern '{query}': {e}")

        return groups

    def _detect_languages_in_files(self, files: List[str]) -> set:
        """Detect programming languages from file extensions."""
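        # For example (illustrative): ["app.py", "ui.jsx"] -> {"python", "javascript"}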
        extension_to_language = {
            '.py': 'python',
            '.java': 'java',
            '.js': 'javascript',
            '.ts': 'typescript',
            '.jsx': 'javascript',
            '.tsx': 'typescript',
            '.cpp': 'cpp',
            '.c': 'c',
            '.h': 'c',
            '.cs': 'csharp',
            '.go': 'go',
            '.rb': 'ruby',
            '.php': 'php',
            '.kt': 'kotlin',
            '.scala': 'scala',
            '.rs': 'rust',
            '.swift': 'swift'
        }

        languages = set()
        for file_path in files:
            ext = Path(file_path).suffix.lower()
            if ext in extension_to_language:
                languages.add(extension_to_language[ext])

        # Fall back to Python when no known extensions are present.
        return languages if languages else {'python'}

    def _get_pattern_queries_for_languages(self, languages: set) -> List[str]:
        """Get pattern queries appropriate for the detected languages."""
        common_patterns = [
            "Files with SQL injection vulnerabilities",
            "Files with hardcoded credentials or secrets",
            "Files with security vulnerabilities",
            "Files with deprecated API usage"
        ]

        language_patterns = {
            'python': [
                "Files using deprecated database libraries like MySQLdb",
                "Files using Python 2 print statements",
                "Files using deprecated urllib2 library",
                "Files missing type hints",
                "Files using old-style string formatting"
            ],
            'java': [
                "Files using deprecated Java APIs like Vector or Hashtable",
                "Files using raw JDBC without prepared statements",
                "Files missing try-with-resources for AutoCloseable",
                "Files using pre-Java 8 patterns without lambdas or streams",
                "Files using deprecated Date and Calendar APIs",
                "Files with missing null checks or Optional usage"
            ],
            'javascript': [
                "Files using var instead of let or const",
                "Files using callback patterns instead of Promises or async/await",
                "Files using jQuery for DOM manipulation",
                "Files with eval() usage",
                "Files using prototype-based inheritance"
            ],
            'typescript': [
                "Files with excessive any type usage",
                "Files missing strict null checks",
                "Files using old module syntax"
            ],
            'cpp': [
                "Files using raw pointers instead of smart pointers",
                "Files with manual memory management",
                "Files using C-style casts",
                "Files missing RAII patterns"
            ],
            'csharp': [
                "Files using deprecated .NET APIs",
                "Files missing async/await patterns",
                "Files using old collection types"
            ],
            'go': [
                "Files missing error handling",
                "Files with goroutine leaks",
                "Files missing context usage"
            ],
            'ruby': [
                "Files using deprecated Ruby syntax",
                "Files missing proper error handling"
            ],
            'php': [
                "Files using deprecated mysql_* functions",
                "Files missing prepared statements",
                "Files with register_globals usage"
            ]
        }

        queries = common_patterns.copy()

        for lang in languages:
            if lang in language_patterns:
                queries.extend(language_patterns[lang])

        return queries

    def _generate_pr_summary(self, results: Dict, target_version: str) -> str:
        """Generate a PR summary from workflow results."""
        stats = results['statistics']

        # Only mention coverage when the sandbox actually reported it.
        coverage_line = ""
        if stats.get('average_coverage', 0) > 0:
            coverage_line = f"**Code Coverage**: {stats['average_coverage']:.1f}%\n"

        summary = f"""Automated migration to {target_version} with security fixes and performance improvements.

**Files Modernized**: {stats['files_modernized']} / {stats['total_files']}
**Tests Generated**: {stats['tests_generated']}
**Test Pass Rate**: {stats['test_pass_rate']:.1f}%
{coverage_line}
This PR includes:
- Syntax modernization to {target_version}
- Security vulnerability fixes
- Deprecated library replacements
- Comprehensive test suite
- Performance optimizations

All changes have been validated in an isolated sandbox environment.
"""

        return summary

    def generate_report(self, results: Dict) -> str:
        """
        Generate a human-readable report from results.

        Args:
            results: Workflow results dictionary

        Returns:
            Formatted report string
        """
        report = []
        report.append("=" * 60)
        report.append("LEGACY CODE MODERNIZATION REPORT")
        report.append("=" * 60)
        report.append("")

        if results['success']:
            report.append("✅ Status: SUCCESS")
        else:
            report.append("❌ Status: FAILED")

        report.append("")
        report.append("STATISTICS:")
        report.append("-" * 60)

        stats = results.get('statistics', {})
        for key, value in stats.items():
            # Hide coverage when the sandbox reported none.
            if key == 'average_coverage' and value == 0:
                continue
            report.append(f"  {key.replace('_', ' ').title()}: {value}")

        classification_data = results.get('phases', {}).get('classification', {})
        intelligent_stats = classification_data.get('intelligent_stats')
        if intelligent_stats:
            report.append("")
            report.append("INTELLIGENT PATTERN ANALYSIS:")
            report.append("-" * 60)
            report.append(f"  Patterns Detected: {intelligent_stats.get('patterns_detected', 0)}")
            report.append(f"  Average Modernization Score: {intelligent_stats.get('average_modernization_score', 0)}/100")
            report.append(f"  Total Estimated Effort: {intelligent_stats.get('total_estimated_effort_hours', 0)}h")

            severity_counts = intelligent_stats.get('severity_counts', {})
            if severity_counts:
                report.append("  Severity Breakdown:")
                for severity, count in severity_counts.items():
                    if count > 0:
                        report.append(f"    {severity.upper()}: {count}")

        report.append("")
        report.append("PHASE RESULTS:")
        report.append("-" * 60)

        for phase, data in results.get('phases', {}).items():
            report.append(f"\n  {phase.upper()}:")
            if isinstance(data, dict):
                for k, v in data.items():
                    # Skip bulky per-file payloads in the summary view.
                    if k not in ['classifications', 'intelligent_stats']:
                        report.append(f"    {k}: {v}")

        if results.get('output'):
            report.append("")
            report.append("OUTPUT LOCATIONS:")
            report.append("-" * 60)
            for key, path in results['output'].items():
                report.append(f"  📁 {key.replace('_', ' ').title()}: {path}")

        if results.get('errors'):
            report.append("")
            report.append("ERRORS:")
            report.append("-" * 60)
            for error in results['errors']:
                report.append(f"  ⚠️ {error}")

        report.append("")
        report.append("=" * 60)

        return "\n".join(report)
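

if __name__ == "__main__":
    # Minimal usage sketch, assuming a local legacy_repo.zip exists; the path
    # is a placeholder, and PR creation would also need repo_url and a token.
    logging.basicConfig(level=logging.INFO)

    async def _demo():
        orchestrator = ModernizationOrchestrator(use_intelligent_matcher=True)
        results = await orchestrator.modernize_repository(
            "legacy_repo.zip",
            target_version="Python 3.14",
            progress_callback=lambda phase, msg: print(f"[{phase}] {msg}")
        )
        print(orchestrator.generate_report(results))

    asyncio.run(_demo())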