Spaces:
Running
Running
| # ===東吳大學資料系 2025 年 LINEBOT === | |
| import logging | |
| import os | |
| import uuid | |
| from io import BytesIO | |
| from flask import Flask, abort, request, send_from_directory | |
| from google import genai | |
| from google.genai import types | |
| from google.genai.types import Tool, GenerateContentConfig, GoogleSearch | |
| from google.genai.types import Image as GeminiImage | |
| from google.genai import errors as genai_errors # 確保此行存在 | |
| from linebot.v3 import WebhookHandler | |
| from linebot.v3.exceptions import InvalidSignatureError | |
| from linebot.v3.messaging import ( | |
| ApiClient, | |
| Configuration, | |
| FlexMessage, | |
| ImageMessage, | |
| MessagingApi, | |
| MessagingApiBlob, | |
| ReplyMessageRequest, | |
| TextMessage, | |
| VideoMessage | |
| ) | |
| from linebot.v3.webhooks import ( | |
| ImageMessageContent, | |
| MessageEvent, | |
| PostbackEvent, | |
| ) | |
| from PIL import Image | |
| # === 初始化 Google Gemini === | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| client = genai.Client(api_key=GOOGLE_API_KEY) | |
| google_search_tool = Tool( | |
| google_search=GoogleSearch() | |
| ) | |
| chat = client.chats.create( | |
| model="gemini-2.0-flash", | |
| config=GenerateContentConfig( | |
| system_instruction="你是一個中文的AI助手,請用繁體中文回答", | |
| tools=[google_search_tool], | |
| response_modalities=["TEXT"], | |
| ) | |
| ) | |
| # === 初始設定 === | |
| base_url = os.getenv("SPACE_HOST") # e.g., "your-space-name.hf.space" | |
| # === Flask 應用初始化 === | |
| app = Flask(__name__) | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - %(message)s" | |
| ) | |
| app.logger.setLevel(logging.INFO) | |
| channel_secret = os.environ.get("YOUR_CHANNEL_SECRET") | |
| channel_access_token = os.environ.get("YOUR_CHANNEL_ACCESS_TOKEN") | |
| configuration = Configuration(access_token=channel_access_token) | |
| handler = WebhookHandler(channel_secret) | |
| # === 靜態圖檔路由 === | |
| def serve_image(filename): | |
| image_dir = os.path.join(os.getcwd(), "images") | |
| return send_from_directory(image_dir, filename) | |
| # === LINE Webhook 接收端點 === | |
| def home(): | |
| return {"message": "Line Webhook Server"} | |
| def callback(): | |
| signature = request.headers.get("X-Line-Signature") | |
| body = request.get_data(as_text=True) | |
| app.logger.info(f"Request body: {body}") | |
| try: | |
| handler.handle(body, signature) | |
| except InvalidSignatureError: | |
| app.logger.warning("Invalid signature. Please check channel credentials.") | |
| abort(400) | |
| return "OK" | |
| # === 處理圖片訊息 === | |
| def handle_image_message(event): | |
| # 下載圖片 | |
| with ApiClient(configuration) as api_client: | |
| blob_api = MessagingApiBlob(api_client) | |
| content = blob_api.get_message_content(message_id=event.message.id) | |
| image_dir = os.path.join(os.getcwd(), "images") | |
| os.makedirs(image_dir, exist_ok=True) | |
| # 產生唯一檔名 | |
| filename = f"{uuid.uuid4().hex}.jpg" | |
| image_path = os.path.join(image_dir, filename) | |
| with open(image_path, "wb") as f: | |
| f.write(content) | |
| image_url = f"https://{base_url}/images/{filename}" | |
| # 回傳 Flex Message 功能選單 | |
| flex_content = { | |
| "type": "bubble", | |
| "hero": { | |
| "type": "image", | |
| "url": image_url, | |
| "size": "full", | |
| "aspectRatio": "20:13", | |
| "aspectMode": "cover" | |
| }, | |
| "body": { | |
| "type": "box", | |
| "layout": "vertical", | |
| "contents": [ | |
| {"type": "text", "text": "請選擇圖片處理功能", "weight": "bold", "size": "lg"} | |
| ] | |
| }, | |
| "footer": { | |
| "type": "box", | |
| "layout": "vertical", | |
| "spacing": "sm", | |
| "contents": [ | |
| { | |
| "type": "button", | |
| "style": "primary", | |
| "action": { | |
| "type": "postback", | |
| "label": "吉卜力風格圖片", | |
| "data": f"ghibli,{filename}" | |
| } | |
| }, | |
| { | |
| "type": "button", | |
| "style": "primary", | |
| "action": { | |
| "type": "postback", | |
| "label": "生成影片", | |
| "data": f"veo,{filename}" | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| with ApiClient(configuration) as api_client: | |
| line_bot_api = MessagingApi(api_client) | |
| line_bot_api.reply_message( | |
| ReplyMessageRequest( | |
| reply_token=event.reply_token, | |
| messages=[ | |
| FlexMessage.from_dict({ | |
| "type": "flex", | |
| "altText": "請選擇圖片處理功能", | |
| "contents": flex_content | |
| }) | |
| ] | |
| ) | |
| ) | |
| # === 處理 Postback 事件 === | |
| def handle_postback(event): | |
| data = event.postback.data | |
| if data.startswith("ghibli,"): | |
| _, filename = data.split(",", 1) | |
| image_dir = os.path.join(os.getcwd(), "images") | |
| image_path = os.path.join(image_dir, filename) | |
| # 這裡呼叫你的吉卜力風格轉換函式 | |
| ghibli_img_path = convert_to_ghibli_style(image_path) | |
| ghibli_url = f"https://{base_url}/images/{os.path.basename(ghibli_img_path)}" | |
| with ApiClient(configuration) as api_client: | |
| MessagingApi(api_client).reply_message( | |
| ReplyMessageRequest( | |
| reply_token=event.reply_token, | |
| messages=[ | |
| ImageMessage( | |
| original_content_url=ghibli_url, | |
| preview_image_url=ghibli_url | |
| ), | |
| TextMessage(text="這是吉卜力風格圖片!") | |
| ] | |
| ) | |
| ) | |
| elif data.startswith("veo,"): | |
| _, original_filename = data.split(",", 1) # 這是原始圖片的檔名 | |
| image_dir = os.path.join(os.getcwd(), "images") | |
| original_image_path = os.path.join(image_dir, original_filename) | |
| # 呼叫 Veo 影片生成 | |
| # generate_video_from_image 應該回傳新影片的完整路徑 | |
| generated_video_path = generate_video_from_image(original_image_path) | |
| if generated_video_path and generated_video_path != original_image_path: | |
| # 影片成功生成 | |
| video_filename = os.path.basename(generated_video_path) | |
| video_url = f"https://{base_url}/images/{video_filename}" | |
| # 使用原始圖片作為預覽圖 | |
| preview_image_url = f"https://{base_url}/images/{original_filename}" | |
| app.logger.info(f"Generated video URL: {video_url}") | |
| app.logger.info(f"Preview image URL: {preview_image_url}") | |
| with ApiClient(configuration) as api_client: | |
| MessagingApi(api_client).reply_message( | |
| ReplyMessageRequest( | |
| reply_token=event.reply_token, | |
| messages=[ | |
| TextMessage(text="影片已生成!"), | |
| VideoMessage( | |
| original_content_url=video_url, | |
| preview_image_url=preview_image_url | |
| ) | |
| ] | |
| ) | |
| ) | |
| else: | |
| # 影片生成失敗或回傳的是原始圖片路徑 | |
| app.logger.error(f"Video generation failed or returned original image path: {generated_video_path}") | |
| with ApiClient(configuration) as api_client: | |
| MessagingApi(api_client).reply_message( | |
| ReplyMessageRequest( | |
| reply_token=event.reply_token, | |
| messages=[ | |
| TextMessage(text="抱歉,影片生成失敗。") | |
| ] | |
| ) | |
| ) | |
| def convert_to_ghibli_style(image_path): | |
| from PIL import Image | |
| import io | |
| # 讀取圖片 | |
| with open(image_path, "rb") as f: | |
| img_bytes = f.read() | |
| image = Image.open(io.BytesIO(img_bytes)) | |
| prompt = "請將這張圖片轉換成吉卜力動畫風格,要和原圖高度相似,不要亂自己生成圖片只回傳圖片。" | |
| response = client.models.generate_content( | |
| model="gemini-2.0-flash-preview-image-generation", | |
| contents=[image, prompt], | |
| config=types.GenerateContentConfig( | |
| response_modalities=["IMAGE", "TEXT"] | |
| ), | |
| ) | |
| # 取得 Gemini 回傳的圖片 | |
| for part in response.candidates[0].content.parts: | |
| if part.inline_data is not None: | |
| out_img = Image.open(io.BytesIO(part.inline_data.data)) | |
| out_path = image_path.replace(".jpg", "_ghibli.jpg") # 假設原始圖片是 .jpg | |
| # 如果原始圖片可能是其他格式,需要更通用的副檔名替換邏輯 | |
| # e.g., base, ext = os.path.splitext(image_path) | |
| # out_path = f"{base}_ghibli{ext}" | |
| out_img.save(out_path) | |
| return out_path | |
| app.logger.warning(f"Ghibli style conversion failed for {image_path}, no image data in response.") | |
| return image_path # 若失敗則回傳原圖 | |
| def generate_video_from_image(image_path): | |
| import time | |
| import os | |
| prompt = "請根據這張圖片生成一段有創意的短影片,內容需與圖片主題高度相關。" | |
| app.logger.info(f"Starting video generation for: {image_path}") | |
| # 1. 讀取本地圖片並轉換為 GeminiImage 物件 | |
| try: | |
| with open(image_path, "rb") as img_file: | |
| img_bytes = img_file.read() | |
| # 簡單判斷 MIME type | |
| mime_type = "image/jpeg" | |
| if image_path.lower().endswith(".png"): | |
| mime_type = "image/png" | |
| elif image_path.lower().endswith(".webp"): | |
| mime_type = "image/webp" | |
| image_for_veo = GeminiImage(image_bytes=img_bytes, mime_type=mime_type) | |
| app.logger.info(f"Successfully created GeminiImage object for Veo. MIME type: {mime_type}") | |
| except Exception as e: | |
| app.logger.error(f"Failed to read or convert image {image_path} for Veo: {e}") | |
| return image_path | |
| try: | |
| # 2. 呼叫 Veo API 生成影片 | |
| operation = client.models.generate_videos( | |
| model="veo-2.0-generate-001", | |
| prompt=prompt, | |
| image=image_for_veo, | |
| config=types.GenerateVideosConfig( | |
| person_generation="dont_allow", | |
| aspect_ratio="16:9", | |
| number_of_videos=1 | |
| ), | |
| ) | |
| app.logger.info(f"Video generation initiated. Operation name: {operation.name}") | |
| # 3. 等待影片生成完成 | |
| while not operation.done: | |
| app.logger.info(f"Waiting for video generation... Operation: {operation.name}") | |
| time.sleep(20) | |
| operation = client.operations.get(operation) | |
| app.logger.info(f"Video generation operation completed. Done: {operation.done}") | |
| # 4. 處理並儲存生成的影片 | |
| if operation.error: | |
| app.logger.error(f"Video generation failed with API error: {operation.error}") | |
| return image_path | |
| if operation.response and operation.response.generated_videos: | |
| image_dir = os.path.join(os.getcwd(), "images") | |
| os.makedirs(image_dir, exist_ok=True) | |
| for n, generated_video_part in enumerate(operation.response.generated_videos): | |
| if hasattr(generated_video_part, 'video') and generated_video_part.video: | |
| video_data = generated_video_part.video | |
| base, _ = os.path.splitext(os.path.basename(image_path)) | |
| fname = f"{base}_veo{n}.mp4" | |
| save_path = os.path.join(image_dir, fname) | |
| try: | |
| # 根據官方範例,先下載再儲存 | |
| client.files.download(file=video_data) | |
| video_data.save(save_path) | |
| # 生成完整的網址並 LOG 印出 | |
| video_filename = os.path.basename(save_path) | |
| full_video_url = f"https://{base_url}/images/{video_filename}" | |
| app.logger.info(f"✅ Video successfully saved to: {save_path}") | |
| app.logger.info(f"🔗 Full video URL for testing: {full_video_url}") | |
| app.logger.info(f"📁 Video filename: {video_filename}") | |
| return save_path | |
| except Exception as e: | |
| app.logger.error(f"Error saving video {fname}: {e}") | |
| else: | |
| app.logger.warning(f"Generated video part {n} does not contain valid video data.") | |
| app.logger.warning("Processed all video parts but none were successfully saved.") | |
| return image_path | |
| else: | |
| app.logger.warning("Video generation operation completed, but no videos found in the response.") | |
| return image_path | |
| except Exception as e: | |
| app.logger.error(f"An unexpected error occurred during video generation: {e}") | |
| return image_path | |
| return image_path | |
| # ... (如果你的應用程式是透過 gunicorn 執行,則不需要 if __name__ == "__main__":) | |
| # if __name__ == "__main__": | |
| # app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860))) |