# === Soochow University, Department of Data Science - 2025 LINE Bot ===
"""LINE bot webhook server.

Receives images from LINE users, stores them under ``./images`` and offers
two follow-up actions through a Flex Message: converting the picture to a
Ghibli-like style with Gemini image generation, or generating a short video
from it with Veo. Generated files are served back from ``/images/<filename>``.
"""

import logging
import os
import time
import uuid
from io import BytesIO

from flask import Flask, abort, request, send_from_directory
from google import genai
from google.genai import errors as genai_errors  # make sure this line is present
from google.genai import types
from google.genai.types import GenerateContentConfig, GoogleSearch, Tool
from google.genai.types import Image as GeminiImage
from linebot.v3 import WebhookHandler
from linebot.v3.exceptions import InvalidSignatureError
from linebot.v3.messaging import (
    ApiClient,
    Configuration,
    FlexMessage,
    ImageMessage,
    MessagingApi,
    MessagingApiBlob,
    ReplyMessageRequest,
    TextMessage,
    VideoMessage,
)
from linebot.v3.webhooks import (
    ImageMessageContent,
    MessageEvent,
    PostbackEvent,
)
from PIL import Image

# === Google Gemini initialisation ===
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
client = genai.Client(api_key=GOOGLE_API_KEY)
google_search_tool = Tool(google_search=GoogleSearch())
chat = client.chats.create(
    model="gemini-2.0-flash",
    config=GenerateContentConfig(
        system_instruction="你是一個中文的AI助手,請用繁體中文回答",
        tools=[google_search_tool],
        response_modalities=["TEXT"],
    ),
)

# === Basic settings ===
# Public host name used to build file URLs, e.g. "your-space-name.hf.space".
# NOTE(review): if SPACE_HOST is unset the generated URLs become
# "https://None/..." - confirm the deployment always sets it.
base_url = os.getenv("SPACE_HOST")

# Seconds between two polls of a running Veo operation.
VEO_POLL_INTERVAL_SECONDS = 20
# Upper bound on the total time spent waiting for a Veo operation, so a
# stuck operation cannot block the worker forever.
VEO_MAX_WAIT_SECONDS = 600

# === Flask application initialisation ===
app = Flask(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
app.logger.setLevel(logging.INFO)

channel_secret = os.environ.get("YOUR_CHANNEL_SECRET")
channel_access_token = os.environ.get("YOUR_CHANNEL_ACCESS_TOKEN")
configuration = Configuration(access_token=channel_access_token)
handler = WebhookHandler(channel_secret)


def _image_dir() -> str:
    """Return the directory (under the current working dir) holding media files."""
    return os.path.join(os.getcwd(), "images")


def _public_url(filename: str) -> str:
    """Return the public HTTPS URL under which *filename* is served."""
    return f"https://{base_url}/images/{filename}"


def _reply(reply_token: str, messages: list) -> None:
    """Send *messages* as the reply to the event identified by *reply_token*."""
    with ApiClient(configuration) as api_client:
        MessagingApi(api_client).reply_message(
            ReplyMessageRequest(reply_token=reply_token, messages=messages)
        )


# === Static media route ===
# The URL variable is required: the view takes ``filename`` as an argument,
# so a route without it would fail on every request.
@app.route("/images/<filename>")
def serve_image(filename):
    """Serve a stored image or video from the ``images`` directory."""
    return send_from_directory(_image_dir(), filename)


# === LINE webhook endpoints ===
@app.route("/")
def home():
    """Return a small JSON banner for GET requests on the root URL."""
    return {"message": "Line Webhook Server"}


@app.route("/", methods=["POST"])
def callback():
    """Validate and dispatch an incoming LINE webhook request.

    Responds with 400 when the signature header is missing or does not
    match the request body, otherwise with ``"OK"``.
    """
    signature = request.headers.get("X-Line-Signature")
    body = request.get_data(as_text=True)
    app.logger.info(f"Request body: {body}")

    if signature is None:
        # Without the header the handler would fail with an unrelated
        # error and produce a 500; treat it as a bad request instead.
        app.logger.warning("Missing X-Line-Signature header.")
        abort(400)

    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        app.logger.warning("Invalid signature. Please check channel credentials.")
        abort(400)
    return "OK"


# === Image message handling ===
@handler.add(MessageEvent, message=ImageMessageContent)
def handle_image_message(event):
    """Store the received image and reply with a Flex Message action menu."""
    # Download the image content from LINE.
    with ApiClient(configuration) as api_client:
        blob_api = MessagingApiBlob(api_client)
        content = blob_api.get_message_content(message_id=event.message.id)

    image_dir = _image_dir()
    os.makedirs(image_dir, exist_ok=True)

    # Generate a unique file name; the file is always stored as ".jpg".
    filename = f"{uuid.uuid4().hex}.jpg"
    image_path = os.path.join(image_dir, filename)
    with open(image_path, "wb") as f:
        f.write(content)

    image_url = _public_url(filename)

    # Flex Message menu; each button carries "<action>,<filename>" as
    # postback data, which handle_postback parses.
    flex_content = {
        "type": "bubble",
        "hero": {
            "type": "image",
            "url": image_url,
            "size": "full",
            "aspectRatio": "20:13",
            "aspectMode": "cover",
        },
        "body": {
            "type": "box",
            "layout": "vertical",
            "contents": [
                {
                    "type": "text",
                    "text": "請選擇圖片處理功能",
                    "weight": "bold",
                    "size": "lg",
                }
            ],
        },
        "footer": {
            "type": "box",
            "layout": "vertical",
            "spacing": "sm",
            "contents": [
                {
                    "type": "button",
                    "style": "primary",
                    "action": {
                        "type": "postback",
                        "label": "吉卜力風格圖片",
                        "data": f"ghibli,{filename}",
                    },
                },
                {
                    "type": "button",
                    "style": "primary",
                    "action": {
                        "type": "postback",
                        "label": "生成影片",
                        "data": f"veo,{filename}",
                    },
                },
            ],
        },
    }

    _reply(
        event.reply_token,
        [
            FlexMessage.from_dict(
                {
                    "type": "flex",
                    "altText": "請選擇圖片處理功能",
                    "contents": flex_content,
                }
            )
        ],
    )


# === Postback event handling ===
@handler.add(PostbackEvent)
def handle_postback(event):
    """Run the action selected in the Flex Message menu and reply with the result.

    Postback data has the form ``"ghibli,<filename>"`` or ``"veo,<filename>"``;
    any other payload is ignored.
    """
    # NOTE(review): both actions call remote generation APIs before replying,
    # and the Veo path can wait for minutes. LINE reply tokens are documented
    # as short-lived, so the reply may be rejected - confirm, and consider
    # switching to push messages for long-running work.
    action, separator, raw_name = event.postback.data.partition(",")
    if not separator or action not in ("ghibli", "veo"):
        return

    # Keep only the final path component so the payload can never address
    # files outside the images directory.
    filename = os.path.basename(raw_name)
    if not filename:
        app.logger.warning(f"Ignoring postback without a usable filename: {raw_name!r}")
        return
    image_path = os.path.join(_image_dir(), filename)

    if action == "ghibli":
        _reply_with_ghibli_image(event.reply_token, image_path)
    else:
        _reply_with_generated_video(event.reply_token, image_path, filename)


def _reply_with_ghibli_image(reply_token: str, image_path: str) -> None:
    """Convert *image_path* to Ghibli style and reply with the picture."""
    ghibli_img_path = convert_to_ghibli_style(image_path)

    # convert_to_ghibli_style returns the input path when it fails; do not
    # present the untouched original as a converted picture.
    if not ghibli_img_path or ghibli_img_path == image_path:
        app.logger.error(f"Ghibli style conversion failed for {image_path}")
        _reply(reply_token, [TextMessage(text="抱歉,圖片轉換失敗。")])
        return

    ghibli_url = _public_url(os.path.basename(ghibli_img_path))
    _reply(
        reply_token,
        [
            ImageMessage(
                original_content_url=ghibli_url,
                preview_image_url=ghibli_url,
            ),
            TextMessage(text="這是吉卜力風格圖片!"),
        ],
    )


def _reply_with_generated_video(
    reply_token: str, original_image_path: str, original_filename: str
) -> None:
    """Generate a video from the image and reply with it (or a failure text)."""
    # generate_video_from_image returns the full path of the new video, or
    # the original image path when generation failed.
    generated_video_path = generate_video_from_image(original_image_path)

    if generated_video_path and generated_video_path != original_image_path:
        video_url = _public_url(os.path.basename(generated_video_path))
        # The original picture doubles as the video preview image.
        preview_image_url = _public_url(original_filename)
        app.logger.info(f"Generated video URL: {video_url}")
        app.logger.info(f"Preview image URL: {preview_image_url}")
        _reply(
            reply_token,
            [
                TextMessage(text="影片已生成!"),
                VideoMessage(
                    original_content_url=video_url,
                    preview_image_url=preview_image_url,
                ),
            ],
        )
    else:
        # Generation failed or the original image path came back.
        app.logger.error(
            f"Video generation failed or returned original image path: {generated_video_path}"
        )
        _reply(reply_token, [TextMessage(text="抱歉,影片生成失敗。")])


def convert_to_ghibli_style(image_path):
    """Ask Gemini to redraw the image at *image_path* in Ghibli style.

    Args:
        image_path: Path of the source image on disk.

    Returns:
        The path of the newly written ``<name>_ghibli<ext>`` file, or
        *image_path* itself when the conversion failed for any reason.
    """
    prompt = "請將這張圖片轉換成吉卜力動畫風格,要和原圖高度相似,不要亂自己生成圖片只回傳圖片。"

    try:
        # Read the whole file first so no file handle stays open behind
        # PIL's lazy loading.
        with open(image_path, "rb") as f:
            image = Image.open(BytesIO(f.read()))

        response = client.models.generate_content(
            model="gemini-2.0-flash-preview-image-generation",
            contents=[image, prompt],
            config=types.GenerateContentConfig(
                response_modalities=["IMAGE", "TEXT"]
            ),
        )

        # A response may carry no candidates or no content parts; treat
        # every missing level as "no image returned".
        candidates = response.candidates or []
        content = candidates[0].content if candidates else None
        parts = (content.parts if content is not None else None) or []

        # Take the first part that carries inline image data.
        for part in parts:
            if part.inline_data is None:
                continue
            out_img = Image.open(BytesIO(part.inline_data.data))
            # Derive the output name from the real extension instead of a
            # blind ".jpg" substring replacement.
            base, ext = os.path.splitext(image_path)
            out_path = f"{base}_ghibli{ext or '.jpg'}"
            # JPEG cannot store alpha/palette modes; convert before saving.
            if out_path.lower().endswith((".jpg", ".jpeg")) and out_img.mode not in (
                "RGB",
                "L",
                "CMYK",
            ):
                out_img = out_img.convert("RGB")
            out_img.save(out_path)
            return out_path
    except Exception:
        # Same contract as generate_video_from_image: log and fall back to
        # the original path rather than failing the webhook request.
        app.logger.exception(f"Ghibli style conversion raised for {image_path}")
        return image_path

    app.logger.warning(
        f"Ghibli style conversion failed for {image_path}, no image data in response."
    )
    return image_path  # fall back to the original image on failure


def generate_video_from_image(image_path):
    """Generate a short video from the image at *image_path* using Veo.

    Args:
        image_path: Path of the source image on disk.

    Returns:
        The path of the saved ``<name>_veo<n>.mp4`` file inside the images
        directory, or *image_path* itself when generation failed or timed out.
    """
    prompt = "請根據這張圖片生成一段有創意的短影片,內容需與圖片主題高度相關。"
    app.logger.info(f"Starting video generation for: {image_path}")

    # 1. Read the local image and wrap it in a GeminiImage object.
    try:
        with open(image_path, "rb") as img_file:
            img_bytes = img_file.read()

        # Simple MIME type detection based on the file extension.
        lowered = image_path.lower()
        if lowered.endswith(".png"):
            mime_type = "image/png"
        elif lowered.endswith(".webp"):
            mime_type = "image/webp"
        else:
            mime_type = "image/jpeg"

        image_for_veo = GeminiImage(image_bytes=img_bytes, mime_type=mime_type)
        app.logger.info(
            f"Successfully created GeminiImage object for Veo. MIME type: {mime_type}"
        )
    except Exception as e:
        app.logger.error(f"Failed to read or convert image {image_path} for Veo: {e}")
        return image_path

    try:
        # 2. Start the Veo video generation.
        operation = client.models.generate_videos(
            model="veo-2.0-generate-001",
            prompt=prompt,
            image=image_for_veo,
            config=types.GenerateVideosConfig(
                person_generation="dont_allow",
                aspect_ratio="16:9",
                number_of_videos=1,
            ),
        )
        app.logger.info(f"Video generation initiated. Operation name: {operation.name}")

        # 3. Poll until the operation finishes, but never longer than
        #    VEO_MAX_WAIT_SECONDS.
        deadline = time.monotonic() + VEO_MAX_WAIT_SECONDS
        while not operation.done:
            if time.monotonic() >= deadline:
                app.logger.error(
                    f"Video generation timed out after {VEO_MAX_WAIT_SECONDS}s. "
                    f"Operation: {operation.name}"
                )
                return image_path
            app.logger.info(f"Waiting for video generation... Operation: {operation.name}")
            time.sleep(VEO_POLL_INTERVAL_SECONDS)
            operation = client.operations.get(operation)

        app.logger.info(f"Video generation operation completed. Done: {operation.done}")

        # 4. Download and store the generated video.
        if operation.error:
            app.logger.error(f"Video generation failed with API error: {operation.error}")
            return image_path

        if not (operation.response and operation.response.generated_videos):
            app.logger.warning(
                "Video generation operation completed, but no videos found in the response."
            )
            return image_path

        image_dir = _image_dir()
        os.makedirs(image_dir, exist_ok=True)
        base, _ = os.path.splitext(os.path.basename(image_path))

        for n, generated_video_part in enumerate(operation.response.generated_videos):
            video_data = getattr(generated_video_part, "video", None)
            if not video_data:
                app.logger.warning(
                    f"Generated video part {n} does not contain valid video data."
                )
                continue

            fname = f"{base}_veo{n}.mp4"
            save_path = os.path.join(image_dir, fname)
            try:
                # Download first, then save (order kept from the original code).
                client.files.download(file=video_data)
                video_data.save(save_path)
            except Exception as e:
                # Try the next generated video, if any.
                app.logger.error(f"Error saving video {fname}: {e}")
                continue

            # Log the full URL to make manual testing easy.
            video_filename = os.path.basename(save_path)
            full_video_url = f"https://{base_url}/images/{video_filename}"
            app.logger.info(f"✅ Video successfully saved to: {save_path}")
            app.logger.info(f"🔗 Full video URL for testing: {full_video_url}")
            app.logger.info(f"📁 Video filename: {video_filename}")
            return save_path

        app.logger.warning("Processed all video parts but none were successfully saved.")
        return image_path
    except Exception as e:
        app.logger.error(f"An unexpected error occurred during video generation: {e}")
        return image_path


# ... (when the application is run through gunicorn, the
# ``if __name__ == "__main__":`` block below is not needed)
# if __name__ == "__main__":
#     app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))