MogensR commited on
Commit
1be662e
·
verified ·
1 Parent(s): 01d284f

Update models/matanyone_loader.py

Browse files
Files changed (1) hide show
  1. models/matanyone_loader.py +27 -102
models/matanyone_loader.py CHANGED
@@ -2,18 +2,10 @@
2
  # -*- coding: utf-8 -*-
3
  """
4
  MatAnyone adapter — Using Official API (File-Based)
 
5
 
6
- Fixed to use MatAnyone's official process_video() API instead of
7
- bypassing it with internal tensor manipulation. This eliminates
8
- all 5D tensor dimension issues.
9
-
10
- Changes (2025-09-17):
11
- - Replaced custom tensor processing with official MatAnyone API
12
- - Uses file-based input/output as designed by MatAnyone authors
13
- - Eliminates all tensor dimension compatibility issues
14
- - Simplified error handling and logging
15
  """
16
-
17
  from __future__ import annotations
18
  import os
19
  import time
@@ -23,6 +15,7 @@
23
  from pathlib import Path
24
  from typing import Optional, Callable, Tuple
25
 
 
26
  log = logging.getLogger(__name__)
27
 
28
  # ---------- Progress helper ----------
@@ -44,20 +37,18 @@ def _emit_progress(cb, pct: float, msg: str):
44
  return
45
  try:
46
  try:
47
- cb(pct, msg) # preferred (pct, msg)
48
  except TypeError:
49
- cb(msg) # legacy (msg)
50
  _progress_last = now
51
  _progress_last_msg = msg
52
  except Exception as e:
53
  _progress_disabled = True
54
  log.warning("[progress-cb] disabled due to exception: %s", e)
55
 
56
- # ---------- Errors ----------
57
  class MatAnyError(RuntimeError):
58
  pass
59
 
60
- # ---------- CUDA helpers ----------
61
  def _cuda_snapshot(device: Optional[str]) -> str:
62
  try:
63
  import torch
@@ -86,30 +77,26 @@ def _safe_empty_cache():
86
  except Exception:
87
  pass
88
 
89
- # ============================================================================
90
-
91
  class MatAnyoneSession:
92
  """
93
  Simple wrapper around MatAnyone's official API.
94
  Uses file-based input/output as designed by the MatAnyone authors.
95
  """
96
  def __init__(self, device: Optional[str] = None, precision: str = "auto"):
 
97
  self.device = device or ("cuda" if self._cuda_available() else "cpu")
98
  self.precision = precision.lower()
99
-
100
- # Log MatAnyone version
101
  try:
102
  version = importlib.metadata.version("matanyone")
103
  log.info(f"[MATANY] MatAnyone version: {version}")
104
  except Exception:
105
  log.info("[MATANY] MatAnyone version unknown")
106
-
107
- # Initialize MatAnyone's official API
108
  try:
109
  from matanyone import InferenceCore
110
  self.processor = InferenceCore("PeiqingYang/MatAnyone")
111
  log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
112
  except Exception as e:
 
113
  raise MatAnyError(f"Failed to initialize MatAnyone: {e}")
114
 
115
  def _cuda_available(self) -> bool:
@@ -126,156 +113,93 @@ def process_stream(
126
  out_dir: Optional[Path] = None,
127
  progress_cb: Optional[Callable] = None,
128
  ) -> Tuple[Path, Path]:
129
- """
130
- Process video using MatAnyone's official API.
131
-
132
- Args:
133
- video_path: Path to input video file
134
- seed_mask_path: Path to first-frame mask PNG (white=foreground, black=background)
135
- out_dir: Output directory for results
136
- progress_cb: Progress callback function
137
-
138
- Returns:
139
- Tuple of (alpha_path, foreground_path)
140
- """
141
  video_path = Path(video_path)
142
  if not video_path.exists():
 
143
  raise MatAnyError(f"Video file not found: {video_path}")
144
-
145
  if seed_mask_path and not Path(seed_mask_path).exists():
 
146
  raise MatAnyError(f"Seed mask not found: {seed_mask_path}")
147
-
148
  out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
149
  out_dir.mkdir(parents=True, exist_ok=True)
150
-
151
  log.info(f"[MATANY] Processing video: {video_path}")
152
  log.info(f"[MATANY] Using mask: {seed_mask_path}")
153
  log.info(f"[MATANY] Output directory: {out_dir}")
154
-
155
  _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")
156
-
157
  try:
158
- # Use MatAnyone's official API
159
  start_time = time.time()
160
-
161
  _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
162
-
163
- # Call the official process_video method
164
  foreground_path, alpha_path = self.processor.process_video(
165
  input_path=str(video_path),
166
  mask_path=str(seed_mask_path) if seed_mask_path else None,
167
  output_path=str(out_dir)
168
  )
169
-
170
  processing_time = time.time() - start_time
171
  log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
172
  log.info(f"[MATANY] Foreground output: {foreground_path}")
173
  log.info(f"[MATANY] Alpha output: {alpha_path}")
174
-
175
- # Convert to Path objects
176
  fg_path = Path(foreground_path) if foreground_path else None
177
  al_path = Path(alpha_path) if alpha_path else None
178
-
179
- # Verify outputs exist
180
  if not fg_path or not fg_path.exists():
 
181
  raise MatAnyError(f"Foreground output not created: {fg_path}")
182
  if not al_path or not al_path.exists():
 
183
  raise MatAnyError(f"Alpha output not created: {al_path}")
184
-
185
  _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
186
-
187
- return al_path, fg_path # Return (alpha, foreground) to match expected order
188
-
189
  except Exception as e:
190
- log.error(f"[MATANY] Processing failed: {e}")
191
  raise MatAnyError(f"MatAnyone processing failed: {e}")
192
-
193
  finally:
194
  _safe_empty_cache()
195
 
196
- # ============================================================================
197
- # MatAnyoneModel Wrapper Class for app_hf.py compatibility
198
- # ============================================================================
199
-
200
  class MatAnyoneModel:
201
  """Wrapper class for MatAnyone to match app_hf.py interface"""
202
-
203
  def __init__(self, device="cuda"):
 
204
  self.device = device
205
  self.session = None
206
  self.loaded = False
207
- log.info(f"Initializing MatAnyoneModel on device: {device}")
208
-
209
- # Initialize the session
210
  self._load_model()
211
-
212
  def _load_model(self):
213
- """Load the MatAnyone session"""
214
  try:
215
  self.session = MatAnyoneSession(device=self.device, precision="auto")
216
  self.loaded = True
217
- log.info("MatAnyoneModel loaded successfully")
218
  except Exception as e:
219
- log.error(f"Error loading MatAnyoneModel: {e}")
220
  self.loaded = False
221
-
222
  def replace_background(self, video_path, masks, background_path):
223
- """Replace background in video using MatAnyone"""
224
  if not self.loaded:
 
225
  raise MatAnyError("MatAnyoneModel not loaded")
226
-
227
  try:
228
- from pathlib import Path
229
- import tempfile
230
-
231
- # Convert paths to Path objects
232
  video_path = Path(video_path)
233
-
234
- # For now, we expect masks to be a path to the first-frame mask
235
  mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
236
-
237
- # Create output directory
238
  with tempfile.TemporaryDirectory() as temp_dir:
239
  output_dir = Path(temp_dir)
240
-
241
- # Process the video stream
242
  alpha_path, fg_path = self.session.process_stream(
243
  video_path=video_path,
244
  seed_mask_path=mask_path,
245
  out_dir=output_dir,
246
  progress_cb=None
247
  )
248
-
249
- # Return the foreground video path
250
- # In a full implementation, you'd composite with the background_path
251
  return str(fg_path)
252
-
253
  except Exception as e:
254
- log.error(f"Error in replace_background: {e}")
255
  raise MatAnyError(f"Background replacement failed: {e}")
256
 
257
- # ============================================================================
258
- # Helper function for pipeline integration
259
- # ============================================================================
260
-
261
  def create_matanyone_session(device=None):
262
- """Create a MatAnyone session for use in pipeline"""
263
  return MatAnyoneSession(device=device)
264
 
265
  def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
266
- """
267
- Run MatAnyone on video and mask files.
268
-
269
- Args:
270
- video_path: Path to input video
271
- mask_path: Path to first-frame mask PNG
272
- output_dir: Directory for outputs
273
- device: Device to use (cuda/cpu)
274
- progress_callback: Progress callback function
275
-
276
- Returns:
277
- Tuple of (alpha_path, foreground_path) or (None, None) on failure
278
- """
279
  try:
280
  session = MatAnyoneSession(device=device)
281
  alpha_path, fg_path = session.process_stream(
@@ -284,7 +208,8 @@ def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", pro
284
  out_dir=Path(output_dir),
285
  progress_cb=progress_callback
286
  )
 
287
  return str(alpha_path), str(fg_path)
288
  except Exception as e:
289
- log.error(f"MatAnyone processing failed: {e}")
290
- return None, None
 
2
  # -*- coding: utf-8 -*-
3
  """
4
  MatAnyone adapter — Using Official API (File-Based)
5
+ (Enhanced logging, explicit error handling, and stage progress)
6
 
7
+ ...
 
 
 
 
 
 
 
 
8
  """
 
9
  from __future__ import annotations
10
  import os
11
  import time
 
15
  from pathlib import Path
16
  from typing import Optional, Callable, Tuple
17
 
18
+ logging.basicConfig(level=logging.INFO)
19
  log = logging.getLogger(__name__)
20
 
21
  # ---------- Progress helper ----------
 
37
  return
38
  try:
39
  try:
40
+ cb(pct, msg)
41
  except TypeError:
42
+ cb(msg)
43
  _progress_last = now
44
  _progress_last_msg = msg
45
  except Exception as e:
46
  _progress_disabled = True
47
  log.warning("[progress-cb] disabled due to exception: %s", e)
48
 
 
49
  class MatAnyError(RuntimeError):
50
  pass
51
 
 
52
  def _cuda_snapshot(device: Optional[str]) -> str:
53
  try:
54
  import torch
 
77
  except Exception:
78
  pass
79
 
 
 
80
  class MatAnyoneSession:
81
  """
82
  Simple wrapper around MatAnyone's official API.
83
  Uses file-based input/output as designed by the MatAnyone authors.
84
  """
85
  def __init__(self, device: Optional[str] = None, precision: str = "auto"):
86
+ log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}") # [LOG+SAFETY PATCH]
87
  self.device = device or ("cuda" if self._cuda_available() else "cpu")
88
  self.precision = precision.lower()
 
 
89
  try:
90
  version = importlib.metadata.version("matanyone")
91
  log.info(f"[MATANY] MatAnyone version: {version}")
92
  except Exception:
93
  log.info("[MATANY] MatAnyone version unknown")
 
 
94
  try:
95
  from matanyone import InferenceCore
96
  self.processor = InferenceCore("PeiqingYang/MatAnyone")
97
  log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
98
  except Exception as e:
99
+ log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True) # [LOG+SAFETY PATCH]
100
  raise MatAnyError(f"Failed to initialize MatAnyone: {e}")
101
 
102
  def _cuda_available(self) -> bool:
 
113
  out_dir: Optional[Path] = None,
114
  progress_cb: Optional[Callable] = None,
115
  ) -> Tuple[Path, Path]:
116
+ log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}") # [LOG+SAFETY PATCH]
 
 
 
 
 
 
 
 
 
 
 
117
  video_path = Path(video_path)
118
  if not video_path.exists():
119
+ log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}") # [LOG+SAFETY PATCH]
120
  raise MatAnyError(f"Video file not found: {video_path}")
 
121
  if seed_mask_path and not Path(seed_mask_path).exists():
122
+ log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}") # [LOG+SAFETY PATCH]
123
  raise MatAnyError(f"Seed mask not found: {seed_mask_path}")
 
124
  out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
125
  out_dir.mkdir(parents=True, exist_ok=True)
 
126
  log.info(f"[MATANY] Processing video: {video_path}")
127
  log.info(f"[MATANY] Using mask: {seed_mask_path}")
128
  log.info(f"[MATANY] Output directory: {out_dir}")
 
129
  _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")
 
130
  try:
 
131
  start_time = time.time()
 
132
  _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
 
 
133
  foreground_path, alpha_path = self.processor.process_video(
134
  input_path=str(video_path),
135
  mask_path=str(seed_mask_path) if seed_mask_path else None,
136
  output_path=str(out_dir)
137
  )
 
138
  processing_time = time.time() - start_time
139
  log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
140
  log.info(f"[MATANY] Foreground output: {foreground_path}")
141
  log.info(f"[MATANY] Alpha output: {alpha_path}")
 
 
142
  fg_path = Path(foreground_path) if foreground_path else None
143
  al_path = Path(alpha_path) if alpha_path else None
 
 
144
  if not fg_path or not fg_path.exists():
145
+ log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}") # [LOG+SAFETY PATCH]
146
  raise MatAnyError(f"Foreground output not created: {fg_path}")
147
  if not al_path or not al_path.exists():
148
+ log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}") # [LOG+SAFETY PATCH]
149
  raise MatAnyError(f"Alpha output not created: {al_path}")
 
150
  _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
151
+ log.info(f"[MatAnyoneSession.process_stream] Success, returning paths.") # [LOG+SAFETY PATCH]
152
+ return al_path, fg_path # (alpha, foreground)
 
153
  except Exception as e:
154
+ log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True) # [LOG+SAFETY PATCH]
155
  raise MatAnyError(f"MatAnyone processing failed: {e}")
 
156
  finally:
157
  _safe_empty_cache()
158
 
 
 
 
 
159
  class MatAnyoneModel:
160
  """Wrapper class for MatAnyone to match app_hf.py interface"""
 
161
  def __init__(self, device="cuda"):
162
+ log.info(f"[MatAnyoneModel.__init__] device={device}") # [LOG+SAFETY PATCH]
163
  self.device = device
164
  self.session = None
165
  self.loaded = False
 
 
 
166
  self._load_model()
 
167
  def _load_model(self):
 
168
  try:
169
  self.session = MatAnyoneSession(device=self.device, precision="auto")
170
  self.loaded = True
171
+ log.info("[MatAnyoneModel._load_model] Loaded successfully") # [LOG+SAFETY PATCH]
172
  except Exception as e:
173
+ log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True) # [LOG+SAFETY PATCH]
174
  self.loaded = False
 
175
  def replace_background(self, video_path, masks, background_path):
176
+ log.info(f"[MatAnyoneModel.replace_background] Start") # [LOG+SAFETY PATCH]
177
  if not self.loaded:
178
+ log.error("[MatAnyoneModel.replace_background] Model not loaded") # [LOG+SAFETY PATCH]
179
  raise MatAnyError("MatAnyoneModel not loaded")
 
180
  try:
 
 
 
 
181
  video_path = Path(video_path)
 
 
182
  mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
 
 
183
  with tempfile.TemporaryDirectory() as temp_dir:
184
  output_dir = Path(temp_dir)
 
 
185
  alpha_path, fg_path = self.session.process_stream(
186
  video_path=video_path,
187
  seed_mask_path=mask_path,
188
  out_dir=output_dir,
189
  progress_cb=None
190
  )
191
+ log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}") # [LOG+SAFETY PATCH]
 
 
192
  return str(fg_path)
 
193
  except Exception as e:
194
+ log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True) # [LOG+SAFETY PATCH]
195
  raise MatAnyError(f"Background replacement failed: {e}")
196
 
 
 
 
 
197
  def create_matanyone_session(device=None):
198
+ log.info(f"[create_matanyone_session] device={device}") # [LOG+SAFETY PATCH]
199
  return MatAnyoneSession(device=device)
200
 
201
  def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
202
+ log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}") # [LOG+SAFETY PATCH]
 
 
 
 
 
 
 
 
 
 
 
 
203
  try:
204
  session = MatAnyoneSession(device=device)
205
  alpha_path, fg_path = session.process_stream(
 
208
  out_dir=Path(output_dir),
209
  progress_cb=progress_callback
210
  )
211
+ log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_path}, {fg_path}") # [LOG+SAFETY PATCH]
212
  return str(alpha_path), str(fg_path)
213
  except Exception as e:
214
+ log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {e}", exc_info=True) # [LOG+SAFETY PATCH]
215
+ return None, None