legacy_code_modernizer / src /agents /pattern_integration.py
naazimsnh02's picture
Initial deployment: Autonomous AI agent for code modernization
ec4aa90
"""
Integration layer for the new IntelligentPatternMatcher with existing workflow.
Provides backward compatibility while enabling advanced pattern detection.
"""
import logging
from typing import Dict, List, Optional
from pathlib import Path
from .pattern_matcher import (
IntelligentPatternMatcher,
FileAnalysis,
PatternSeverity
)
from .classifier import CodeClassifier
logger = logging.getLogger(__name__)
class PatternMatcherIntegration:
"""
Integrates IntelligentPatternMatcher with existing workflow.
Provides compatibility layer for gradual migration.
"""
def __init__(self, use_intelligent_matcher: bool = True, cache_dir: Optional[str] = None):
"""
Initialize integration layer.
Args:
use_intelligent_matcher: If True, use new AI-powered matcher
cache_dir: Optional cache directory for pattern analysis
"""
self.use_intelligent_matcher = use_intelligent_matcher
if use_intelligent_matcher:
self.pattern_matcher = IntelligentPatternMatcher(cache_dir=cache_dir)
logger.info("Using IntelligentPatternMatcher")
else:
self.classifier = CodeClassifier()
logger.info("Using legacy CodeClassifier")
def classify_files(self, files: List[str], file_contents: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""
Classify files using either intelligent matcher or legacy classifier.
Args:
files: List of file paths
file_contents: Optional dict of file contents (required for intelligent matcher)
Returns:
Dictionary mapping filenames to categories
Categories: 'modernize_high', 'modernize_low', 'skip'
"""
if self.use_intelligent_matcher:
return self._classify_with_intelligent_matcher(files, file_contents)
else:
return self.classifier.classify_files(files)
def _classify_with_intelligent_matcher(
self,
files: List[str],
file_contents: Optional[Dict[str, str]]
) -> Dict[str, str]:
"""
Classify files using intelligent pattern matcher.
Args:
files: List of file paths
file_contents: Dictionary of file contents
Returns:
Dictionary mapping filenames to categories
"""
if not file_contents:
logger.warning("No file contents provided, falling back to legacy classifier")
return self.classifier.classify_files(files)
classifications = {}
# Analyze files
analyses = self.pattern_matcher.analyze_batch(file_contents)
# Convert analyses to legacy classification format
for file_path, analysis in analyses.items():
category = self._analysis_to_category(analysis)
classifications[file_path] = category
return classifications
def _analysis_to_category(self, analysis: FileAnalysis) -> str:
"""
Convert FileAnalysis to legacy category format.
Args:
analysis: FileAnalysis object
Returns:
Category string: 'modernize_high', 'modernize_low', or 'skip'
"""
if not analysis.requires_modernization:
return 'skip'
# Check for critical or high severity patterns
has_critical = any(
p.severity == PatternSeverity.CRITICAL
for p in analysis.patterns
)
has_high = any(
p.severity == PatternSeverity.HIGH
for p in analysis.patterns
)
# Check modernization score
if has_critical or analysis.modernization_score < 50:
return 'modernize_high'
elif has_high or analysis.modernization_score < 75:
return 'modernize_high'
elif analysis.requires_modernization:
return 'modernize_low'
else:
return 'skip'
def get_detailed_analysis(self, file_path: str, code: str) -> FileAnalysis:
"""
Get detailed pattern analysis for a single file.
Args:
file_path: Path to the file
code: File contents
Returns:
FileAnalysis object with detailed pattern information
"""
if not self.use_intelligent_matcher:
raise ValueError("Detailed analysis requires intelligent matcher")
return self.pattern_matcher.analyze_file(file_path, code)
def get_transformation_plan(self, analysis: FileAnalysis) -> Dict:
"""
Convert FileAnalysis to transformation plan format.
Args:
analysis: FileAnalysis object
Returns:
Transformation plan dictionary compatible with CodeTransformer
"""
# Group patterns by type
pattern_groups = {}
for pattern in analysis.patterns:
if pattern.pattern_type not in pattern_groups:
pattern_groups[pattern.pattern_type] = []
pattern_groups[pattern.pattern_type].append(pattern)
# Build transformation steps
steps = []
total_effort = 0
for pattern_type, patterns in pattern_groups.items():
# Get highest severity pattern for this type
highest_severity = max(patterns, key=lambda p: self._severity_to_int(p.severity))
steps.append({
'pattern': pattern_type,
'severity': highest_severity.severity.value,
'description': highest_severity.description,
'recommendation': highest_severity.recommendation,
'line_numbers': highest_severity.line_numbers,
'confidence': highest_severity.confidence
})
total_effort += highest_severity.estimated_effort_hours
return {
'file_path': analysis.file_path,
'language': analysis.language,
'framework': analysis.framework,
'pattern': f"{analysis.language} modernization",
'steps': steps,
'estimated_effort_hours': total_effort,
'priority': analysis.overall_priority.value,
'modernization_score': analysis.modernization_score
}
def _severity_to_int(self, severity: PatternSeverity) -> int:
"""Convert severity to integer for comparison."""
severity_map = {
PatternSeverity.CRITICAL: 5,
PatternSeverity.HIGH: 4,
PatternSeverity.MEDIUM: 3,
PatternSeverity.LOW: 2,
PatternSeverity.INFO: 1
}
return severity_map.get(severity, 0)
def generate_statistics(self, analyses: Dict[str, FileAnalysis]) -> Dict:
"""
Generate statistics from pattern analyses.
Args:
analyses: Dictionary of file analyses
Returns:
Statistics dictionary
"""
total_files = len(analyses)
# Count by category
modernize_high = sum(
1 for a in analyses.values()
if self._analysis_to_category(a) == 'modernize_high'
)
modernize_low = sum(
1 for a in analyses.values()
if self._analysis_to_category(a) == 'modernize_low'
)
skip = total_files - modernize_high - modernize_low
# Count patterns by severity
severity_counts = {s.value: 0 for s in PatternSeverity}
for analysis in analyses.values():
for pattern in analysis.patterns:
severity_counts[pattern.severity.value] += 1
# Calculate average scores
avg_modernization_score = (
sum(a.modernization_score for a in analyses.values()) / max(total_files, 1)
)
# Estimate total effort
total_effort = sum(
sum(p.estimated_effort_hours for p in a.patterns)
for a in analyses.values()
)
return {
'total_files': total_files,
'modernize_high': modernize_high,
'modernize_low': modernize_low,
'skip': skip,
'severity_counts': severity_counts,
'average_modernization_score': round(avg_modernization_score, 2),
'total_estimated_effort_hours': round(total_effort, 2),
'patterns_detected': sum(len(a.patterns) for a in analyses.values())
}
def migrate_to_intelligent_matcher(
orchestrator,
repo_path: str,
file_contents: Dict[str, str]
) -> Dict:
"""
Helper function to migrate existing orchestrator to use intelligent matcher.
Args:
orchestrator: ModernizationOrchestrator instance
repo_path: Path to repository
file_contents: Dictionary of file contents
Returns:
Enhanced results with detailed pattern analysis
"""
logger.info("Migrating to IntelligentPatternMatcher")
# Create integration layer
integration = PatternMatcherIntegration(
use_intelligent_matcher=True,
cache_dir=Path(repo_path) / ".pattern_cache"
)
# Analyze all files
analyses = integration.pattern_matcher.analyze_batch(file_contents)
# Generate prioritized list
prioritized = integration.pattern_matcher.prioritize_files(analyses)
# Convert to transformation plans
transformation_plans = {}
for file_path, analysis in prioritized:
if analysis.requires_modernization:
plan = integration.get_transformation_plan(analysis)
transformation_plans[file_path] = plan
# Generate report
report = integration.pattern_matcher.generate_report(analyses)
return {
'analyses': analyses,
'prioritized_files': prioritized,
'transformation_plans': transformation_plans,
'statistics': integration.generate_statistics(analyses),
'report': report
}