AI漫剧API自动化:用Python脚本实现批量生成与自动化工作流
详细讲解如何使用Python脚本调用AI绘画API和ComfyUI API实现AI漫剧的批量生成与自动化工作流,涵盖API调用、批量脚本、自动化流程编排等实战内容。
AI漫剧工作室2026-04-0521 分钟阅读
一、API自动化概述
为什么需要API自动化
AI漫剧的制作涉及大量重复性工作:剧本生成、分镜描述、画面生成、配音合成等。手动操作这些流程不仅效率低下,而且容易出错。通过API自动化,可以将这些环节串联起来,实现从剧本到成片的自动化生产。
API自动化的核心价值:
- 效率提升:批量生成可以将制作效率提升10-50倍
- 一致性保障:自动化流程确保每集的制作标准一致
- 成本降低:减少人工干预,降低人力成本
- 规模化生产:支持日更甚至多更的发布节奏
- 质量稳定:减少人为错误,保证输出质量
自动化工作流架构
AI漫剧自动化工作流架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 剧本生成 │ → │ 分镜拆分 │ → │ Prompt生成 │
│ (LLM API) │ │ (脚本解析) │ │ (LLM API) │
└─────────────┘ └─────────────┘ └─────────────┘
↓
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 后期合成 │ ← │ 配音合成 │ ← │ 画面生成 │
│ (FFmpeg) │ │ (TTS API) │ │ (SD API) │
└─────────────┘ └─────────────┘ └─────────────┘
二、ComfyUI API详解
2.1 ComfyUI API基础
ComfyUI提供了完善的API接口,允许通过HTTP请求提交工作流并获取生成结果。
API端点:
| 端点 | 方法 | 说明 |
|---|---|---|
| /prompt | POST | 提交生成任务 |
| /interrupt | POST | 中断当前任务 |
| /history | GET | 获取历史记录 |
| /view | GET | 查看生成的图片 |
| /upload/image | POST | 上传图片 |
| /queue | GET | 查看队列状态 |
2.2 获取工作流API格式
ComfyUI的工作流有两种格式:前端格式(JSON)和API格式(JSON)。API调用需要使用API格式。
转换方法:
- 在ComfyUI界面中构建好工作流
- 点击菜单 → Save (API Format)
- 保存的JSON文件即为API格式
API格式与前端格式的区别:
// 前端格式(包含UI信息)
{
"3": {
"class_type": "KSampler",
"inputs": {
"seed": 156680208700286,
"steps": 20,
"cfg": 8,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1,
"model": ["4", 0],
"positive": ["6", 0],
"negative": ["7", 0],
"latent_image": ["5", 0]
}
}
}
// API格式(节点ID为数字字符串,引用格式为 ["node_id", output_index])
// 结构相同,但保存方式不同
2.3 Python调用ComfyUI API
以下是一个完整的ComfyUI API调用示例:
"""
ComfyUI API 调用基础框架
支持文生图、图生图等基本功能
"""
import json
import urllib.request
import urllib.parse
import time
import random
import os
from typing import Optional
class ComfyUIClient:
"""ComfyUI API客户端"""
def __init__(self, server_address: str = "127.0.0.1:8188"):
self.server_address = server_address
def queue_prompt(self, prompt: dict) -> str:
"""提交生成任务到队列"""
data = json.dumps({"prompt": prompt}).encode('utf-8')
req = urllib.request.Request(
f"http://{self.server_address}/prompt",
data=data,
headers={"Content-Type": "application/json"}
)
response = urllib.request.urlopen(req)
return json.loads(response.read())['prompt_id']
def get_history(self, prompt_id: str) -> dict:
"""获取任务执行历史"""
req = urllib.request.Request(
f"http://{self.server_address}/history/{prompt_id}"
)
response = urllib.request.urlopen(req)
return json.loads(response.read())
def get_image(self, filename: str, subfolder: str, folder_type: str) -> bytes:
"""获取生成的图片"""
params = urllib.parse.urlencode({
"filename": filename,
"subfolder": subfolder,
"type": folder_type
})
req = urllib.request.Request(
f"http://{self.server_address}/view?{params}"
)
response = urllib.request.urlopen(req)
return response.read()
def wait_for_completion(self, prompt_id: str, check_interval: float = 1.0,
timeout: float = 300) -> dict:
"""等待任务完成"""
start_time = time.time()
while time.time() - start_time < timeout:
history = self.get_history(prompt_id)
if prompt_id in history:
return history[prompt_id]
time.sleep(check_interval)
raise TimeoutError(f"任务超时: {prompt_id}")
def upload_image(self, image_path: str, image_name: Optional[str] = None,
overwrite: bool = True) -> dict:
"""上传图片到ComfyUI"""
if image_name is None:
image_name = os.path.basename(image_path)
with open(image_path, 'rb') as f:
body = f.read()
multipart_form_data = (
b'--boundary\r\n'
b'Content-Disposition: form-data; name="image"; filename="'
+ image_name.encode() + b'"\r\n'
b'Content-Type: image/png\r\n\r\n'
+ body + b'\r\n'
b'--boundary\r\n'
b'Content-Disposition: form-data; name="overwrite"\r\n\r\n'
+ (b"true" if overwrite else b"false") + b'\r\n'
b'--boundary--\r\n'
)
req = urllib.request.Request(
f"http://{self.server_address}/upload/image",
data=multipart_form_data,
headers={"Content-Type": "multipart/form-data; boundary=boundary"}
)
response = urllib.request.urlopen(req)
return json.loads(response.read())
def generate_and_save(self, workflow: dict, output_dir: str = "output",
prefix: str = "generated") -> list:
"""生成图片并保存到本地"""
prompt_id = self.queue_prompt(workflow)
print(f"任务已提交: {prompt_id}")
# 等待完成
history = self.wait_for_completion(prompt_id)
print("生成完成")
# 获取输出图片
outputs = history.get('outputs', {})
saved_files = []
for node_id, node_output in outputs.items():
if 'images' in node_output:
for img_info in node_output['images']:
image_data = self.get_image(
img_info['filename'],
img_info.get('subfolder', ''),
img_info.get('type', 'output')
)
# 保存图片
os.makedirs(output_dir, exist_ok=True)
filename = f"{prefix}_{img_info['filename']}"
filepath = os.path.join(output_dir, filename)
with open(filepath, 'wb') as f:
f.write(image_data)
saved_files.append(filepath)
print(f"已保存: {filepath}")
return saved_files
# 使用示例
if __name__ == "__main__":
client = ComfyUIClient("127.0.0.1:8188")
# 加载API格式的工作流
with open("workflow_api.json", 'r') as f:
workflow = json.load(f)
# 修改随机种子
workflow["3"]["inputs"]["seed"] = random.randint(0, 2**32 - 1)
# 生成并保存
saved = client.generate_and_save(workflow, output_dir="output", prefix="test")
print(f"生成了 {len(saved)} 张图片")
三、批量画面生成脚本
3.1 从剧本到分镜的自动化
以下脚本实现了从剧本文本到分镜Prompt的自动转换:
"""
AI漫剧批量生成脚本 - 剧本到分镜
自动将剧本文本拆分为分镜,并生成对应的Prompt
"""
import json
import re
from dataclasses import dataclass, asdict
from typing import List, Optional
from openai import OpenAI
@dataclass
class Storyboard:
"""分镜数据结构"""
episode: int # 集数
scene: int # 场景编号
shot_type: str # 镜头类型(特写/中景/远景等)
description: str # 场景描述
character_action: str # 角色动作
dialogue: str # 对话内容
emotion: str # 情绪/氛围
prompt_positive: str # 正向Prompt
prompt_negative: str # 反向Prompt
camera_angle: str # 镜头角度
lighting: str # 光线描述
class ScriptToStoryboard:
"""剧本转分镜处理器"""
def __init__(self, api_key: str, base_url: str = None):
self.client = OpenAI(
api_key=api_key,
base_url=base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
)
self.model = "qwen-max"
def parse_script(self, script_text: str, episode: int = 1) -> List[Storyboard]:
"""将剧本文本解析为分镜列表"""
prompt = f"""你是一个专业的AI漫剧分镜师。请将以下剧本拆分为详细的分镜。
要求:
1. 每个分镜包含:镜头类型、场景描述、角色动作、对话、情绪、镜头角度、光线
2. 为每个分镜生成AI绘画用的正向Prompt(英文,包含构图、人物、场景、风格描述)
3. 生成统一的反向Prompt
4. 镜头类型包括:特写(CU)、中景(MS)、远景(WS)、全景(FS)、俯拍、仰拍
5. Prompt中包含质量标签:masterpiece, best quality, highly detailed
剧本:
{script_text}
请以JSON数组格式输出,每个元素包含以下字段:
episode, scene, shot_type, description, character_action, dialogue, emotion,
prompt_positive, prompt_negative, camera_angle, lighting"""
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=4000
)
content = response.choices[0].message.content
# 提取JSON
json_match = re.search(r'\[.*\]', content, re.DOTALL)
if json_match:
storyboards = json.loads(json_match.group())
return [Storyboard(**sb) for sb in storyboards]
else:
raise ValueError("无法从LLM输出中提取分镜数据")
def save_storyboards(self, storyboards: List[Storyboard], output_file: str):
"""保存分镜数据"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump([asdict(sb) for sb in storyboards], f,
ensure_ascii=False, indent=2)
print(f"分镜数据已保存到: {output_file}")
# 使用示例
if __name__ == "__main__":
converter = ScriptToStoryboard(api_key="your_api_key")
script = """
第一集:命运的邂逅
场景1:清晨的城市街道,阳光透过树叶洒下斑驳光影。
女主角林晓背着书包走在上学路上,耳机里播放着音乐。
突然,一个男生从转角冲出来,撞到了她,书本散落一地。
场景2:男生连忙道歉,蹲下帮忙捡书。
两人四目相对,时间仿佛静止。
男生自我介绍:"你好,我是陈默,刚搬到这附近。"
"""
storyboards = converter.parse_script(script, episode=1)
converter.save_storyboards(storyboards, "storyboards_ep01.json")
for sb in storyboards:
print(f"\n场景 {sb.scene}: {sb.shot_type}")
print(f"描述: {sb.description}")
print(f"Prompt: {sb.prompt_positive[:100]}...")
3.2 批量图片生成脚本
"""
AI漫剧批量图片生成脚本
读取分镜数据,调用ComfyUI API批量生成图片
"""
import json
import time
import os
from comfyui_client import ComfyUIClient
from storyboard_parser import Storyboard
class BatchImageGenerator:
"""批量图片生成器"""
def __init__(self, comfyui_url: str = "127.0.0.1:8188"):
self.client = ComfyUIClient(comfyui_url)
def load_workflow_template(self, template_path: str) -> dict:
"""加载工作流模板"""
with open(template_path, 'r') as f:
return json.load(f)
def create_workflow_from_storyboard(self, template: dict,
storyboard: Storyboard) -> dict:
"""根据分镜数据创建工作流"""
import copy
workflow = copy.deepcopy(template)
# 设置正向Prompt
workflow["6"]["inputs"]["text"] = storyboard.prompt_positive
# 设置反向Prompt
workflow["7"]["inputs"]["text"] = storyboard.prompt_negative
# 设置随机种子
workflow["3"]["inputs"]["seed"] = int(time.time() * 1000) % (2**32)
return workflow
def generate_episode(self, storyboards: list, template_path: str,
output_dir: str, max_retries: int = 3) -> list:
"""批量生成一集的所有分镜图片"""
template = self.load_workflow_template(template_path)
results = []
total = len(storyboards)
for i, sb in enumerate(storyboards):
print(f"\n[{i+1}/{total}] 生成场景 {sb.scene}...")
workflow = self.create_workflow_from_storyboard(template, sb)
for attempt in range(max_retries):
try:
saved = self.client.generate_and_save(
workflow,
output_dir=output_dir,
prefix=f"ep{sb.episode:02d}_sc{sb.scene:02d}"
)
results.extend(saved)
print(f" 成功生成 {len(saved)} 张图片")
break
except Exception as e:
print(f" 第{attempt+1}次尝试失败: {e}")
if attempt == max_retries - 1:
print(f" 场景 {sb.scene} 生成失败,跳过")
time.sleep(2)
return results
def generate_with_retry(self, workflow: dict, output_dir: str,
prefix: str, target_count: int = 4,
quality_threshold: float = 0.8) -> list:
"""生成多张候选图并选择最佳"""
saved = []
for i in range(target_count):
workflow_copy = json.loads(json.dumps(workflow))
# 修改种子以生成不同结果
import random
workflow_copy["3"]["inputs"]["seed"] = random.randint(0, 2**32 - 1)
try:
files = self.client.generate_and_save(
workflow_copy, output_dir, f"{prefix}_v{i}"
)
saved.extend(files)
except Exception as e:
print(f" 变体{i}生成失败: {e}")
return saved
# 使用示例
if __name__ == "__main__":
# 加载分镜数据
with open("storyboards_ep01.json", 'r', encoding='utf-8') as f:
storyboards_data = json.load(f)
storyboards = [Storyboard(**sb) for sb in storyboards_data]
# 批量生成
generator = BatchImageGenerator("127.0.0.1:8188")
results = generator.generate_episode(
storyboards=storyboards,
template_path="workflow_api.json",
output_dir="output/episode01"
)
print(f"\n生成完成,共 {len(results)} 张图片")
四、自动化配音脚本
4.1 TTS API集成
"""
AI漫剧自动化配音脚本
支持多种TTS引擎:Edge TTS、阿里云TTS、腾讯云TTS
"""
import asyncio
import json
import os
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class DialogueLine:
"""对话行"""
character: str # 角色名
text: str # 对话文本
voice_id: str # 语音ID
emotion: str # 情绪
output_file: str # 输出文件路径
class EdgeTTSEngine:
"""Edge TTS引擎(免费)"""
def __init__(self):
self.voice_map = {
"female_young": "zh-CN-XiaoxiaoNeural",
"female_mature": "zh-CN-XiaoyiNeural",
"male_young": "zh-CN-YunxiNeural",
"male_mature": "zh-CN-YunjianNeural",
"narrator": "zh-CN-YunyangNeural"
}
async def synthesize(self, text: str, voice_id: str,
output_file: str, rate: str = "+0%",
pitch: str = "+0Hz") -> bool:
"""合成语音"""
import edge_tts
communicate = edge_tts.Communicate(
text=text,
voice=voice_id,
rate=rate,
pitch=pitch
)
await communicate.save(output_file)
return os.path.exists(output_file)
async def batch_synthesize(self, dialogues: List[DialogueLine],
output_dir: str) -> List[str]:
"""批量合成语音"""
os.makedirs(output_dir, exist_ok=True)
results = []
for i, dialogue in enumerate(dialogues):
voice = self.voice_map.get(dialogue.voice_id, dialogue.voice_id)
output_file = os.path.join(output_dir, dialogue.output_file)
# 根据情绪调整语速和音调
rate, pitch = self._get_emotion_params(dialogue.emotion)
try:
await self.synthesize(
dialogue.text, voice, output_file, rate, pitch
)
results.append(output_file)
print(f"[{i+1}/{len(dialogues)}] {dialogue.character}: "
f"{dialogue.text[:30]}... -> {output_file}")
except Exception as e:
print(f"[{i+1}/{len(dialogues)}] 合成失败: {e}")
return results
def _get_emotion_params(self, emotion: str) -> tuple:
"""根据情绪获取语速和音调参数"""
params = {
"happy": ("+10%", "+2Hz"),
"sad": ("-10%", "-2Hz"),
"angry": ("+15%", "+3Hz"),
"surprised": ("+5%", "+5Hz"),
"fearful": ("-5%", "-1Hz"),
"calm": ("-5%", "0Hz"),
"excited": ("+15%", "+3Hz"),
}
return params.get(emotion.lower(), ("+0%", "+0Hz"))
class AliyunTTSEngine:
"""阿里云TTS引擎(付费,质量更高)"""
def __init__(self, api_key: str, app_key: str):
self.api_key = api_key
self.app_key = app_key
self.api_url = "https://nls-gateway-cn-shanghai.aliyuncs.com/stream/v1/tts"
def synthesize(self, text: str, voice: str = "zhiyan_emo",
output_file: str = "output.wav") -> bool:
"""调用阿里云TTS API"""
import requests
params = {
"appkey": self.app_key,
"token": self.api_key,
"text": text,
"voice": voice,
"format": "wav",
"sample_rate": 16000
}
response = requests.post(self.api_url, json=params)
if response.status_code == 200:
with open(output_file, 'wb') as f:
f.write(response.content)
return True
return False
# 使用示例
async def main():
# 使用Edge TTS(免费)
engine = EdgeTTSEngine()
dialogues = [
DialogueLine(
character="林晓",
text="你是谁?为什么撞我?",
voice_id="female_young",
emotion="angry",
output_file="line_001_linxiao.wav"
),
DialogueLine(
character="陈默",
text="对不起对不起!我太着急了,没看到你。",
voice_id="male_young",
emotion="surprised",
output_file="line_002_chenmo.wav"
),
DialogueLine(
character="旁白",
text="命运的齿轮,就这样开始转动。",
voice_id="narrator",
emotion="calm",
output_file="line_003_narrator.wav"
),
]
results = await engine.batch_synthesize(dialogues, "output/audio")
print(f"\n配音完成,共 {len(results)} 个文件")
if __name__ == "__main__":
asyncio.run(main())
五、完整自动化流水线
5.1 端到端自动化脚本
"""
AI漫剧端到端自动化流水线
从剧本文本到最终视频的完整自动化流程
"""
import json
import os
import subprocess
import asyncio
from dataclasses import dataclass
from typing import List
from script_to_storyboard import ScriptToStoryboard
from batch_generator import BatchImageGenerator
from tts_engine import EdgeTTSEngine, DialogueLine
@dataclass
class PipelineConfig:
"""流水线配置"""
project_name: str
script_file: str
output_dir: str
comfyui_url: str
workflow_template: str
llm_api_key: str
fps: int = 24
resolution: tuple = (1080, 1920) # 竖版
class AIComicPipeline:
"""AI漫剧自动化流水线"""
def __init__(self, config: PipelineConfig):
self.config = config
self.output_dir = config.output_dir
self.setup_directories()
def setup_directories(self):
"""创建输出目录结构"""
dirs = [
self.output_dir,
f"{self.output_dir}/images",
f"{self.output_dir}/audio",
f"{self.output_dir}/video",
f"{self.output_dir}/final",
f"{self.output_dir}/metadata"
]
for d in dirs:
os.makedirs(d, exist_ok=True)
def step1_generate_storyboards(self) -> list:
"""步骤1:剧本转分镜"""
print("=" * 50)
print("步骤1:剧本转分镜")
print("=" * 50)
with open(self.config.script_file, 'r', encoding='utf-8') as f:
script = f.read()
converter = ScriptToStoryboard(api_key=self.config.llm_api_key)
storyboards = converter.parse_script(script)
# 保存分镜数据
sb_file = f"{self.output_dir}/metadata/storyboards.json"
converter.save_storyboards(storyboards, sb_file)
print(f"生成了 {len(storyboards)} 个分镜")
return storyboards
def step2_generate_images(self, storyboards: list) -> list:
"""步骤2:批量生成画面"""
print("\n" + "=" * 50)
print("步骤2:批量生成画面")
print("=" * 50)
generator = BatchImageGenerator(self.config.comfyui_url)
results = generator.generate_episode(
storyboards=storyboards,
template_path=self.config.workflow_template,
output_dir=f"{self.output_dir}/images"
)
print(f"生成了 {len(results)} 张图片")
return results
async def step3_generate_audio(self, storyboards: list) -> list:
"""步骤3:批量生成配音"""
print("\n" + "=" * 50)
print("步骤3:批量生成配音")
print("=" * 50)
engine = EdgeTTSEngine()
dialogues = []
for i, sb in enumerate(storyboards):
if sb.dialogue:
dialogues.append(DialogueLine(
character="narrator",
text=sb.dialogue,
voice_id="narrator",
emotion=sb.emotion,
output_file=f"line_{i:03d}.wav"
))
results = await engine.batch_synthesize(
dialogues, f"{self.output_dir}/audio"
)
print(f"生成了 {len(results)} 个配音文件")
return results
def step4_compose_video(self, storyboards: list) -> str:
"""步骤4:合成视频"""
print("\n" + "=" * 50)
print("步骤4:合成视频")
print("=" * 50)
# 使用FFmpeg合成视频
# 这里简化处理,实际项目中需要更复杂的合成逻辑
output_file = f"{self.output_dir}/final/{self.config.project_name}.mp4"
# 创建图片序列文件列表
images_dir = f"{self.output_dir}/images"
image_files = sorted([
f for f in os.listdir(images_dir) if f.endswith('.png')
])
with open(f"{self.output_dir}/metadata/filelist.txt", 'w') as f:
for img in image_files:
duration = 3 # 每张图显示3秒
f.write(f"file '{images_dir}/{img}'\n")
f.write(f"duration {duration}\n")
# FFmpeg需要最后一行重复
if image_files:
f.write(f"file '{images_dir}/{image_files[-1]}'\n")
# FFmpeg命令
cmd = [
"ffmpeg", "-y",
"-f", "concat", "-safe", "0",
"-i", f"{self.output_dir}/metadata/filelist.txt",
"-vf", f"scale={self.config.resolution[0]}:{self.config.resolution[1]}",
"-c:v", "libx264", "-pix_fmt", "yuv420p",
"-r", str(self.config.fps),
output_file
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"视频已生成: {output_file}")
return output_file
else:
print(f"视频生成失败: {result.stderr}")
return ""
def run(self):
"""运行完整流水线"""
print(f"\n{'='*50}")
print(f"AI漫剧自动化流水线: {self.config.project_name}")
print(f"{'='*50}\n")
# 步骤1:剧本转分镜
storyboards = self.step1_generate_storyboards()
# 步骤2:批量生成画面
images = self.step2_generate_images(storyboards)
# 步骤3:批量生成配音
asyncio.run(self.step3_generate_audio(storyboards))
# 步骤4:合成视频
video = self.step4_compose_video(storyboards)
print(f"\n{'='*50}")
print("流水线执行完成!")
print(f"{'='*50}")
print(f"分镜数: {len(storyboards)}")
print(f"图片数: {len(images)}")
print(f"视频: {video}")
# 使用示例
if __name__ == "__main__":
config = PipelineConfig(
project_name="my_comic_ep01",
script_file="scripts/episode01.txt",
output_dir="output/project01",
comfyui_url="127.0.0.1:8188",
workflow_template="workflows/comic_workflow_api.json",
llm_api_key="your_api_key"
)
pipeline = AIComicPipeline(config)
pipeline.run()
5.2 任务队列管理
"""
AI漫剧任务队列管理器
支持多任务并发、断点续传、错误重试
"""
import json
import time
import os
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class Task:
"""任务"""
id: str
type: str # storyboard/image/audio/video
params: dict
status: TaskStatus = TaskStatus.PENDING
result: Optional[dict] = None
error: Optional[str] = None
retries: int = 0
max_retries: int = 3
created_at: float = field(default_factory=time.time)
completed_at: Optional[float] = None
class TaskQueue:
"""任务队列管理器"""
def __init__(self, state_file: str = "task_queue.json"):
self.state_file = state_file
self.tasks: List[Task] = []
self._load_state()
def _load_state(self):
"""加载任务状态"""
if os.path.exists(self.state_file):
with open(self.state_file, 'r') as f:
data = json.load(f)
self.tasks = [
Task(
id=t['id'],
type=t['type'],
params=t['params'],
status=TaskStatus(t['status']),
result=t.get('result'),
error=t.get('error'),
retries=t.get('retries', 0),
max_retries=t.get('max_retries', 3),
created_at=t.get('created_at', time.time()),
completed_at=t.get('completed_at')
)
for t in data['tasks']
]
def _save_state(self):
"""保存任务状态"""
data = {
'tasks': [
{
'id': t.id,
'type': t.type,
'params': t.params,
'status': t.status.value,
'result': t.result,
'error': t.error,
'retries': t.retries,
'max_retries': t.max_retries,
'created_at': t.created_at,
'completed_at': t.completed_at
}
for t in self.tasks
]
}
with open(self.state_file, 'w') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def add_task(self, task_type: str, params: dict, task_id: str = None) -> Task:
"""添加任务"""
if task_id is None:
task_id = f"{task_type}_{int(time.time())}"
task = Task(id=task_id, type=task_type, params=params)
self.tasks.append(task)
self._save_state()
return task
def get_pending_tasks(self) -> List[Task]:
"""获取待执行的任务"""
return [t for t in self.tasks if t.status == TaskStatus.PENDING]
def get_progress(self) -> dict:
"""获取进度"""
total = len(self.tasks)
completed = sum(1 for t in self.tasks if t.status == TaskStatus.COMPLETED)
failed = sum(1 for t in self.tasks if t.status == TaskStatus.FAILED)
running = sum(1 for t in self.tasks if t.status == TaskStatus.RUNNING)
pending = sum(1 for t in self.tasks if t.status == TaskStatus.PENDING)
return {
'total': total,
'completed': completed,
'failed': failed,
'running': running,
'pending': pending,
'progress': completed / total * 100 if total > 0 else 0
}
六、最佳实践与注意事项
6.1 错误处理策略
# 错误处理最佳实践
import time
from functools import wraps
def retry(max_retries=3, delay=1, backoff=2):
"""重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = delay * (backoff ** attempt)
print(f" 重试 {attempt+1}/{max_retries}, "
f"等待 {wait_time}s: {e}")
time.sleep(wait_time)
return wrapper
return decorator
# 使用示例
@retry(max_retries=3, delay=2, backoff=2)
def generate_image(workflow):
"""带重试的图片生成"""
client = ComfyUIClient()
return client.generate_and_save(workflow)
6.2 性能优化建议
| 优化方向 | 具体方法 | 预期提升 |
|---|---|---|
| 并发请求 | 使用异步IO或多线程 | 2-5倍 |
| 批量处理 | 减少API调用次数 | 1.5-3倍 |
| 缓存机制 | 缓存重复的Prompt结果 | 20-50% |
| 模型预加载 | 避免重复加载模型 | 30%+ |
| 分辨率优化 | 先低分辨率预览再高清生成 | 2-3倍 |
6.3 安全注意事项
[!WARNING]
- 不要将API Key硬编码在脚本中,使用环境变量或配置文件
- 定期轮换API Key
- 设置API调用限额,防止意外超支
- 备份重要的工作流和配置文件
- 使用版本控制管理脚本代码
# 安全的API Key管理
import os
from dotenv import load_dotenv
load_dotenv() # 从.env文件加载环境变量
API_KEY = os.getenv("LLM_API_KEY")
COMFYUI_URL = os.getenv("COMFYUI_URL", "127.0.0.1:8188")
七、总结
API自动化核心要点
- ComfyUI API是基础:掌握ComfyUI的API调用是实现自动化的第一步
- 分层数据结构:剧本 → 分镜 → Prompt → 图片,清晰的数据流是自动化的骨架
- 错误处理至关重要:网络请求、API调用都可能出现错误,完善的错误处理和重试机制是必须的
- 模块化设计:将不同功能(剧本解析、图片生成、配音合成)封装为独立模块
- 渐进式自动化:先自动化单个环节,再串联完整流程
[!TIP] 不要一开始就追求完全自动化。建议先手动完成整个流程,理解每个环节的细节和可能的问题,然后逐步将各环节自动化。先实现最耗时环节的自动化(如图片批量生成),再逐步扩展到其他环节。自动化程度应该根据实际需求来决定,过度自动化可能增加维护成本。
相关教程

高级工具与工作流
ComfyUI工作流详解:AI漫剧专业级画面生成方案
深入讲解ComfyUI在AI漫剧创作中的应用,包括安装配置、核心节点、角色一致性工作流、批量生成工作流,以及常用工作流模板推荐。
19分钟2026-05-02

进阶工具与工作流
AI漫剧高效工作流搭建:从脚本到成片的标准化流水线
手把手教你搭建一套高效的AI漫剧创作工作流,包括极简方案、进阶方案和专业方案三套流水线,以及资产管理和复用方法。
21分钟2026-05-01

精选
入门工具与工作流
AI漫剧工具全景对比:2026年最全的AI漫剧创作工具评测
全面对比2026年AI漫剧创作领域的所有主流工具,按创作环节分类评测,帮你找到最适合自己需求和工作流的最优工具组合。
15分钟2026-04-30