|
|
|
|
|
""" |
|
|
Advanced Video Background Replacer - Streamlit Entrypoint |
|
|
""" |
|
|
import os |
|
|
import sys |
|
|
import time |
|
|
from pathlib import Path |
|
|
import logging |
|
|
import logging.handlers |
|
|
import traceback |
|
|
import uuid |
|
|
from tempfile import NamedTemporaryFile |
|
|
import streamlit as st |
|
|
|
|
|
from ui import render_ui |
|
|
from pipeline.video_pipeline import ( |
|
|
stage1_create_transparent_video, |
|
|
stage2_composite_background, |
|
|
setup_t4_environment, |
|
|
check_gpu |
|
|
) |
|
|
from models.model_loaders import load_sam2, load_matanyone |
|
|
|
|
|
# Human-readable application name; also used as the logger name and page title.
APP_NAME = "Advanced Video Background Replacer"

# Rotating log file settings: location, size cap per file, and number of
# rolled-over backups to keep.
LOG_FILE = "/tmp/app.log"
LOG_MAX_BYTES = 5 * 1024 ** 2  # 5 MiB
LOG_BACKUPS = 5
|
|
|
|
|
def setup_logging(level: int = logging.INFO) -> logging.Logger:
    """Configure and return the application logger.

    The logger named ``APP_NAME`` is reset and given two handlers that share
    one format: a stream handler on ``sys.stdout`` and a size-rotated file
    handler writing to ``LOG_FILE``.

    Args:
        level: Logging level applied to the logger and to both handlers.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    app_logger = logging.getLogger(APP_NAME)
    app_logger.setLevel(level)
    # Keep records out of the root logger so they are not emitted twice.
    app_logger.propagate = False

    # Drop handlers left over from a previous call. Closing them matters:
    # removeHandler() alone leaves the rotating file handler's descriptor
    # open, so every repeated call would leak one open file.
    for stale in list(app_logger.handlers):
        app_logger.removeHandler(stale)
        stale.close()

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler = logging.StreamHandler(sys.stdout)
    file_handler = logging.handlers.RotatingFileHandler(
        LOG_FILE, maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUPS, encoding="utf-8"
    )
    for handler in (console_handler, file_handler):
        handler.setLevel(level)
        handler.setFormatter(formatter)
        app_logger.addHandler(handler)
    return app_logger


# Module-wide logger used by every function in this file.
logger = setup_logging()
|
|
|
|
|
def custom_excepthook(type, value, tb):
    """Log an uncaught exception, including its traceback, via the app logger.

    Args:
        type: Class of the uncaught exception.
        value: The exception instance.
        tb: Traceback object associated with the exception.
    """
    # Pass the triple explicitly: with exc_info=True logging consults
    # sys.exc_info(), which is empty when the hook runs outside an ``except``
    # block, and the record would end with a meaningless "NoneType: None".
    logger.error(f"Unhandled: {type.__name__}: {value}", exc_info=(type, value, tb))


# Route exceptions that reach the interpreter's top level through the logger.
sys.excepthook = custom_excepthook
|
|
|
|
|
|
|
|
# Model handles created at module level; process_video() passes both of them
# to stage 1 of the pipeline.
sam2_predictor = load_sam2()
matanyone_processor = load_matanyone()
|
|
|
|
|
def initialize_session_state():
    """Seed ``st.session_state`` with default values for keys not yet present.

    Existing keys are left untouched. Afterwards, if ``gpu_available`` is
    still ``None``, it is filled in with the result of ``check_gpu(logger)``.
    """
    initial_values = {
        # Uploaded video and its cached bytes / preview slot.
        'uploaded_video': None,
        'video_bytes_cache': None,
        'video_preview_placeholder': None,
        # Background image / colour selection and their cached previews.
        'bg_image_cache': None,
        'bg_preview_placeholder': None,
        'bg_color': "#00FF00",
        'cached_color': None,
        'color_display_cache': None,
        # Processing result and status flags.
        'processed_video_bytes': None,
        'processing': False,
        'gpu_available': None,
        'last_video_id': None,
        'last_bg_image_id': None,
        'last_error': None,
        # Log viewer settings.
        'log_level_name': 'INFO',
        'auto_refresh_logs': False,
        'log_tail_lines': 400,
        'generated_bg': None,
    }
    for key, default in initial_values.items():
        if key in st.session_state:
            continue
        st.session_state[key] = default

    # Probe the GPU only while the flag has not been resolved yet.
    if st.session_state.gpu_available is None:
        st.session_state.gpu_available = check_gpu(logger)
|
|
|
|
|
def process_video(uploaded_video, background, bg_type, progress_callback=None):
    """Run the two-stage background replacement on an uploaded video.

    The upload is written to a temporary file, stage 1 produces a transparent
    video plus an audio path, and stage 2 composites the background. On
    success the final file's bytes are stored in
    ``st.session_state.processed_video_bytes``; on failure the error text is
    stored in ``st.session_state.last_error``.

    Args:
        uploaded_video: Seekable binary file-like object with a ``name``
            attribute (the uploaded video).
        background: Background passed through unchanged to stage 2.
        bg_type: Background type label; lower-cased before being passed to
            stage 2.
        progress_callback: Optional callable taking one status string.

    Returns:
        ``True`` when both stages succeed, ``False`` when any exception is
        raised during processing (the exception is logged, not re-raised).
    """
    # NOTE(review): the emoji prefixes and the delta sign in the messages below
    # look mis-decoded; confirm the intended characters against the upstream file.

    def notify(message):
        # Forward a status message only when a callback was supplied.
        if progress_callback:
            progress_callback(message)

    run_id = uuid.uuid4().hex[:8]
    logger.info("=" * 80)
    logger.info(f"[RUN {run_id}] VIDEO PROCESSING STARTED at {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}")

    # Read the upload once, from the start: the same bytes serve both the
    # size log line and the temporary file, instead of two full reads.
    uploaded_video.seek(0)
    video_bytes = uploaded_video.read()
    logger.info(f"[RUN {run_id}] Video size={len(video_bytes) / 1e6:.2f}MB, BG type={bg_type}")

    st.session_state.processing = True
    st.session_state.processed_video_bytes = None
    st.session_state.last_error = None

    tmp_vid_path = None
    t0 = time.time()
    try:
        notify("π₯ Uploading video...")
        suffix = Path(uploaded_video.name).suffix or ".mp4"
        # delete=False so the pipeline can reopen the file by path; it is
        # removed explicitly in the ``finally`` clause below.
        with NamedTemporaryFile(delete=False, suffix=suffix) as tmp_vid:
            tmp_vid_path = tmp_vid.name
            tmp_vid.write(video_bytes)
        # The raw bytes are on disk now; do not hold them during processing.
        del video_bytes
        logger.info(f"[RUN {run_id}] Temporary video path: {tmp_vid_path}")

        notify("π Stage 1: Creating transparent video (matting & segmentation)...")
        transparent_path, audio_path = stage1_create_transparent_video(
            tmp_vid_path,
            sam2_predictor=sam2_predictor,
            matanyone_processor=matanyone_processor,
            mat_timeout_sec=300
        )
        if not transparent_path or not os.path.exists(transparent_path):
            raise RuntimeError("Stage 1 failed: Transparent video not created")
        logger.info(f"[RUN {run_id}] Stage 1 completed: Transparent path={transparent_path}, Audio path={audio_path}")

        notify("π¨ Stage 2: Compositing with background and restoring audio...")
        final_path = stage2_composite_background(
            transparent_path,
            audio_path,
            background,
            bg_type.lower()
        )
        if not final_path or not os.path.exists(final_path):
            raise RuntimeError("Stage 2 failed: Final video not created")
        logger.info(f"[RUN {run_id}] Stage 2 completed: Final path={final_path}")

        notify("π€ Loading final video for download...")
        with open(final_path, 'rb') as f:
            st.session_state.processed_video_bytes = f.read()
        total = time.time() - t0
        logger.info(f"[RUN {run_id}] SUCCESS size={len(st.session_state.processed_video_bytes)/1e6:.2f}MB, total Ξ={total:.2f}s")
        notify("β All done!")
        return True
    except Exception as e:
        # Boundary handler: report the failure to the log, the session state
        # and the caller's progress callback instead of crashing the app.
        total = time.time() - t0
        error_msg = f"[RUN {run_id}] Processing Error: {e} (Ξ {total:.2f}s)\n\nCheck logs for details."
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        st.session_state.last_error = error_msg
        notify(f"β ERROR: {e}")
        return False
    finally:
        st.session_state.processing = False
        # Remove the temporary copy of the upload; without this every run
        # leaves one video-sized file behind in the temp directory.
        if tmp_vid_path is not None:
            try:
                os.remove(tmp_vid_path)
            except OSError as cleanup_err:
                logger.warning(f"[RUN {run_id}] Could not remove temporary video {tmp_vid_path}: {cleanup_err}")
        logger.info(f"[RUN {run_id}] Processing finished")
|
|
|
|
|
def main():
    """Configure the Streamlit page, seed session state and render the UI.

    Any exception raised along the way is logged with its traceback and
    shown to the user through ``st.error`` rather than propagated.
    """
    page_settings = {
        "page_title": APP_NAME,
        "page_icon": "π₯",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    try:
        st.set_page_config(**page_settings)
        initialize_session_state()
        # The UI receives process_video as the callback it invokes to run a job.
        render_ui(process_video)
    except Exception as err:
        logger.error(f"Main app error: {err}", exc_info=True)
        st.error(f"App startup failed: {err}. Check logs for details.")
|
|
|
|
|
# Script entry point: prepare the T4 environment first, then start the app.
if __name__ == "__main__":
    setup_t4_environment()
    main()
|
|
|