|
|
|
|
|
|
|
|
""" |
|
|
MatAnyone adapter — Using Official API (File-Based) |
|
|
(Enhanced logging, explicit error handling, and stage progress) |
|
|
|
|
|
... |
|
|
""" |
|
|
from __future__ import annotations |
|
|
import os |
|
|
import time |
|
|
import logging |
|
|
import tempfile |
|
|
import importlib.metadata |
|
|
from pathlib import Path |
|
|
from typing import Optional, Callable, Tuple |
|
|
|
|
|
# NOTE(review): basicConfig configures the *root* logger as an import-time
# side effect. That is fine for an application entry point, but if this
# module is imported as a library it will override the host app's logging
# configuration — consider moving this to the application's main().
logging.basicConfig(level=logging.INFO)

# Module-level logger named after this module, per logging convention.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
def _env_flag(name: str, default: str = "0") -> bool: |
|
|
return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"} |
|
|
|
|
|
# Master on/off switch for progress callbacks (env MATANY_PROGRESS, default on).
_PROGRESS_CB_ENABLED = _env_flag("MATANY_PROGRESS", "1")

# Minimum seconds between repeated identical progress messages (rate limit).
_PROGRESS_MIN_INTERVAL = float(os.getenv("MATANY_PROGRESS_MIN_SEC", "0.25"))

# Mutable module state used by _emit_progress for rate limiting and the
# permanent-disable latch after a callback failure.
_progress_last: float = 0.0  # wall-clock time of the last emitted message
_progress_last_msg: Optional[str] = None  # text of the last emitted message
_progress_disabled: bool = False  # set True forever once a callback raises
|
|
|
|
|
def _emit_progress(cb, pct: float, msg: str): |
|
|
global _progress_last, _progress_last_msg, _progress_disabled |
|
|
if not cb or not _PROGRESS_CB_ENABLED or _progress_disabled: |
|
|
return |
|
|
now = time.time() |
|
|
if (now - _progress_last) < _PROGRESS_MIN_INTERVAL and msg == _progress_last_msg: |
|
|
return |
|
|
try: |
|
|
try: |
|
|
cb(pct, msg) |
|
|
except TypeError: |
|
|
cb(msg) |
|
|
_progress_last = now |
|
|
_progress_last_msg = msg |
|
|
except Exception as e: |
|
|
_progress_disabled = True |
|
|
log.warning("[progress-cb] disabled due to exception: %s", e) |
|
|
|
|
|
class MatAnyError(RuntimeError):
    """Raised for any failure in the MatAnyone adapter (init or processing)."""
|
|
|
|
|
def _cuda_snapshot(device: Optional[str]) -> str: |
|
|
try: |
|
|
import torch |
|
|
if not torch.cuda.is_available(): |
|
|
return "CUDA: N/A" |
|
|
idx = 0 |
|
|
if device and device.startswith("cuda:"): |
|
|
try: |
|
|
idx = int(device.split(":")[1]) |
|
|
except (ValueError, IndexError): |
|
|
idx = 0 |
|
|
name = torch.cuda.get_device_name(idx) |
|
|
alloc = torch.cuda.memory_allocated(idx) / (1024**3) |
|
|
resv = torch.cuda.memory_reserved(idx) / (1024**3) |
|
|
return f"device={idx}, name={name}, alloc={alloc:.2f}GB, reserved={resv:.2f}GB" |
|
|
except Exception as e: |
|
|
return f"CUDA snapshot error: {e!r}" |
|
|
|
|
|
def _safe_empty_cache(): |
|
|
try: |
|
|
import torch |
|
|
if torch.cuda.is_available(): |
|
|
log.info(f"[MATANY] CUDA memory before empty_cache: {_cuda_snapshot('cuda:0')}") |
|
|
torch.cuda.empty_cache() |
|
|
log.info(f"[MATANY] CUDA memory after empty_cache: {_cuda_snapshot('cuda:0')}") |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
class MatAnyoneSession:
    """
    Simple wrapper around MatAnyone's official API.

    Uses file-based input/output as designed by the MatAnyone authors.
    All failures are surfaced as MatAnyError (with the original exception
    chained as __cause__).
    """

    def __init__(self, device: Optional[str] = None, precision: str = "auto"):
        """Load the MatAnyone InferenceCore.

        Args:
            device: torch device string (e.g. "cuda", "cuda:1", "cpu");
                defaults to "cuda" when available, otherwise "cpu".
            precision: precision hint; stored lowercased. Currently
                informational only — not passed to InferenceCore.

        Raises:
            MatAnyError: if the `matanyone` package or model cannot be loaded.
        """
        log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}")
        self.device = device or ("cuda" if self._cuda_available() else "cpu")
        self.precision = precision.lower()
        # Best-effort version report; absence of package metadata is not fatal.
        try:
            version = importlib.metadata.version("matanyone")
            log.info(f"[MATANY] MatAnyone version: {version}")
        except Exception:
            log.info("[MATANY] MatAnyone version unknown")
        try:
            from matanyone import InferenceCore
            self.processor = InferenceCore("PeiqingYang/MatAnyone")
            log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
        except Exception as e:
            log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True)
            # Chain the original exception so the root cause is preserved.
            raise MatAnyError(f"Failed to initialize MatAnyone: {e}") from e

    def _cuda_available(self) -> bool:
        """Return True when torch reports a usable CUDA device; never raises."""
        try:
            import torch
            return torch.cuda.is_available()
        except Exception:
            return False

    def process_stream(
        self,
        video_path: Path,
        seed_mask_path: Optional[Path] = None,
        out_dir: Optional[Path] = None,
        progress_cb: Optional[Callable] = None,
    ) -> Tuple[Path, Path]:
        """Run MatAnyone matting on a video file.

        Args:
            video_path: input video file (must exist).
            seed_mask_path: optional first-frame segmentation mask image.
            out_dir: output directory (created if missing); defaults to
                "<video dir>/matanyone_output".
            progress_cb: optional callback invoked as cb(pct, msg) or cb(msg).

        Returns:
            (alpha_path, foreground_path) — note: alpha first, matching
            run_matanyone_on_files.

        Raises:
            MatAnyError: on missing inputs, processing failure, or when the
                expected output files are not produced.
        """
        log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}")
        video_path = Path(video_path)
        if not video_path.exists():
            log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}")
            raise MatAnyError(f"Video file not found: {video_path}")
        if seed_mask_path and not Path(seed_mask_path).exists():
            log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}")
            raise MatAnyError(f"Seed mask not found: {seed_mask_path}")
        out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
        out_dir.mkdir(parents=True, exist_ok=True)
        log.info(f"[MATANY] Processing video: {video_path}")
        log.info(f"[MATANY] Using mask: {seed_mask_path}")
        log.info(f"[MATANY] Output directory: {out_dir}")
        _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")
        try:
            start_time = time.time()
            _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
            # Official API: returns (foreground_video, alpha_video) file paths.
            foreground_path, alpha_path = self.processor.process_video(
                input_path=str(video_path),
                mask_path=str(seed_mask_path) if seed_mask_path else None,
                output_path=str(out_dir)
            )
            processing_time = time.time() - start_time
            log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
            log.info(f"[MATANY] Foreground output: {foreground_path}")
            log.info(f"[MATANY] Alpha output: {alpha_path}")
            fg_path = Path(foreground_path) if foreground_path else None
            al_path = Path(alpha_path) if alpha_path else None
            if not fg_path or not fg_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}")
                raise MatAnyError(f"Foreground output not created: {fg_path}")
            if not al_path or not al_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}")
                raise MatAnyError(f"Alpha output not created: {al_path}")
            _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
            log.info("[MatAnyoneSession.process_stream] Success, returning paths.")
            return al_path, fg_path
        except MatAnyError:
            # Already a domain error (e.g. missing output) — don't double-wrap.
            raise
        except Exception as e:
            log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True)
            raise MatAnyError(f"MatAnyone processing failed: {e}") from e
        finally:
            # Always try to release cached GPU memory, success or failure.
            _safe_empty_cache()
|
|
|
|
|
class MatAnyoneModel:
    """Wrapper class for MatAnyone to match app_hf.py interface"""

    def __init__(self, device="cuda"):
        """Create the wrapper and eagerly load a MatAnyoneSession.

        Load failures are logged and recorded in ``self.loaded`` rather than
        raised, so callers can probe availability before use.
        """
        log.info(f"[MatAnyoneModel.__init__] device={device}")
        self.device = device
        self.session = None  # MatAnyoneSession instance once loaded
        self.loaded = False  # True only after a successful _load_model()
        self._load_model()

    def _load_model(self):
        """Instantiate the underlying session; record success in self.loaded."""
        try:
            self.session = MatAnyoneSession(device=self.device, precision="auto")
            self.loaded = True
            log.info("[MatAnyoneModel._load_model] Loaded successfully")
        except Exception as e:
            log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True)
            self.loaded = False

    def replace_background(self, video_path, masks, background_path):
        """Run matting and return the foreground video path as a string.

        NOTE(review): ``background_path`` is currently unused — no
        compositing happens here; the raw foreground render is returned.
        Confirm against app_hf.py whether compositing is expected.

        Args:
            video_path: input video file.
            masks: seed mask path (str/Path), or any other value for no mask.
            background_path: unused (kept for interface compatibility).

        Returns:
            str path to the foreground video, inside a persistent temp
            directory the caller is responsible for cleaning up.

        Raises:
            MatAnyError: when the model is not loaded or processing fails.
        """
        log.info("[MatAnyoneModel.replace_background] Start")
        if not self.loaded:
            log.error("[MatAnyoneModel.replace_background] Model not loaded")
            raise MatAnyError("MatAnyoneModel not loaded")
        try:
            video_path = Path(video_path)
            mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
            # BUG FIX: previously the outputs were written into a
            # tempfile.TemporaryDirectory and a path inside it was returned;
            # the directory (and the returned file) was deleted as soon as
            # the `with` block exited, so the returned path was always stale.
            # Use a persistent temp directory instead — the caller owns the
            # returned file and may remove its parent directory when done.
            output_dir = Path(tempfile.mkdtemp(prefix="matanyone_"))
            alpha_path, fg_path = self.session.process_stream(
                video_path=video_path,
                seed_mask_path=mask_path,
                out_dir=output_dir,
                progress_cb=None
            )
            log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}")
            return str(fg_path)
        except MatAnyError:
            # Already a domain error from process_stream — don't double-wrap.
            raise
        except Exception as e:
            log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True)
            raise MatAnyError(f"Background replacement failed: {e}") from e
|
|
|
|
|
def create_matanyone_session(device=None):
    """Factory helper: build and return a MatAnyoneSession for *device*."""
    log.info(f"[create_matanyone_session] device={device}")
    session = MatAnyoneSession(device=device)
    return session
|
|
|
|
|
def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
    """File-based convenience entry point around MatAnyoneSession.

    Returns (alpha_path_str, fg_path_str) on success, or (None, None) on
    any failure — errors are logged, never raised (best-effort contract).
    """
    log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}")
    try:
        session = MatAnyoneSession(device=device)
        seed = Path(mask_path) if mask_path else None
        alpha_path, fg_path = session.process_stream(
            video_path=Path(video_path),
            seed_mask_path=seed,
            out_dir=Path(output_dir),
            progress_cb=progress_callback,
        )
        log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_path}, {fg_path}")
        return str(alpha_path), str(fg_path)
    except Exception as e:
        log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {e}", exc_info=True)
        return None, None
|
|
|