VideoBackgroundReplacer2 / models /matanyone_loader.py
MogensR's picture
Update models/matanyone_loader.py
1be662e verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MatAnyone adapter — Using Official API (File-Based)
(Enhanced logging, explicit error handling, and stage progress)
...
"""
from __future__ import annotations
import os
import time
import logging
import tempfile
import importlib.metadata
from pathlib import Path
from typing import Optional, Callable, Tuple
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
# ---------- Progress helper ----------
def _env_flag(name: str, default: str = "0") -> bool:
return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"}
_PROGRESS_CB_ENABLED = _env_flag("MATANY_PROGRESS", "1")
_PROGRESS_MIN_INTERVAL = float(os.getenv("MATANY_PROGRESS_MIN_SEC", "0.25"))
_progress_last = 0.0
_progress_last_msg = None
_progress_disabled = False
def _emit_progress(cb, pct: float, msg: str):
global _progress_last, _progress_last_msg, _progress_disabled
if not cb or not _PROGRESS_CB_ENABLED or _progress_disabled:
return
now = time.time()
if (now - _progress_last) < _PROGRESS_MIN_INTERVAL and msg == _progress_last_msg:
return
try:
try:
cb(pct, msg)
except TypeError:
cb(msg)
_progress_last = now
_progress_last_msg = msg
except Exception as e:
_progress_disabled = True
log.warning("[progress-cb] disabled due to exception: %s", e)
class MatAnyError(RuntimeError):
pass
def _cuda_snapshot(device: Optional[str]) -> str:
try:
import torch
if not torch.cuda.is_available():
return "CUDA: N/A"
idx = 0
if device and device.startswith("cuda:"):
try:
idx = int(device.split(":")[1])
except (ValueError, IndexError):
idx = 0
name = torch.cuda.get_device_name(idx)
alloc = torch.cuda.memory_allocated(idx) / (1024**3)
resv = torch.cuda.memory_reserved(idx) / (1024**3)
return f"device={idx}, name={name}, alloc={alloc:.2f}GB, reserved={resv:.2f}GB"
except Exception as e:
return f"CUDA snapshot error: {e!r}"
def _safe_empty_cache():
try:
import torch
if torch.cuda.is_available():
log.info(f"[MATANY] CUDA memory before empty_cache: {_cuda_snapshot('cuda:0')}")
torch.cuda.empty_cache()
log.info(f"[MATANY] CUDA memory after empty_cache: {_cuda_snapshot('cuda:0')}")
except Exception:
pass
class MatAnyoneSession:
"""
Simple wrapper around MatAnyone's official API.
Uses file-based input/output as designed by the MatAnyone authors.
"""
def __init__(self, device: Optional[str] = None, precision: str = "auto"):
log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}") # [LOG+SAFETY PATCH]
self.device = device or ("cuda" if self._cuda_available() else "cpu")
self.precision = precision.lower()
try:
version = importlib.metadata.version("matanyone")
log.info(f"[MATANY] MatAnyone version: {version}")
except Exception:
log.info("[MATANY] MatAnyone version unknown")
try:
from matanyone import InferenceCore
self.processor = InferenceCore("PeiqingYang/MatAnyone")
log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
except Exception as e:
log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True) # [LOG+SAFETY PATCH]
raise MatAnyError(f"Failed to initialize MatAnyone: {e}")
def _cuda_available(self) -> bool:
try:
import torch
return torch.cuda.is_available()
except Exception:
return False
def process_stream(
self,
video_path: Path,
seed_mask_path: Optional[Path] = None,
out_dir: Optional[Path] = None,
progress_cb: Optional[Callable] = None,
) -> Tuple[Path, Path]:
log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}") # [LOG+SAFETY PATCH]
video_path = Path(video_path)
if not video_path.exists():
log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}") # [LOG+SAFETY PATCH]
raise MatAnyError(f"Video file not found: {video_path}")
if seed_mask_path and not Path(seed_mask_path).exists():
log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}") # [LOG+SAFETY PATCH]
raise MatAnyError(f"Seed mask not found: {seed_mask_path}")
out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
out_dir.mkdir(parents=True, exist_ok=True)
log.info(f"[MATANY] Processing video: {video_path}")
log.info(f"[MATANY] Using mask: {seed_mask_path}")
log.info(f"[MATANY] Output directory: {out_dir}")
_emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")
try:
start_time = time.time()
_emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
foreground_path, alpha_path = self.processor.process_video(
input_path=str(video_path),
mask_path=str(seed_mask_path) if seed_mask_path else None,
output_path=str(out_dir)
)
processing_time = time.time() - start_time
log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
log.info(f"[MATANY] Foreground output: {foreground_path}")
log.info(f"[MATANY] Alpha output: {alpha_path}")
fg_path = Path(foreground_path) if foreground_path else None
al_path = Path(alpha_path) if alpha_path else None
if not fg_path or not fg_path.exists():
log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}") # [LOG+SAFETY PATCH]
raise MatAnyError(f"Foreground output not created: {fg_path}")
if not al_path or not al_path.exists():
log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}") # [LOG+SAFETY PATCH]
raise MatAnyError(f"Alpha output not created: {al_path}")
_emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
log.info(f"[MatAnyoneSession.process_stream] Success, returning paths.") # [LOG+SAFETY PATCH]
return al_path, fg_path # (alpha, foreground)
except Exception as e:
log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True) # [LOG+SAFETY PATCH]
raise MatAnyError(f"MatAnyone processing failed: {e}")
finally:
_safe_empty_cache()
class MatAnyoneModel:
"""Wrapper class for MatAnyone to match app_hf.py interface"""
def __init__(self, device="cuda"):
log.info(f"[MatAnyoneModel.__init__] device={device}") # [LOG+SAFETY PATCH]
self.device = device
self.session = None
self.loaded = False
self._load_model()
def _load_model(self):
try:
self.session = MatAnyoneSession(device=self.device, precision="auto")
self.loaded = True
log.info("[MatAnyoneModel._load_model] Loaded successfully") # [LOG+SAFETY PATCH]
except Exception as e:
log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True) # [LOG+SAFETY PATCH]
self.loaded = False
def replace_background(self, video_path, masks, background_path):
log.info(f"[MatAnyoneModel.replace_background] Start") # [LOG+SAFETY PATCH]
if not self.loaded:
log.error("[MatAnyoneModel.replace_background] Model not loaded") # [LOG+SAFETY PATCH]
raise MatAnyError("MatAnyoneModel not loaded")
try:
video_path = Path(video_path)
mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
with tempfile.TemporaryDirectory() as temp_dir:
output_dir = Path(temp_dir)
alpha_path, fg_path = self.session.process_stream(
video_path=video_path,
seed_mask_path=mask_path,
out_dir=output_dir,
progress_cb=None
)
log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}") # [LOG+SAFETY PATCH]
return str(fg_path)
except Exception as e:
log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True) # [LOG+SAFETY PATCH]
raise MatAnyError(f"Background replacement failed: {e}")
def create_matanyone_session(device=None):
log.info(f"[create_matanyone_session] device={device}") # [LOG+SAFETY PATCH]
return MatAnyoneSession(device=device)
def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}") # [LOG+SAFETY PATCH]
try:
session = MatAnyoneSession(device=device)
alpha_path, fg_path = session.process_stream(
video_path=Path(video_path),
seed_mask_path=Path(mask_path) if mask_path else None,
out_dir=Path(output_dir),
progress_cb=progress_callback
)
log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_path}, {fg_path}") # [LOG+SAFETY PATCH]
return str(alpha_path), str(fg_path)
except Exception as e:
log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {e}", exc_info=True) # [LOG+SAFETY PATCH]
return None, None