"""
Integration layer for the new IntelligentPatternMatcher with existing workflow.

Provides backward compatibility while enabling advanced pattern detection.
"""

import logging
from typing import Dict, List, Optional
from pathlib import Path

from .pattern_matcher import (
    IntelligentPatternMatcher,
    FileAnalysis,
    PatternSeverity,
)
from .classifier import CodeClassifier

logger = logging.getLogger(__name__)


class PatternMatcherIntegration:
    """
    Integrates IntelligentPatternMatcher with the existing workflow.

    Provides a compatibility layer for gradual migration.
    """

    def __init__(self, use_intelligent_matcher: bool = True, cache_dir: Optional[str] = None):
        """
        Initialize the integration layer.

        Args:
            use_intelligent_matcher: If True, use the new AI-powered matcher
            cache_dir: Optional cache directory for pattern analysis
        """
        self.use_intelligent_matcher = use_intelligent_matcher

        # Always construct the legacy classifier so the intelligent path can
        # fall back to it when file contents are unavailable.
        self.classifier = CodeClassifier()

        if use_intelligent_matcher:
            self.pattern_matcher = IntelligentPatternMatcher(cache_dir=cache_dir)
            logger.info("Using IntelligentPatternMatcher")
        else:
            logger.info("Using legacy CodeClassifier")

    def classify_files(self, files: List[str], file_contents: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """
        Classify files using either the intelligent matcher or the legacy classifier.

        Args:
            files: List of file paths
            file_contents: Optional dict of file contents (required for the intelligent matcher)

        Returns:
            Dictionary mapping filenames to categories:
            'modernize_high', 'modernize_low', or 'skip'
        """
        if self.use_intelligent_matcher:
            return self._classify_with_intelligent_matcher(files, file_contents)
        return self.classifier.classify_files(files)

    def _classify_with_intelligent_matcher(
        self,
        files: List[str],
        file_contents: Optional[Dict[str, str]]
    ) -> Dict[str, str]:
        """
        Classify files using the intelligent pattern matcher.

        Args:
            files: List of file paths
            file_contents: Dictionary of file contents

        Returns:
            Dictionary mapping filenames to categories
        """
        if not file_contents:
            logger.warning("No file contents provided, falling back to legacy classifier")
            return self.classifier.classify_files(files)

        classifications = {}

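        # Analyze all files in one batch, then map each result to a legacy category.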
        analyses = self.pattern_matcher.analyze_batch(file_contents)

        for file_path, analysis in analyses.items():
            classifications[file_path] = self._analysis_to_category(analysis)

        return classifications

    def _analysis_to_category(self, analysis: FileAnalysis) -> str:
        """
        Convert FileAnalysis to the legacy category format.

        Args:
            analysis: FileAnalysis object

        Returns:
            Category string: 'modernize_high', 'modernize_low', or 'skip'
        """
        if not analysis.requires_modernization:
            return 'skip'

        has_critical = any(
            p.severity == PatternSeverity.CRITICAL
            for p in analysis.patterns
        )
        has_high = any(
            p.severity == PatternSeverity.HIGH
            for p in analysis.patterns
        )

        if has_critical or has_high or analysis.modernization_score < 75:
            return 'modernize_high'
        return 'modernize_low'

    def get_detailed_analysis(self, file_path: str, code: str) -> FileAnalysis:
        """
        Get detailed pattern analysis for a single file.

        Args:
            file_path: Path to the file
            code: File contents

        Returns:
            FileAnalysis object with detailed pattern information
        """
        if not self.use_intelligent_matcher:
            raise ValueError("Detailed analysis requires intelligent matcher")

        return self.pattern_matcher.analyze_file(file_path, code)

    def get_transformation_plan(self, analysis: FileAnalysis) -> Dict:
        """
        Convert FileAnalysis to the transformation plan format.

        Args:
            analysis: FileAnalysis object

        Returns:
            Transformation plan dictionary compatible with CodeTransformer
        """
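        # Group detected patterns by type so each type becomes a single step.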
        pattern_groups = {}
        for pattern in analysis.patterns:
            pattern_groups.setdefault(pattern.pattern_type, []).append(pattern)

        steps = []
        total_effort = 0

        for pattern_type, patterns in pattern_groups.items():
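            # Summarize each group by its most severe pattern; that pattern's
            # effort estimate stands in for the whole group.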
            highest_severity = max(patterns, key=lambda p: self._severity_to_int(p.severity))

            steps.append({
                'pattern': pattern_type,
                'severity': highest_severity.severity.value,
                'description': highest_severity.description,
                'recommendation': highest_severity.recommendation,
                'line_numbers': highest_severity.line_numbers,
                'confidence': highest_severity.confidence
            })

            total_effort += highest_severity.estimated_effort_hours

        return {
            'file_path': analysis.file_path,
            'language': analysis.language,
            'framework': analysis.framework,
            'pattern': f"{analysis.language} modernization",
            'steps': steps,
            'estimated_effort_hours': total_effort,
            'priority': analysis.overall_priority.value,
            'modernization_score': analysis.modernization_score
        }

    def _severity_to_int(self, severity: PatternSeverity) -> int:
        """Convert severity to an integer for comparison."""
        severity_map = {
            PatternSeverity.CRITICAL: 5,
            PatternSeverity.HIGH: 4,
            PatternSeverity.MEDIUM: 3,
            PatternSeverity.LOW: 2,
            PatternSeverity.INFO: 1
        }
        return severity_map.get(severity, 0)

    def generate_statistics(self, analyses: Dict[str, FileAnalysis]) -> Dict:
        """
        Generate statistics from pattern analyses.

        Args:
            analyses: Dictionary of file analyses

        Returns:
            Statistics dictionary
        """
        total_files = len(analyses)

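        # Re-derive each file's category so these counts stay consistent
        # with classify_files().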
        modernize_high = sum(
            1 for a in analyses.values()
            if self._analysis_to_category(a) == 'modernize_high'
        )
        modernize_low = sum(
            1 for a in analyses.values()
            if self._analysis_to_category(a) == 'modernize_low'
        )
        skip = total_files - modernize_high - modernize_low

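        # Tally every detected pattern by severity across all analyses.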
        severity_counts = {s.value: 0 for s in PatternSeverity}
        for analysis in analyses.values():
            for pattern in analysis.patterns:
                severity_counts[pattern.severity.value] += 1

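        # max(total_files, 1) guards against division by zero on empty input.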
        avg_modernization_score = (
            sum(a.modernization_score for a in analyses.values()) / max(total_files, 1)
        )

        total_effort = sum(
            sum(p.estimated_effort_hours for p in a.patterns)
            for a in analyses.values()
        )

        return {
            'total_files': total_files,
            'modernize_high': modernize_high,
            'modernize_low': modernize_low,
            'skip': skip,
            'severity_counts': severity_counts,
            'average_modernization_score': round(avg_modernization_score, 2),
            'total_estimated_effort_hours': round(total_effort, 2),
            'patterns_detected': sum(len(a.patterns) for a in analyses.values())
        }
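

# Example usage (sketch; the file path and contents below are illustrative):
#
#     contents = {"src/app.py": Path("src/app.py").read_text()}
#     integration = PatternMatcherIntegration(use_intelligent_matcher=True)
#     categories = integration.classify_files(list(contents), contents)
#     # -> {"src/app.py": "modernize_high"}  (category depends on the analysis)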


def migrate_to_intelligent_matcher(
    orchestrator,
    repo_path: str,
    file_contents: Dict[str, str]
) -> Dict:
    """
    Helper function to migrate an existing orchestrator to the intelligent matcher.

    Args:
        orchestrator: ModernizationOrchestrator instance
        repo_path: Path to the repository
        file_contents: Dictionary of file contents

    Returns:
        Enhanced results with detailed pattern analysis
    """
    logger.info("Migrating to IntelligentPatternMatcher")

    # PatternMatcherIntegration expects cache_dir as a string, so convert
    # the constructed Path explicitly.
    integration = PatternMatcherIntegration(
        use_intelligent_matcher=True,
        cache_dir=str(Path(repo_path) / ".pattern_cache")
    )

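    # One batch analysis pass, then let the matcher rank the results.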
    analyses = integration.pattern_matcher.analyze_batch(file_contents)

    prioritized = integration.pattern_matcher.prioritize_files(analyses)

    transformation_plans = {}
    for file_path, analysis in prioritized:
        if analysis.requires_modernization:
            transformation_plans[file_path] = integration.get_transformation_plan(analysis)

    report = integration.pattern_matcher.generate_report(analyses)

    return {
        'analyses': analyses,
        'prioritized_files': prioritized,
        'transformation_plans': transformation_plans,
        'statistics': integration.generate_statistics(analyses),
        'report': report
    }
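

# Example invocation (sketch; `orchestrator` and the file-content mapping are
# assumed to come from the caller's existing modernization workflow):
#
#     results = migrate_to_intelligent_matcher(orchestrator, "/repo", contents)
#     print(results['statistics']['total_estimated_effort_hours'])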