MogensR committed on
Commit
803d2bf
·
1 Parent(s): 00a34b8

Create api/batch_processor.py

Browse files
Files changed (1) hide show
  1. api/batch_processor.py +780 -0
api/batch_processor.py ADDED
@@ -0,0 +1,780 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Batch processing module for BackgroundFX Pro.
3
+ Handles efficient processing of multiple files with optimized resource management.
4
+ """
5
+
6
+ import os
7
+ import cv2
8
+ import numpy as np
9
+ from pathlib import Path
10
+ from typing import Dict, List, Optional, Tuple, Union, Callable, Any, Generator
11
+ from dataclasses import dataclass, field
12
+ from enum import Enum
13
+ import time
14
+ import threading
15
+ from queue import Queue, PriorityQueue, Empty
16
+ from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
17
+ import multiprocessing as mp
18
+ import json
19
+ import hashlib
20
+ import pickle
21
+ import shutil
22
+ import tempfile
23
+ from datetime import datetime
24
+ import psutil
25
+ import mimetypes
26
+
27
+ from ..utils.logger import setup_logger
28
+ from ..utils.device import DeviceManager
29
+ from ..utils import TimeEstimator, MemoryMonitor
30
+ from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode
31
+ from .video_processor import VideoProcessorAPI, VideoStats
32
+
33
+ logger = setup_logger(__name__)
34
+
35
+
36
class BatchPriority(Enum):
    """Urgency level attached to a batch item.

    The numeric values are ordered so that a *smaller* value means a *more*
    urgent item; ``BatchItem.__lt__`` compares these raw values directly.
    """
    LOW = 3
    NORMAL = 2   # default priority of a freshly created BatchItem
    HIGH = 1
    URGENT = 0   # lowest value, therefore sorts first
42
+
43
+
44
class FileType(Enum):
    """Media category of an input file.

    ``UNKNOWN`` marks files that are neither recognised as an image nor as a
    video.
    """
    IMAGE = "image"
    VIDEO = "video"
    UNKNOWN = "unknown"
49
+
50
+
51
@dataclass
class BatchItem:
    """One unit of work in a batch: a single input file and its bookkeeping.

    Instances order themselves by ``priority`` (smaller enum value first) so
    they can be placed in a ``PriorityQueue`` or sorted.
    """
    # --- identity and locations ---
    id: str
    input_path: str
    output_path: str
    file_type: FileType
    # --- processing options ---
    priority: BatchPriority = BatchPriority.NORMAL
    background: Optional[Union[str, np.ndarray]] = None
    config_overrides: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    # --- retry bookkeeping ---
    retry_count: int = 0
    max_retries: int = 3
    # --- outcome (filled in while/after processing) ---
    status: str = "pending"
    error: Optional[str] = None
    result: Optional[Any] = None
    processing_time: float = 0.0

    def __lt__(self, other):
        """Order items by priority value (lower value = more urgent).

        Returns ``NotImplemented`` for operands that are not ``BatchItem``
        instances so that Python can try the reflected comparison or raise a
        proper ``TypeError`` instead of failing with ``AttributeError``.
        """
        if not isinstance(other, BatchItem):
            return NotImplemented
        return self.priority.value < other.priority.value
72
+
73
+
74
@dataclass
class BatchConfig:
    """Tunable settings consumed by ``BatchProcessor``."""

    # ---- processing ----
    # NOTE: the CPU count is evaluated once, when this class is defined.
    max_workers: int = mp.cpu_count()
    use_multiprocessing: bool = False   # process pool instead of thread pool
    chunk_size: int = 10                # items submitted per resource check

    # ---- resource limits ----
    max_memory_gb: float = 8.0
    max_gpu_memory_gb: float = 4.0
    cpu_limit_percent: float = 80.0

    # ---- file handling ----
    input_dir: Optional[str] = None
    output_dir: Optional[str] = None
    recursive: bool = True              # rglob vs. glob when collecting files
    file_patterns: List[str] = field(default_factory=lambda: ["*.jpg", "*.png", "*.mp4", "*.avi"])
    preserve_structure: bool = True     # mirror input sub-directories in output

    # ---- backgrounds ----
    default_background: Optional[Union[str, np.ndarray]] = None
    # keyed by the input file path string
    background_per_file: Dict[str, Union[str, np.ndarray]] = field(default_factory=dict)

    # ---- quality ----
    image_quality: int = 95             # passed as the JPEG quality when writing .jpg/.jpeg
    video_quality: str = "high"
    maintain_resolution: bool = True

    # ---- optimization ----
    enable_caching: bool = True
    cache_dir: Optional[str] = None     # None -> a temporary directory is created
    deduplicate: bool = True

    # ---- progress and reporting ----
    # called as progress_callback(fraction_done, info_dict)
    progress_callback: Optional[Callable[[float, Dict], None]] = None
    save_report: bool = True
    report_path: Optional[str] = None   # None -> timestamped file name

    # ---- error handling ----
    stop_on_error: bool = False
    skip_existing: bool = True

    # ---- pipeline ----
    pipeline_config: Optional[PipelineConfig] = None
119
+
120
+
121
@dataclass
class BatchReport:
    """Aggregate statistics collected over one batch run."""
    # ---- timing ----
    start_time: datetime
    end_time: Optional[datetime] = None
    # ---- item counters ----
    total_items: int = 0
    processed_items: int = 0
    successful_items: int = 0
    failed_items: int = 0
    skipped_items: int = 0
    # ---- durations (seconds) ----
    total_processing_time: float = 0.0
    avg_processing_time: float = 0.0
    # ---- sizes ----
    total_input_size_mb: float = 0.0
    total_output_size_mb: float = 0.0
    compression_ratio: float = 1.0      # output size / input size
    # ---- diagnostics ----
    errors: List[Dict[str, Any]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    resource_usage: Dict[str, Any] = field(default_factory=dict)
    quality_metrics: Dict[str, float] = field(default_factory=dict)
140
+
141
+
142
+ class BatchProcessor:
143
+ """High-performance batch processing engine."""
144
+
145
def __init__(self, config: Optional[BatchConfig] = None):
    """
    Initialize batch processor.

    Builds the helper components, the worker pool and the cache location.
    The cache directory is created eagerly so that a user-supplied
    ``cache_dir`` that does not exist yet does not make every cache write
    fail later on.

    Args:
        config: Batch processing configuration; a default ``BatchConfig``
            is used when omitted.
    """
    self.config = config or BatchConfig()
    self.logger = setup_logger(f"{__name__}.BatchProcessor")

    # Initialize components
    self.device_manager = DeviceManager()
    self.memory_monitor = MemoryMonitor()
    self.time_estimator = TimeEstimator()

    # Processing engines
    self.pipeline = ProcessingPipeline(self.config.pipeline_config)
    self.video_processor = VideoProcessorAPI()

    # State management
    self.is_processing = False
    self.should_stop = False
    self.current_item = None

    # Queues
    self.pending_queue = PriorityQueue()
    self.processing_queue = Queue()
    self.completed_queue = Queue()

    # Worker pool
    # NOTE(review): with a process pool the bound method submitted by
    # _process_items has to be pickled together with this instance --
    # confirm that all attributes are picklable before enabling it.
    if self.config.use_multiprocessing:
        self.executor = ProcessPoolExecutor(max_workers=self.config.max_workers)
    else:
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)

    # Cache: a temporary directory is used when none is configured.
    self.cache_dir = Path(self.config.cache_dir or tempfile.mkdtemp(prefix="bgfx_cache_"))
    try:
        self.cache_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        # Caching is best-effort; processing can continue without it.
        self.logger.warning(f"Cannot create cache directory {self.cache_dir}: {e}")
    self.cache_index = {}

    # Statistics
    self.report = BatchReport(start_time=datetime.now())

    self.logger.info(f"BatchProcessor initialized with {self.config.max_workers} workers")
188
+
189
def process_directory(self,
                      input_dir: str,
                      output_dir: str,
                      background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
    """
    Process all supported files in a directory.

    Args:
        input_dir: Input directory path
        output_dir: Output directory path (created if missing)
        background: Default background for all files

    Returns:
        Batch processing report. Files skipped during collection (existing
        outputs) are included in ``skipped_items`` of the returned report.

    Raises:
        ValueError: If ``input_dir`` does not exist or is not a directory.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    if not input_path.exists():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    if not input_path.is_dir():
        raise ValueError(f"Input path is not a directory: {input_dir}")

    output_path.mkdir(parents=True, exist_ok=True)

    # _collect_files counts skipped files on the *current* report, but
    # process_batch replaces that report with a fresh one. Remember how
    # many files were skipped during the scan so the count is not lost.
    skipped_before = self.report.skipped_items
    items = self._collect_files(input_path, output_path, background)
    skipped_during_scan = self.report.skipped_items - skipped_before

    if not items:
        self.logger.warning("No files found to process")
        return self.report

    self.logger.info(f"Found {len(items)} files to process")

    # Process batch
    report = self.process_batch(items)
    # NOTE: a JSON report written inside process_batch was saved before
    # this adjustment and therefore does not contain the scan skips.
    report.skipped_items += skipped_during_scan
    return report
223
+
224
def _collect_files(self,
                   input_path: Path,
                   output_path: Path,
                   background: Optional[Union[str, np.ndarray]]) -> List[BatchItem]:
    """Collect all files to process from directory.

    Every configured pattern is globbed (recursively when
    ``config.recursive`` is set). A file matched by several patterns is
    collected only once. Files whose output already exists are counted as
    skipped when ``config.skip_existing`` is enabled.

    Args:
        input_path: Directory that is searched.
        output_path: Root directory for the generated output paths.
        background: Fallback background for files without a per-file entry.

    Returns:
        One ``BatchItem`` per file that has to be processed.
    """
    items = []
    seen_paths = set()

    # Determine search method
    file_iterator = input_path.rglob if self.config.recursive else input_path.glob

    # ``background or default`` would raise for NumPy arrays (ambiguous
    # truth value), so arrays are taken as-is and only other values go
    # through the truthiness fallback.
    if isinstance(background, np.ndarray):
        fallback_background = background
    else:
        fallback_background = background or self.config.default_background

    # Collect files matching patterns
    for pattern in self.config.file_patterns:
        for file_path in file_iterator(pattern):
            # Overlapping patterns must not enqueue the same file twice.
            if file_path in seen_paths or not file_path.is_file():
                continue
            seen_paths.add(file_path)

            # Determine output path
            output_name = f"{file_path.stem}_processed{file_path.suffix}"
            if self.config.preserve_structure:
                relative_path = file_path.relative_to(input_path)
                output_file = output_path / relative_path.parent / output_name
            else:
                output_file = output_path / output_name

            # Skip if exists and configured to skip
            if self.config.skip_existing and output_file.exists():
                self.report.skipped_items += 1
                continue

            # Create batch item
            items.append(BatchItem(
                id=self._generate_item_id(file_path),
                input_path=str(file_path),
                output_path=str(output_file),
                file_type=self._detect_file_type(str(file_path)),
                background=self.config.background_per_file.get(
                    str(file_path),
                    fallback_background
                )
            ))

    return items
271
+
272
def process_batch(self, items: List[BatchItem]) -> BatchReport:
    """
    Process a batch of items.

    A fresh report is started for every call. Items are de-duplicated
    (when enabled) and then processed in priority order; items of equal
    priority keep their submission order.

    Args:
        items: List of batch items to process

    Returns:
        Batch processing report
    """
    self.is_processing = True
    # A previous stop request must not silently cancel this new batch.
    self.should_stop = False
    self.report = BatchReport(start_time=datetime.now())
    self.report.total_items = len(items)

    try:
        # Check for duplicates if enabled
        if self.config.deduplicate:
            items = self._deduplicate_items(items)

        # Apply priorities with a stable sort. Items are no longer parked
        # in ``pending_queue``: nothing drained that queue, so it only
        # grew with every batch.
        items = sorted(items, key=lambda entry: entry.priority.value)

        # Start processing
        self._process_items(items)

    finally:
        self.is_processing = False
        self.report.end_time = datetime.now()
        self.report.total_processing_time = (
            self.report.end_time - self.report.start_time
        ).total_seconds()

        if self.report.processed_items > 0:
            self.report.avg_processing_time = (
                self.report.total_processing_time / self.report.processed_items
            )

        # Save report if configured
        if self.config.save_report:
            self._save_report()

    return self.report
315
+
316
def _process_items(self, items: List[BatchItem]):
    """Process all items in the batch.

    Items are submitted to the executor chunk by chunk; before each chunk
    the resource check is run. Every failure (exception, timeout, or a
    ``False`` result from the worker) is recorded in ``report.errors`` and
    honours ``config.stop_on_error``.

    Args:
        items: Items to process, in the order they should be submitted.
    """
    # A non-positive chunk size would make range() fail or loop nothing.
    chunk_size = max(1, self.config.chunk_size)

    for start in range(0, len(items), chunk_size):
        if self.should_stop:
            break

        chunk = items[start:start + chunk_size]

        # Check resource availability
        self._wait_for_resources()

        # Submit chunk
        futures = []
        for item in chunk:
            if self.should_stop:
                break
            futures.append((self.executor.submit(self._process_single_item, item), item))

        # Collect results
        for position, (future, item) in enumerate(futures):
            failed = False
            try:
                result = future.result(timeout=300)  # 5 minute timeout
                item.result = result
                if result:
                    item.status = "completed"
                    self.report.successful_items += 1
                else:
                    # The worker swallows its own exceptions and reports
                    # them as a falsy result, so this is the common
                    # failure path.
                    item.status = "failed"
                    self.report.failed_items += 1
                    failed = True

            except Exception as e:
                self.logger.error(f"Processing failed for {item.id}: {e}")
                item.status = "failed"
                item.error = str(e)
                self.report.failed_items += 1
                failed = True

            finally:
                self.report.processed_items += 1

            if failed:
                self.report.errors.append({
                    'item_id': item.id,
                    'input_path': item.input_path,
                    'error': item.error
                })
                if self.config.stop_on_error:
                    self.should_stop = True
                    # Drop work that has not started yet; running tasks
                    # cannot be interrupted.
                    for pending, _ in futures[position + 1:]:
                        pending.cancel()
                    break

            # Progress callback
            if self.config.progress_callback:
                total = self.report.total_items
                progress = self.report.processed_items / total if total else 0.0
                self.config.progress_callback(progress, {
                    'current_item': item.id,
                    'processed': self.report.processed_items,
                    'total': self.report.total_items,
                    'successful': self.report.successful_items,
                    'failed': self.report.failed_items
                })
373
+
374
def _process_single_item(self, item: BatchItem) -> bool:
    """
    Process a single batch item, retrying on errors.

    The attempt is repeated until it finishes without raising or
    ``item.max_retries`` retries have been used. An unsupported file type
    is reported immediately because retrying cannot change the outcome.

    Args:
        item: Batch item to process

    Returns:
        True if successful
    """
    while True:
        start_time = time.time()

        try:
            # Check cache
            if self.config.enable_caching:
                cached_result = self._check_cache(item)
                if cached_result is not None:
                    self._save_cached_result(item, cached_result)
                    item.processing_time = time.time() - start_time
                    item.error = None
                    return True

            # Process based on file type
            if item.file_type == FileType.IMAGE:
                success = self._process_image(item)
            elif item.file_type == FileType.VIDEO:
                success = self._process_video(item)
            else:
                # Deterministic failure: do not burn retries on it.
                item.error = f"Unsupported file type: {item.file_type}"
                self.logger.error(f"Error processing {item.id}: {item.error}")
                return False

            # Cache result if successful
            if success and self.config.enable_caching:
                self._cache_result(item)

            item.processing_time = time.time() - start_time

            if success:
                # Forget the message of an earlier, failed attempt.
                item.error = None
                # The output file only exists after a successful run.
                self._update_size_stats(item)

            return success

        except Exception as e:
            self.logger.error(f"Error processing {item.id}: {e}")
            item.error = str(e)

            # Retry logic
            if item.retry_count >= item.max_retries:
                return False

            item.retry_count += 1
            self.logger.info(f"Retrying {item.id} (attempt {item.retry_count}/{item.max_retries})")
425
+
426
def _process_image(self, item: BatchItem) -> bool:
    """Process an image file and write the result to ``item.output_path``.

    Args:
        item: Batch item describing the image.

    Returns:
        True when the pipeline produced an image and it was written,
        False when the pipeline reported no usable output.

    Raises:
        ValueError: If the input image cannot be loaded.
        IOError: If the output image cannot be written.
    """
    try:
        # Load image
        image = cv2.imread(item.input_path)
        if image is None:
            raise ValueError(f"Cannot load image: {item.input_path}")

        # Apply config overrides
        # NOTE(review): this mutates the shared pipeline config (or a
        # throw-away default one) and the config is not passed to the
        # pipeline call below -- confirm how overrides are meant to reach
        # the pipeline and whether they should persist across items.
        pipeline_config = self.config.pipeline_config or PipelineConfig()
        for key, value in item.config_overrides.items():
            if hasattr(pipeline_config, key):
                setattr(pipeline_config, key, value)

        # Process through pipeline
        result = self.pipeline.process_image(
            image,
            item.background
        )

        if not result.success or result.output_image is None:
            return False

        # Create output directory
        output_path = Path(item.output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Save result; imwrite signals failure through its return value.
        if output_path.suffix.lower() in ['.jpg', '.jpeg']:
            written = cv2.imwrite(
                str(output_path),
                result.output_image,
                [cv2.IMWRITE_JPEG_QUALITY, self.config.image_quality]
            )
        else:
            written = cv2.imwrite(str(output_path), result.output_image)

        if not written:
            raise IOError(f"Cannot write image: {item.output_path}")

        # Store quality metrics
        item.metadata['quality_score'] = result.quality_score
        self._update_quality_metrics(result.quality_score)

        return True

    except Exception as e:
        self.logger.error(f"Image processing failed for {item.input_path}: {e}")
        raise
472
+
473
def _process_video(self, item: BatchItem) -> bool:
    """Run the video processor on one item and record its statistics.

    Returns True when at least one frame was processed. Any exception is
    logged and re-raised to the caller.
    """
    try:
        # Make sure the destination folder is there before encoding starts.
        destination = Path(item.output_path)
        destination.parent.mkdir(parents=True, exist_ok=True)

        stats = self.video_processor.process_video(
            item.input_path, str(destination), item.background
        )

        # Keep the per-video numbers next to the item.
        item.metadata['video_stats'] = dict(
            frames_processed=stats.frames_processed,
            frames_dropped=stats.frames_dropped,
            processing_fps=stats.processing_fps,
            avg_quality=stats.avg_quality_score,
        )

        self._update_quality_metrics(stats.avg_quality_score)

        return stats.frames_processed > 0

    except Exception as e:
        self.logger.error(f"Video processing failed for {item.input_path}: {e}")
        raise
502
+
503
def _detect_file_type(self, file_path: str) -> FileType:
    """Classify a path as image, video or unknown.

    The MIME type guessed from the file name is consulted first; the file
    extension is used when that gives no image/video answer.
    """
    guessed = mimetypes.guess_type(file_path)[0]

    if guessed:
        if guessed.startswith('image/'):
            return FileType.IMAGE
        if guessed.startswith('video/'):
            return FileType.VIDEO

    # Extension-based fallback
    suffix = Path(file_path).suffix.lower()
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp')
    video_suffixes = ('.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv')

    if suffix in image_suffixes:
        return FileType.IMAGE
    if suffix in video_suffixes:
        return FileType.VIDEO
    return FileType.UNKNOWN
521
+
522
+ def _generate_item_id(self, file_path: Path) -> str:
523
+ """Generate unique ID for batch item."""
524
+ # Combine path and timestamp for uniqueness
525
+ content = f"{file_path}{time.time()}"
526
+ return hashlib.md5(content.encode()).hexdigest()[:16]
527
+
528
def _deduplicate_items(self, items: List[BatchItem]) -> List[BatchItem]:
    """Drop items whose input file content was already seen.

    The first item with a given content hash is kept; later ones are
    logged and counted as skipped. Items whose hash cannot be computed
    are kept.
    """
    kept = []
    known_digests = set()

    for candidate in items:
        try:
            digest = self._calculate_file_hash(candidate.input_path)

            if digest in known_digests:
                self.logger.info(f"Skipping duplicate: {candidate.input_path}")
                self.report.skipped_items += 1
                continue

            known_digests.add(digest)
            kept.append(candidate)

        except Exception as e:
            # Hashing failed: keep the item and let processing decide.
            self.logger.warning(f"Cannot calculate hash for {candidate.input_path}: {e}")
            kept.append(candidate)

    return kept
549
+
550
+ def _calculate_file_hash(self, file_path: str, chunk_size: int = 8192) -> str:
551
+ """Calculate MD5 hash of file."""
552
+ hasher = hashlib.md5()
553
+
554
+ with open(file_path, 'rb') as f:
555
+ while chunk:= f.read(chunk_size):
556
+ hasher.update(chunk)
557
+
558
+ return hasher.hexdigest()
559
+
560
def _check_cache(self, item: BatchItem) -> Optional[Any]:
    """Return the cached output for an item, or None when there is none.

    A cache entry is only used when the input hash stored with it equals
    the current hash of the input file. Read problems are logged and
    treated as a cache miss.
    """
    cache_file = self.cache_dir / f"{self._get_cache_key(item)}.pkl"

    if not cache_file.exists():
        return None

    try:
        # NOTE(review): pickle.load executes code embedded in the file;
        # only point cache_dir at a location that untrusted parties
        # cannot write to.
        with open(cache_file, 'rb') as f:
            entry = pickle.load(f)

        # Verify cache validity
        if entry.get('input_hash') == self._calculate_file_hash(item.input_path):
            self.logger.info(f"Using cached result for {item.id}")
            return entry['result']

    except Exception as e:
        self.logger.warning(f"Cache read failed: {e}")

    return None
579
+
580
def _cache_result(self, item: BatchItem):
    """Cache processing result.

    The processed output file is stored together with the hash of the
    input file. The entry is written to a temporary file first and then
    moved into place, so a concurrent reader or an interrupted run never
    sees a half-written cache file. Failures are logged and otherwise
    ignored (caching is best-effort).

    Args:
        item: Successfully processed item whose output file exists.
    """
    try:
        cache_key = self._get_cache_key(item)
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        # A configured cache directory may not exist yet.
        cache_file.parent.mkdir(parents=True, exist_ok=True)

        # Read processed file
        # NOTE: the whole output is held in memory; large videos make
        # this expensive.
        with open(item.output_path, 'rb') as f:
            result_data = f.read()

        # Cache data
        cache_data = {
            'input_hash': self._calculate_file_hash(item.input_path),
            'result': result_data,
            'metadata': item.metadata,
            'timestamp': time.time()
        }

        # Unique per process and thread so parallel workers do not
        # clobber each other's temporary file.
        tmp_file = cache_file.with_name(
            f"{cache_file.name}.{os.getpid()}.{threading.get_ident()}.tmp"
        )
        try:
            with open(tmp_file, 'wb') as f:
                pickle.dump(cache_data, f)
            os.replace(tmp_file, cache_file)
        finally:
            # Only still present when the dump or the rename failed.
            tmp_file.unlink(missing_ok=True)

    except Exception as e:
        self.logger.warning(f"Cache write failed: {e}")
603
+
604
def _save_cached_result(self, item: BatchItem, cached_data: bytes):
    """Write cached output bytes to the item's output path.

    Missing parent directories of the output path are created.
    """
    destination = Path(item.output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(cached_data)
611
+
612
def _get_cache_key(self, item: BatchItem) -> str:
    """Generate cache key for item.

    The key combines the input path, the background and the config
    overrides. Keys for ``None``/string backgrounds with JSON-serialisable
    overrides are identical to the ones produced before this revision.

    Args:
        item: Item to derive the key for.

    Returns:
        Hex MD5 digest usable as a file name.
    """
    background = item.background
    if background is None:
        background_part = "none"
    elif isinstance(background, np.ndarray):
        # str(ndarray) abbreviates large arrays, so different images
        # could collide; hash the actual pixel data instead.
        pixels = np.ascontiguousarray(background).tobytes()
        background_part = (
            f"ndarray:{background.shape}:{background.dtype}:"
            f"{hashlib.md5(pixels).hexdigest()}"
        )
    else:
        background_part = str(background)

    # Include relevant parameters in cache key
    key_parts = [
        item.input_path,
        background_part,
        # default=str keeps non-JSON values from raising here (which
        # would fail the whole item); at worst the key is unstable and
        # the cache is simply missed.
        json.dumps(item.config_overrides, sort_keys=True, default=str)
    ]

    key_string = "|".join(key_parts)
    return hashlib.md5(key_string.encode()).hexdigest()
623
+
624
def _wait_for_resources(self, max_wait_seconds: float = 60.0):
    """Wait for sufficient resources before processing.

    Blocks while system CPU usage is above ``config.cpu_limit_percent`` or
    system-wide used memory is above ``config.max_memory_gb``. The wait
    ends early when a stop was requested, and gives up after
    ``max_wait_seconds`` so that a machine whose baseline usage is above
    the limits cannot stall the batch forever.

    Args:
        max_wait_seconds: Upper bound for the total wait; processing
            proceeds with a warning once it is exceeded.
    """
    deadline = time.monotonic() + max_wait_seconds

    while not self.should_stop:
        # Check CPU usage (blocks for the one-second sampling interval)
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.config.cpu_limit_percent:
            reason = f"CPU usage high ({cpu_percent}%)"
        else:
            # Check memory (used memory of the whole system, not just
            # this process)
            memory = psutil.virtual_memory()
            memory_gb = (memory.total - memory.available) / (1024**3)
            if memory_gb > self.config.max_memory_gb:
                reason = f"Memory usage high ({memory_gb:.1f}GB)"
            else:
                # Resources available
                return

        if time.monotonic() >= deadline:
            self.logger.warning(
                f"{reason}; proceeding after waiting {max_wait_seconds:.0f}s"
            )
            return

        self.logger.debug(f"{reason}, waiting...")
        time.sleep(2)
644
+
645
def _update_size_stats(self, item: BatchItem):
    """Add the item's input/output file sizes (in MB) to the report.

    Also refreshes the overall output/input size ratio. Problems reading
    the sizes are logged as a warning.
    """
    bytes_per_mb = 1024**2

    try:
        size_in_mb = os.path.getsize(item.input_path) / bytes_per_mb
        size_out_mb = os.path.getsize(item.output_path) / bytes_per_mb

        self.report.total_input_size_mb += size_in_mb
        self.report.total_output_size_mb += size_out_mb

        # Avoid dividing by zero when only empty inputs were seen.
        if self.report.total_input_size_mb > 0:
            ratio = self.report.total_output_size_mb / self.report.total_input_size_mb
            self.report.compression_ratio = ratio

    except Exception as e:
        self.logger.warning(f"Cannot update size stats: {e}")
661
+
662
+ def _update_quality_metrics(self, quality_score: float):
663
+ """Update quality metrics in report."""
664
+ if 'scores' not in self.report.quality_metrics:
665
+ self.report.quality_metrics['scores'] = []
666
+
667
+ self.report.quality_metrics['scores'].append(quality_score)
668
+
669
+ scores = self.report.quality_metrics['scores']
670
+ self.report.quality_metrics['avg_quality'] = np.mean(scores)
671
+ self.report.quality_metrics['min_quality'] = np.min(scores)
672
+ self.report.quality_metrics['max_quality'] = np.max(scores)
673
+ self.report.quality_metrics['std_quality'] = np.std(scores)
674
+
675
+ def _save_report(self):
676
+ """Save processing report to file."""
677
+ try:
678
+ report_path = self.config.report_path
679
+ if not report_path:
680
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
681
+ report_path = f"batch_report_{timestamp}.json"
682
+
683
+ report_dict = {
684
+ 'start_time': self.report.start_time.isoformat(),
685
+ 'end_time': self.report.end_time.isoformat() if self.report.end_time else None,
686
+ 'total_items': self.report.total_items,
687
+ 'processed_items': self.report.processed_items,
688
+ 'successful_items': self.report.successful_items,
689
+ 'failed_items': self.report.failed_items,
690
+ 'skipped_items': self.report.skipped_items,
691
+ 'total_processing_time': self.report.total_processing_time,
692
+ 'avg_processing_time': self.report.avg_processing_time,
693
+ 'total_input_size_mb': self.report.total_input_size_mb,
694
+ 'total_output_size_mb': self.report.total_output_size_mb,
695
+ 'compression_ratio': self.report.compression_ratio,
696
+ 'quality_metrics': self.report.quality_metrics,
697
+ 'errors': self.report.errors,
698
+ 'warnings': self.report.warnings
699
+ }
700
+
701
+ with open(report_path, 'w') as f:
702
+ json.dump(report_dict, f, indent=2)
703
+
704
+ self.logger.info(f"Report saved to {report_path}")
705
+
706
+ except Exception as e:
707
+ self.logger.error(f"Failed to save report: {e}")
708
+
709
def process_with_pattern(self,
                         pattern: str,
                         output_template: str,
                         background: Optional[Union[str, np.ndarray]] = None) -> BatchReport:
    """
    Process files matching a pattern with template-based output.

    Relative patterns are resolved against the current working directory.
    Absolute patterns are supported as well (``Path.glob`` itself rejects
    them, so they are split into their anchor and a relative remainder).
    Matches are processed in sorted order.

    Args:
        pattern: File pattern (e.g., "images/*.jpg")
        output_template: Output path template (e.g., "output/{name}_bg.{ext}");
            available fields are ``name``, ``ext``, ``dir`` and ``date``.
        background: Background for processing

    Returns:
        Batch processing report
    """
    pattern_path = Path(pattern)
    if pattern_path.is_absolute():
        base = Path(pattern_path.anchor)
        relative_pattern = str(pattern_path.relative_to(base))
    else:
        base = Path()
        relative_pattern = pattern

    # One date stamp for the whole batch, even when it runs past midnight.
    date_stamp = datetime.now().strftime("%Y%m%d")

    items = []
    for file_path in sorted(base.glob(relative_pattern)):
        if not file_path.is_file():
            continue

        # Parse template
        output_path = output_template.format(
            name=file_path.stem,
            ext=file_path.suffix[1:],
            dir=file_path.parent,
            date=date_stamp
        )

        items.append(BatchItem(
            id=self._generate_item_id(file_path),
            input_path=str(file_path),
            output_path=output_path,
            file_type=self._detect_file_type(str(file_path)),
            background=background
        ))

    return self.process_batch(items)
747
+
748
def stop_processing(self):
    """Request that the running batch stops.

    Only raises the ``should_stop`` flag and logs the request.
    """
    self.should_stop = True
    self.logger.info("Stopping batch processing...")
752
+
753
def cleanup(self):
    """Stop processing, shut the worker pool down and drop the temp cache.

    The cache directory is only deleted when it was not configured by the
    user (``config.cache_dir`` is None).
    """
    self.stop_processing()
    self.executor.shutdown(wait=True)

    cache_is_temporary = self.config.cache_dir is None
    if cache_is_temporary:
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    self.logger.info("Batch processor cleanup complete")
763
+
764
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of the current batch progress.

    ``progress`` is a percentage (0 when nothing is queued) and
    ``estimated_time_remaining`` is only computed while a batch is running.
    """
    report = self.report

    if report.total_items > 0:
        percent_done = report.processed_items / report.total_items * 100
    else:
        percent_done = 0

    active = self.current_item
    active_id = active.id if active else None

    remaining = None
    if self.is_processing:
        remaining = self.time_estimator.estimate_remaining(
            report.processed_items,
            report.total_items
        )

    return {
        'is_processing': self.is_processing,
        'total_items': report.total_items,
        'processed_items': report.processed_items,
        'successful_items': report.successful_items,
        'failed_items': report.failed_items,
        'skipped_items': report.skipped_items,
        'current_item': active_id,
        'progress': percent_done,
        'estimated_time_remaining': remaining
    }