#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MatAnyone adapter — Using Official API (File-Based)

(Enhanced logging, explicit error handling, and stage progress)
...
"""
from __future__ import annotations

import os
import time
import logging
import shutil
import tempfile
import importlib.metadata
from pathlib import Path
from typing import Optional, Callable, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


# ---------- Progress helper ----------
def _env_flag(name: str, default: str = "0") -> bool:
    """Return True when environment variable *name* holds a truthy word.

    The value (or *default* when the variable is unset) is stripped and
    lower-cased; "1", "true", "yes" and "on" count as True.
    """
    return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"}


# Progress callbacks are on by default; MATANY_PROGRESS=0 turns them off.
_PROGRESS_CB_ENABLED = _env_flag("MATANY_PROGRESS", "1")
# Minimum number of seconds between two emissions of the *same* message.
_PROGRESS_MIN_INTERVAL = float(os.getenv("MATANY_PROGRESS_MIN_SEC", "0.25"))
_progress_last = 0.0        # clock reading of the last successful emission
_progress_last_msg = None   # message of the last successful emission
_progress_disabled = False  # latched to True once a callback raises


def _emit_progress(cb: Optional[Callable], pct: float, msg: str) -> None:
    """Forward a progress update to *cb*, throttled and failure-tolerant.

    Nothing happens when *cb* is falsy, when callbacks are disabled through
    MATANY_PROGRESS, or after an earlier callback raised. A repeat of the
    previous message inside the minimum interval is dropped; a different
    message is always delivered.

    The callback is first called as ``cb(pct, msg)``. If that raises
    ``TypeError`` it is retried as ``cb(msg)`` to support message-only
    callbacks. Any exception escaping that sequence disables progress
    reporting for the rest of the process instead of aborting the caller.
    """
    global _progress_last, _progress_last_msg, _progress_disabled
    if not cb or not _PROGRESS_CB_ENABLED or _progress_disabled:
        return
    # Monotonic clock: the throttle must not be confused by wall-clock jumps.
    now = time.monotonic()
    if (now - _progress_last) < _PROGRESS_MIN_INTERVAL and msg == _progress_last_msg:
        return
    try:
        try:
            cb(pct, msg)
        except TypeError:
            cb(msg)
        _progress_last = now
        _progress_last_msg = msg
    except Exception as e:
        _progress_disabled = True
        log.warning("[progress-cb] disabled due to exception: %s", e)


class MatAnyError(RuntimeError):
    """Error raised by this adapter for any MatAnyone-related failure."""


def _cuda_snapshot(device: Optional[str]) -> str:
    """Return a one-line, human-readable CUDA memory summary for logging.

    *device* may be a string such as ``"cuda:1"``; anything without a
    parsable index falls back to device 0. Returns ``"CUDA: N/A"`` when
    torch reports no CUDA support. Never raises: any failure (including
    torch not being importable) is reported in the returned string.
    """
    try:
        import torch

        if not torch.cuda.is_available():
            return "CUDA: N/A"
        idx = 0
        if device and device.startswith("cuda:"):
            try:
                idx = int(device.split(":")[1])
            except (ValueError, IndexError):
                idx = 0
        name = torch.cuda.get_device_name(idx)
        alloc = torch.cuda.memory_allocated(idx) / (1024**3)
        resv = torch.cuda.memory_reserved(idx) / (1024**3)
        return f"device={idx}, name={name}, alloc={alloc:.2f}GB, reserved={resv:.2f}GB"
    except Exception as e:
        return f"CUDA snapshot error: {e!r}"


def _safe_empty_cache() -> None:
    """Best-effort ``torch.cuda.empty_cache()`` with before/after logging.

    Never raises. Failures (for example torch not being installed) are
    recorded at debug level rather than silently discarded. The logged
    snapshots always describe ``cuda:0``.
    """
    try:
        import torch

        if torch.cuda.is_available():
            log.info(f"[MATANY] CUDA memory before empty_cache: {_cuda_snapshot('cuda:0')}")
            torch.cuda.empty_cache()
            log.info(f"[MATANY] CUDA memory after empty_cache: {_cuda_snapshot('cuda:0')}")
    except Exception as e:
        # Cache cleanup is optional; keep going but leave a trace for debugging.
        log.debug("[MATANY] empty_cache skipped: %r", e)


class MatAnyoneSession:
    """
    Simple wrapper around MatAnyone's official API.
    Uses file-based input/output as designed by the MatAnyone authors.
    """

    def __init__(self, device: Optional[str] = None, precision: str = "auto"):
        """Create the session and load the MatAnyone ``InferenceCore``.

        Args:
            device: Device string; when falsy, "cuda" is chosen if torch
                reports CUDA as available, otherwise "cpu".
            precision: Stored lower-cased on the instance.

        Raises:
            MatAnyError: If MatAnyone cannot be imported or initialized.
        """
        log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}")
        # NOTE(review): device and precision are only stored here; they are not
        # passed to InferenceCore below — confirm whether that is intended.
        self.device = device or ("cuda" if self._cuda_available() else "cpu")
        self.precision = precision.lower()

        try:
            version = importlib.metadata.version("matanyone")
            log.info(f"[MATANY] MatAnyone version: {version}")
        except Exception:
            log.info("[MATANY] MatAnyone version unknown")

        try:
            from matanyone import InferenceCore

            self.processor = InferenceCore("PeiqingYang/MatAnyone")
            log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
        except Exception as e:
            log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True)
            raise MatAnyError(f"Failed to initialize MatAnyone: {e}") from e

    def _cuda_available(self) -> bool:
        """Return True if torch imports and reports CUDA; False on any error."""
        try:
            import torch

            return torch.cuda.is_available()
        except Exception:
            return False

    def process_stream(
        self,
        video_path: Path,
        seed_mask_path: Optional[Path] = None,
        out_dir: Optional[Path] = None,
        progress_cb: Optional[Callable] = None,
    ) -> Tuple[Path, Path]:
        """Run MatAnyone on a video file and return the output paths.

        Args:
            video_path: Input video; must exist.
            seed_mask_path: Optional mask file; must exist when given.
            out_dir: Output directory, created if missing. Defaults to
                ``<video directory>/matanyone_output``.
            progress_cb: Optional callback invoked as ``cb(pct, msg)``
                (falling back to ``cb(msg)``) at coarse stages.

        Returns:
            ``(alpha_path, foreground_path)`` — alpha first.

        Raises:
            MatAnyError: If an input is missing, processing fails, or an
                expected output file was not produced.
        """
        log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}")
        video_path = Path(video_path)
        if not video_path.exists():
            log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}")
            raise MatAnyError(f"Video file not found: {video_path}")
        if seed_mask_path and not Path(seed_mask_path).exists():
            log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}")
            raise MatAnyError(f"Seed mask not found: {seed_mask_path}")

        out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
        out_dir.mkdir(parents=True, exist_ok=True)
        log.info(f"[MATANY] Processing video: {video_path}")
        log.info(f"[MATANY] Using mask: {seed_mask_path}")
        log.info(f"[MATANY] Output directory: {out_dir}")
        _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")

        try:
            start_time = time.time()
            _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
            foreground_path, alpha_path = self.processor.process_video(
                input_path=str(video_path),
                mask_path=str(seed_mask_path) if seed_mask_path else None,
                output_path=str(out_dir),
            )
            processing_time = time.time() - start_time
            log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
            log.info(f"[MATANY] Foreground output: {foreground_path}")
            log.info(f"[MATANY] Alpha output: {alpha_path}")

            fg_path = Path(foreground_path) if foreground_path else None
            al_path = Path(alpha_path) if alpha_path else None
            if not fg_path or not fg_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}")
                raise MatAnyError(f"Foreground output not created: {fg_path}")
            if not al_path or not al_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}")
                raise MatAnyError(f"Alpha output not created: {al_path}")

            _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
            log.info("[MatAnyoneSession.process_stream] Success, returning paths.")
            return al_path, fg_path  # (alpha, foreground)
        except Exception as e:
            log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True)
            raise MatAnyError(f"MatAnyone processing failed: {e}") from e
        finally:
            # Runs on success and on failure alike.
            _safe_empty_cache()


class MatAnyoneModel:
    """Wrapper class for MatAnyone to match app_hf.py interface"""

    def __init__(self, device="cuda"):
        """Store *device* and try to load the session immediately.

        Loading errors are not raised here; check ``self.loaded``.
        """
        log.info(f"[MatAnyoneModel.__init__] device={device}")
        self.device = device
        self.session = None
        self.loaded = False
        self._load_model()

    def _load_model(self):
        """Create the MatAnyoneSession; on failure log and leave loaded=False."""
        try:
            self.session = MatAnyoneSession(device=self.device, precision="auto")
            self.loaded = True
            log.info("[MatAnyoneModel._load_model] Loaded successfully")
        except Exception as e:
            log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True)
            self.loaded = False

    def replace_background(self, video_path, masks, background_path):
        """Run matting on *video_path* and return the foreground video path.

        Args:
            video_path: Input video path (str or Path).
            masks: Seed mask path; used only when it is a str or Path, any
                other type results in no mask being passed.
            background_path: Accepted for interface compatibility; this
                method does not read it.

        Returns:
            The foreground output path as a string. The file lives in a
            freshly created temporary directory that is deliberately NOT
            removed on success, so the returned path stays valid; the
            caller owns that directory and should delete it when done.

        Raises:
            MatAnyError: If the model is not loaded or processing fails.
        """
        log.info("[MatAnyoneModel.replace_background] Start")
        if not self.loaded:
            log.error("[MatAnyoneModel.replace_background] Model not loaded")
            raise MatAnyError("MatAnyoneModel not loaded")

        output_dir: Optional[Path] = None
        try:
            video_path = Path(video_path)
            mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
            # mkdtemp instead of TemporaryDirectory: a context-managed temp dir
            # would be deleted on return, handing the caller a dangling path.
            output_dir = Path(tempfile.mkdtemp(prefix="matanyone_"))
            alpha_path, fg_path = self.session.process_stream(
                video_path=video_path,
                seed_mask_path=mask_path,
                out_dir=output_dir,
                progress_cb=None,
            )
            log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}")
            return str(fg_path)
        except Exception as e:
            # Nothing is handed back on failure, so do not leak the temp dir.
            if output_dir is not None:
                shutil.rmtree(output_dir, ignore_errors=True)
            log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True)
            raise MatAnyError(f"Background replacement failed: {e}") from e


def create_matanyone_session(device=None):
    """Build and return a MatAnyoneSession for *device*.

    Raises:
        MatAnyError: Propagated from the session constructor.
    """
    log.info(f"[create_matanyone_session] device={device}")
    return MatAnyoneSession(device=device)


def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
    """Run MatAnyone end to end on file paths without raising.

    Args:
        video_path: Input video path.
        mask_path: Seed mask path, or a falsy value for none.
        output_dir: Directory that receives the outputs.
        device: Device string handed to the session.
        progress_callback: Optional progress callback.

    Returns:
        ``(alpha_path, foreground_path)`` as strings on success, or
        ``(None, None)`` if anything fails (the error is logged).
    """
    log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}")
    try:
        session = MatAnyoneSession(device=device)
        alpha_path, fg_path = session.process_stream(
            video_path=Path(video_path),
            seed_mask_path=Path(mask_path) if mask_path else None,
            out_dir=Path(output_dir),
            progress_cb=progress_callback,
        )
        log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_path}, {fg_path}")
        return str(alpha_path), str(fg_path)
    except Exception as e:
        log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {e}", exc_info=True)
        return None, None