sculinebot2025 / image_examples.py
huchiahsi's picture
9c0afa9
# ===東吳大學資料系 2025 年 LINEBOT ===
import logging
import os
import uuid
from io import BytesIO
from flask import Flask, abort, request, send_from_directory
from google import genai
from google.genai import types
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch
from google.genai.types import Image as GeminiImage
from google.genai import errors as genai_errors # 確保此行存在
from linebot.v3 import WebhookHandler
from linebot.v3.exceptions import InvalidSignatureError
from linebot.v3.messaging import (
ApiClient,
Configuration,
FlexMessage,
ImageMessage,
MessagingApi,
MessagingApiBlob,
ReplyMessageRequest,
TextMessage,
VideoMessage
)
from linebot.v3.webhooks import (
ImageMessageContent,
MessageEvent,
PostbackEvent,
)
from PIL import Image
# === 初始化 Google Gemini ===
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
client = genai.Client(api_key=GOOGLE_API_KEY)
google_search_tool = Tool(
google_search=GoogleSearch()
)
chat = client.chats.create(
model="gemini-2.0-flash",
config=GenerateContentConfig(
system_instruction="你是一個中文的AI助手,請用繁體中文回答",
tools=[google_search_tool],
response_modalities=["TEXT"],
)
)
# === 初始設定 ===
base_url = os.getenv("SPACE_HOST") # e.g., "your-space-name.hf.space"
# === Flask 應用初始化 ===
app = Flask(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
app.logger.setLevel(logging.INFO)
channel_secret = os.environ.get("YOUR_CHANNEL_SECRET")
channel_access_token = os.environ.get("YOUR_CHANNEL_ACCESS_TOKEN")
configuration = Configuration(access_token=channel_access_token)
handler = WebhookHandler(channel_secret)
# === 靜態圖檔路由 ===
@app.route("/images/<filename>")
def serve_image(filename):
image_dir = os.path.join(os.getcwd(), "images")
return send_from_directory(image_dir, filename)
# === LINE Webhook 接收端點 ===
@app.route("/")
def home():
return {"message": "Line Webhook Server"}
@app.route("/", methods=["POST"])
def callback():
signature = request.headers.get("X-Line-Signature")
body = request.get_data(as_text=True)
app.logger.info(f"Request body: {body}")
try:
handler.handle(body, signature)
except InvalidSignatureError:
app.logger.warning("Invalid signature. Please check channel credentials.")
abort(400)
return "OK"
# === 處理圖片訊息 ===
@handler.add(MessageEvent, message=ImageMessageContent)
def handle_image_message(event):
# 下載圖片
with ApiClient(configuration) as api_client:
blob_api = MessagingApiBlob(api_client)
content = blob_api.get_message_content(message_id=event.message.id)
image_dir = os.path.join(os.getcwd(), "images")
os.makedirs(image_dir, exist_ok=True)
# 產生唯一檔名
filename = f"{uuid.uuid4().hex}.jpg"
image_path = os.path.join(image_dir, filename)
with open(image_path, "wb") as f:
f.write(content)
image_url = f"https://{base_url}/images/{filename}"
# 回傳 Flex Message 功能選單
flex_content = {
"type": "bubble",
"hero": {
"type": "image",
"url": image_url,
"size": "full",
"aspectRatio": "20:13",
"aspectMode": "cover"
},
"body": {
"type": "box",
"layout": "vertical",
"contents": [
{"type": "text", "text": "請選擇圖片處理功能", "weight": "bold", "size": "lg"}
]
},
"footer": {
"type": "box",
"layout": "vertical",
"spacing": "sm",
"contents": [
{
"type": "button",
"style": "primary",
"action": {
"type": "postback",
"label": "吉卜力風格圖片",
"data": f"ghibli,{filename}"
}
},
{
"type": "button",
"style": "primary",
"action": {
"type": "postback",
"label": "生成影片",
"data": f"veo,{filename}"
}
}
]
}
}
with ApiClient(configuration) as api_client:
line_bot_api = MessagingApi(api_client)
line_bot_api.reply_message(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[
FlexMessage.from_dict({
"type": "flex",
"altText": "請選擇圖片處理功能",
"contents": flex_content
})
]
)
)
# === 處理 Postback 事件 ===
@handler.add(PostbackEvent)
def handle_postback(event):
data = event.postback.data
if data.startswith("ghibli,"):
_, filename = data.split(",", 1)
image_dir = os.path.join(os.getcwd(), "images")
image_path = os.path.join(image_dir, filename)
# 這裡呼叫你的吉卜力風格轉換函式
ghibli_img_path = convert_to_ghibli_style(image_path)
ghibli_url = f"https://{base_url}/images/{os.path.basename(ghibli_img_path)}"
with ApiClient(configuration) as api_client:
MessagingApi(api_client).reply_message(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[
ImageMessage(
original_content_url=ghibli_url,
preview_image_url=ghibli_url
),
TextMessage(text="這是吉卜力風格圖片!")
]
)
)
elif data.startswith("veo,"):
_, original_filename = data.split(",", 1) # 這是原始圖片的檔名
image_dir = os.path.join(os.getcwd(), "images")
original_image_path = os.path.join(image_dir, original_filename)
# 呼叫 Veo 影片生成
# generate_video_from_image 應該回傳新影片的完整路徑
generated_video_path = generate_video_from_image(original_image_path)
if generated_video_path and generated_video_path != original_image_path:
# 影片成功生成
video_filename = os.path.basename(generated_video_path)
video_url = f"https://{base_url}/images/{video_filename}"
# 使用原始圖片作為預覽圖
preview_image_url = f"https://{base_url}/images/{original_filename}"
app.logger.info(f"Generated video URL: {video_url}")
app.logger.info(f"Preview image URL: {preview_image_url}")
with ApiClient(configuration) as api_client:
MessagingApi(api_client).reply_message(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[
TextMessage(text="影片已生成!"),
VideoMessage(
original_content_url=video_url,
preview_image_url=preview_image_url
)
]
)
)
else:
# 影片生成失敗或回傳的是原始圖片路徑
app.logger.error(f"Video generation failed or returned original image path: {generated_video_path}")
with ApiClient(configuration) as api_client:
MessagingApi(api_client).reply_message(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[
TextMessage(text="抱歉,影片生成失敗。")
]
)
)
def convert_to_ghibli_style(image_path):
from PIL import Image
import io
# 讀取圖片
with open(image_path, "rb") as f:
img_bytes = f.read()
image = Image.open(io.BytesIO(img_bytes))
prompt = "請將這張圖片轉換成吉卜力動畫風格,要和原圖高度相似,不要亂自己生成圖片只回傳圖片。"
response = client.models.generate_content(
model="gemini-2.0-flash-preview-image-generation",
contents=[image, prompt],
config=types.GenerateContentConfig(
response_modalities=["IMAGE", "TEXT"]
),
)
# 取得 Gemini 回傳的圖片
for part in response.candidates[0].content.parts:
if part.inline_data is not None:
out_img = Image.open(io.BytesIO(part.inline_data.data))
out_path = image_path.replace(".jpg", "_ghibli.jpg") # 假設原始圖片是 .jpg
# 如果原始圖片可能是其他格式,需要更通用的副檔名替換邏輯
# e.g., base, ext = os.path.splitext(image_path)
# out_path = f"{base}_ghibli{ext}"
out_img.save(out_path)
return out_path
app.logger.warning(f"Ghibli style conversion failed for {image_path}, no image data in response.")
return image_path # 若失敗則回傳原圖
def generate_video_from_image(image_path):
import time
import os
prompt = "請根據這張圖片生成一段有創意的短影片,內容需與圖片主題高度相關。"
app.logger.info(f"Starting video generation for: {image_path}")
# 1. 讀取本地圖片並轉換為 GeminiImage 物件
try:
with open(image_path, "rb") as img_file:
img_bytes = img_file.read()
# 簡單判斷 MIME type
mime_type = "image/jpeg"
if image_path.lower().endswith(".png"):
mime_type = "image/png"
elif image_path.lower().endswith(".webp"):
mime_type = "image/webp"
image_for_veo = GeminiImage(image_bytes=img_bytes, mime_type=mime_type)
app.logger.info(f"Successfully created GeminiImage object for Veo. MIME type: {mime_type}")
except Exception as e:
app.logger.error(f"Failed to read or convert image {image_path} for Veo: {e}")
return image_path
try:
# 2. 呼叫 Veo API 生成影片
operation = client.models.generate_videos(
model="veo-2.0-generate-001",
prompt=prompt,
image=image_for_veo,
config=types.GenerateVideosConfig(
person_generation="dont_allow",
aspect_ratio="16:9",
number_of_videos=1
),
)
app.logger.info(f"Video generation initiated. Operation name: {operation.name}")
# 3. 等待影片生成完成
while not operation.done:
app.logger.info(f"Waiting for video generation... Operation: {operation.name}")
time.sleep(20)
operation = client.operations.get(operation)
app.logger.info(f"Video generation operation completed. Done: {operation.done}")
# 4. 處理並儲存生成的影片
if operation.error:
app.logger.error(f"Video generation failed with API error: {operation.error}")
return image_path
if operation.response and operation.response.generated_videos:
image_dir = os.path.join(os.getcwd(), "images")
os.makedirs(image_dir, exist_ok=True)
for n, generated_video_part in enumerate(operation.response.generated_videos):
if hasattr(generated_video_part, 'video') and generated_video_part.video:
video_data = generated_video_part.video
base, _ = os.path.splitext(os.path.basename(image_path))
fname = f"{base}_veo{n}.mp4"
save_path = os.path.join(image_dir, fname)
try:
# 根據官方範例,先下載再儲存
client.files.download(file=video_data)
video_data.save(save_path)
# 生成完整的網址並 LOG 印出
video_filename = os.path.basename(save_path)
full_video_url = f"https://{base_url}/images/{video_filename}"
app.logger.info(f"✅ Video successfully saved to: {save_path}")
app.logger.info(f"🔗 Full video URL for testing: {full_video_url}")
app.logger.info(f"📁 Video filename: {video_filename}")
return save_path
except Exception as e:
app.logger.error(f"Error saving video {fname}: {e}")
else:
app.logger.warning(f"Generated video part {n} does not contain valid video data.")
app.logger.warning("Processed all video parts but none were successfully saved.")
return image_path
else:
app.logger.warning("Video generation operation completed, but no videos found in the response.")
return image_path
except Exception as e:
app.logger.error(f"An unexpected error occurred during video generation: {e}")
return image_path
return image_path
# ... (如果你的應用程式是透過 gunicorn 執行,則不需要 if __name__ == "__main__":)
# if __name__ == "__main__":
# app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))