# Group-manga_generate/backend/utils.py

import os
import json
import time
from openai import OpenAI
import asyncio

try:
    from backend.agent import analyze_characters_with_agent, generate_single_prompt_with_agent
except ImportError:
    # Fall back to a direct import when the module is run from inside backend/ rather than as a package.
    from agent import analyze_characters_with_agent, generate_single_prompt_with_agent


def split_text_into_paragraphs(text):
    """
    Splits the novel text into paragraphs suitable for manga panels.
    Uses a double newline as the separator.
    """
    if not text:
        return []
    return [p.strip() for p in text.split('\n\n') if p.strip()]
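
# Example of the splitter's behavior (illustrative values, not taken from the project):
#   split_text_into_paragraphs("A dark alley.\n\nA figure appears.\n\n")
#   -> ["A dark alley.", "A figure appears."]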


def analyze_characters(text, api_key, base_url=None, model="gpt-4o"):
    """
    Extracts character descriptions using PydanticAI Agent.
    """
    try:
        # Run async agent synchronously
        return asyncio.run(analyze_characters_with_agent(text, api_key, base_url, model))
    except Exception as e:
        print(f"Error analyzing characters: {e}")
        return ""


async def generate_prompts_async(paragraphs, api_key, base_url, model):
    """
    Builds a shared character context from the full text, then generates
    a panel prompt for each paragraph concurrently.
    """
    full_text = "\n".join(paragraphs)
    character_context = await analyze_characters_with_agent(full_text, api_key, base_url, model)
    tasks = []
    for p in paragraphs:
        tasks.append(generate_single_prompt_with_agent(p, character_context, api_key, base_url, model))
    results = await asyncio.gather(*tasks)
    prompts = []
    for p, res in zip(paragraphs, results):
        prompts.append({"paragraph": p, "prompt": res})
    return prompts


def generate_prompts(paragraphs, api_key=None, base_url=None, model="gpt-4o"):
    """
    Generates manga prompts for each paragraph using PydanticAI Agents.
    """
    if not api_key:
        print("Warning: No API key provided for prompt generation.")
        return [{"paragraph": p, "prompt": f"Manga style panel showing: {p} (Mock generated)"} for p in paragraphs]
    try:
        return asyncio.run(generate_prompts_async(paragraphs, api_key, base_url, model))
    except Exception as e:
        print(f"Error generating prompts: {e}")
        # Fallback
        return [{"paragraph": p, "prompt": f"Error: {str(e)}"} for p in paragraphs]
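
# Illustrative call (names below are placeholders, not defined in this module):
#   prompts = generate_prompts(["Panel one text.", "Panel two text."], api_key=my_key)
#   # -> [{"paragraph": "Panel one text.", "prompt": "..."}, ...]
#   # Without an api_key, mock prompts are returned instead.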


def generate_image_from_prompt(prompt, api_key=None, base_url=None, model="dall-e-3", size="1024x1024"):
    """
    Generates an image from a prompt using the OpenAI Images API.
    """
    if not api_key:
        print("Warning: No API key for image generation.")
        # Return a mock URL
        return "https://via.placeholder.com/1024x1024.png?text=Mock+Image"
    client = OpenAI(api_key=api_key, base_url=base_url)
    try:
        # Note: if using an OpenAI-compatible third-party provider,
        # parameters might need adjustment (e.g. size/quality).
        response = client.images.generate(
            model=model,
            prompt=prompt,
            size=size,
            quality="standard",
            n=1,
        )
        return response.data[0].url
    except Exception as e:
        print(f"Error generating image: {e}")
        return None
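
# Note: accepted size values depend on the image model; "1024x1024" is valid for
# dall-e-3, but other models or OpenAI-compatible providers reached via base_url
# may expect different sizes.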


DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data')
HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')


def ensure_data_dir():
    if not os.path.exists(DATA_DIR):
        os.makedirs(DATA_DIR)


def save_to_history(item):
    """
    Saves a generation item to history.json.
    Item should be a dict (e.g., {prompt, image_url, timestamp}).
    """
    ensure_data_dir()
    history = load_history()
    item['timestamp'] = item.get('timestamp', time.time())
    history.append(item)
    with open(HISTORY_FILE, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=2)


def load_history():
    """
    Loads history from history.json.
    Returns an empty list if the file is missing or unreadable.
    """
    ensure_data_dir()
    if not os.path.exists(HISTORY_FILE):
        return []
    try:
        with open(HISTORY_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception:
        return []
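

# The block below is a minimal end-to-end sketch, not part of the original module.
# It assumes an OPENAI_API_KEY environment variable and a small sample text; without
# a key, the mock code paths above are exercised instead.
if __name__ == "__main__":
    sample_text = "A quiet village at dawn.\n\nA stranger arrives at the gate."
    key = os.environ.get("OPENAI_API_KEY")

    panels = split_text_into_paragraphs(sample_text)
    prompt_items = generate_prompts(panels, api_key=key)

    for item in prompt_items:
        image_url = generate_image_from_prompt(item["prompt"], api_key=key)
        save_to_history({"prompt": item["prompt"], "image_url": image_url})
        print(item["prompt"], "->", image_url)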