AI Comic Drama API Automation: Batch Generation and Automated Workflows with Python Scripts

A detailed guide to batch-generating AI comic dramas with Python scripts that call AI image-generation APIs and the ComfyUI API, covering API calls, batch scripts, and end-to-end pipeline orchestration.

AI Comic Drama Studio · 2026-04-05 · 21 min read

1. Overview of API Automation

Why API automation

Producing an AI comic drama involves a lot of repetitive work: script generation, storyboard descriptions, image generation, voice-over synthesis, and more. Doing all of this by hand is slow and error-prone. API automation chains these stages together into an automated production line from script to finished episode.

The core value of API automation:

  • Efficiency: batch generation can speed up production 10-50x
  • Consistency: an automated pipeline applies the same production standards to every episode
  • Lower cost: less manual intervention means lower labor cost
  • Scale: supports daily or even multiple-per-day release schedules
  • Stable quality: fewer human errors, more reliable output

Workflow architecture


┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  Script gen  │ →  │  Shot split  │ →  │  Prompt gen  │
│  (LLM API)   │    │  (parser)    │    │  (LLM API)   │
└──────────────┘    └──────────────┘    └──────────────┘
                                              ↓
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Compositing  │ ←  │  Voice-over  │ ←  │  Image gen   │
│  (FFmpeg)    │    │  (TTS API)   │    │  (SD API)    │
└──────────────┘    └──────────────┘    └──────────────┘

2. The ComfyUI API in Detail

2.1 ComfyUI API Basics

ComfyUI exposes a complete HTTP API for submitting workflows and retrieving the generated results.

API endpoints:

| Endpoint | Method | Description |
| --- | --- | --- |
| /prompt | POST | Submit a generation job |
| /interrupt | POST | Interrupt the current job |
| /history | GET | Fetch execution history |
| /view | GET | Download a generated image |
| /upload/image | POST | Upload an image |
| /queue | GET | Inspect the queue |

2.2 Getting a Workflow in API Format

ComfyUI workflows come in two JSON formats: the frontend format and the API format. API calls require the API format.

How to convert:

  1. Build the workflow in the ComfyUI interface
  2. Click menu → Save (API Format) (if the entry is missing, enable "Dev mode options" in the settings)
  3. The saved JSON file is the API format

Differences between the API format and the frontend format:

// API format (what the /prompt endpoint expects): a flat dict of nodes,
// each with class_type and inputs; references are ["node_id", output_index]
{
  "3": {
    "class_type": "KSampler",
    "inputs": {
      "seed": 156680208700286,
      "steps": 20,
      "cfg": 8,
      "sampler_name": "euler",
      "scheduler": "normal",
      "denoise": 1,
      "model": ["4", 0],
      "positive": ["6", 0],
      "negative": ["7", 0],
      "latent_image": ["5", 0]
    }
  }
}

// The frontend format carries the same graph plus UI metadata
// (node positions, the links array, groups), which the API ignores
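Because the API format is a flat dict keyed by node ID, scripts often end up hard-coding IDs such as "3" or "6". A small helper (a sketch, not part of the ComfyUI API itself) can look nodes up by class_type instead, which survives workflow edits that renumber nodes:

```python
def find_nodes(workflow: dict, class_type: str) -> list:
    """Return the IDs of all nodes with the given class_type
    in an API-format workflow dict."""
    return [nid for nid, node in workflow.items()
            if node.get("class_type") == class_type]

# Example: set the seed on whichever node is the KSampler
workflow = {
    "3": {"class_type": "KSampler", "inputs": {"seed": 1}},
    "6": {"class_type": "CLIPTextEncode", "inputs": {"text": "a cat"}},
}
for nid in find_nodes(workflow, "KSampler"):
    workflow[nid]["inputs"]["seed"] = 42
```

The same lookup works for CLIPTextEncode nodes when swapping in positive/negative prompts.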

2.3 Calling the ComfyUI API from Python

Below is a complete ComfyUI API client example:

"""
Basic ComfyUI API client framework
Supports text-to-image, image-to-image, and other basic workflows
"""

import json
import urllib.request
import urllib.parse
import time
import random
import os
from typing import Optional

class ComfyUIClient:
    """ComfyUI API客户端"""

    def __init__(self, server_address: str = "127.0.0.1:8188"):
        self.server_address = server_address

    def queue_prompt(self, prompt: dict) -> str:
        """提交生成任务到队列"""
        data = json.dumps({"prompt": prompt}).encode('utf-8')
        req = urllib.request.Request(
            f"http://{self.server_address}/prompt",
            data=data,
            headers={"Content-Type": "application/json"}
        )
        response = urllib.request.urlopen(req)
        return json.loads(response.read())['prompt_id']

    def get_history(self, prompt_id: str) -> dict:
        """获取任务执行历史"""
        req = urllib.request.Request(
            f"http://{self.server_address}/history/{prompt_id}"
        )
        response = urllib.request.urlopen(req)
        return json.loads(response.read())

    def get_image(self, filename: str, subfolder: str, folder_type: str) -> bytes:
        """获取生成的图片"""
        params = urllib.parse.urlencode({
            "filename": filename,
            "subfolder": subfolder,
            "type": folder_type
        })
        req = urllib.request.Request(
            f"http://{self.server_address}/view?{params}"
        )
        response = urllib.request.urlopen(req)
        return response.read()

    def wait_for_completion(self, prompt_id: str, check_interval: float = 1.0,
                            timeout: float = 300) -> dict:
        """等待任务完成"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            history = self.get_history(prompt_id)
            if prompt_id in history:
                return history[prompt_id]
            time.sleep(check_interval)
        raise TimeoutError(f"Job timed out: {prompt_id}")

    def upload_image(self, image_path: str, image_name: Optional[str] = None,
                     overwrite: bool = True) -> dict:
        """上传图片到ComfyUI"""
        if image_name is None:
            image_name = os.path.basename(image_path)

        with open(image_path, 'rb') as f:
            body = f.read()

        multipart_form_data = (
            b'--boundary\r\n'
            b'Content-Disposition: form-data; name="image"; filename="'
            + image_name.encode() + b'"\r\n'
            b'Content-Type: image/png\r\n\r\n'
            + body + b'\r\n'
            b'--boundary\r\n'
            b'Content-Disposition: form-data; name="overwrite"\r\n\r\n'
            + (b"true" if overwrite else b"false") + b'\r\n'
            b'--boundary--\r\n'
        )

        req = urllib.request.Request(
            f"http://{self.server_address}/upload/image",
            data=multipart_form_data,
            headers={"Content-Type": "multipart/form-data; boundary=boundary"}
        )
        response = urllib.request.urlopen(req)
        return json.loads(response.read())

    def generate_and_save(self, workflow: dict, output_dir: str = "output",
                          prefix: str = "generated") -> list:
        """生成图片并保存到本地"""
        prompt_id = self.queue_prompt(workflow)
        print(f"任务已提交: {prompt_id}")

        # 等待完成
        history = self.wait_for_completion(prompt_id)
        print("生成完成")

        # 获取输出图片
        outputs = history.get('outputs', {})
        saved_files = []

        for node_id, node_output in outputs.items():
            if 'images' in node_output:
                for img_info in node_output['images']:
                    image_data = self.get_image(
                        img_info['filename'],
                        img_info.get('subfolder', ''),
                        img_info.get('type', 'output')
                    )
                    # Save the image
                    os.makedirs(output_dir, exist_ok=True)
                    filename = f"{prefix}_{img_info['filename']}"
                    filepath = os.path.join(output_dir, filename)
                    with open(filepath, 'wb') as f:
                        f.write(image_data)
                    saved_files.append(filepath)
                    print(f"已保存: {filepath}")

        return saved_files


# Usage example
if __name__ == "__main__":
    client = ComfyUIClient("127.0.0.1:8188")

    # Load the API-format workflow
    with open("workflow_api.json", 'r') as f:
        workflow = json.load(f)

    # Randomize the seed (node "3" is the KSampler in the sample workflow)
    workflow["3"]["inputs"]["seed"] = random.randint(0, 2**32 - 1)

    # Generate and save
    saved = client.generate_and_save(workflow, output_dir="output", prefix="test")
    print(f"Generated {len(saved)} images")

3. Batch Image Generation Scripts

3.1 From Script to Storyboard, Automatically

The following script converts script text into storyboard prompts automatically:

"""
AI comic drama batch generation - script to storyboard
Splits script text into shots and generates a prompt for each
"""

import json
import re
from dataclasses import dataclass, asdict
from typing import List, Optional
from openai import OpenAI

@dataclass
class Storyboard:
    """分镜数据结构"""
    episode: int           # 集数
    scene: int             # 场景编号
    shot_type: str         # 镜头类型(特写/中景/远景等)
    description: str       # 场景描述
    character_action: str  # 角色动作
    dialogue: str          # 对话内容
    emotion: str           # 情绪/氛围
    prompt_positive: str   # 正向Prompt
    prompt_negative: str   # 反向Prompt
    camera_angle: str      # 镜头角度
    lighting: str          # 光线描述

class ScriptToStoryboard:
    """剧本转分镜处理器"""

    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        self.model = "qwen-max"

    def parse_script(self, script_text: str, episode: int = 1) -> List[Storyboard]:
        """将剧本文本解析为分镜列表"""
        prompt = f"""你是一个专业的AI漫剧分镜师。请将以下剧本拆分为详细的分镜。

要求:
1. 每个分镜包含:镜头类型、场景描述、角色动作、对话、情绪、镜头角度、光线
2. 为每个分镜生成AI绘画用的正向Prompt(英文,包含构图、人物、场景、风格描述)
3. 生成统一的反向Prompt
4. 镜头类型包括:特写(CU)、中景(MS)、远景(WS)、全景(FS)、俯拍、仰拍
5. Prompt中包含质量标签:masterpiece, best quality, highly detailed

剧本:
{script_text}

请以JSON数组格式输出,每个元素包含以下字段:
episode, scene, shot_type, description, character_action, dialogue, emotion,
prompt_positive, prompt_negative, camera_angle, lighting"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=4000
        )

        content = response.choices[0].message.content
        # Extract the JSON array from the response
        json_match = re.search(r'\[.*\]', content, re.DOTALL)
        if json_match:
            storyboards = json.loads(json_match.group())
            return [Storyboard(**sb) for sb in storyboards]
        else:
            raise ValueError("Could not extract storyboard data from the LLM output")

    def save_storyboards(self, storyboards: List[Storyboard], output_file: str):
        """保存分镜数据"""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([asdict(sb) for sb in storyboards], f,
                      ensure_ascii=False, indent=2)
        print(f"分镜数据已保存到: {output_file}")


# Usage example
if __name__ == "__main__":
    converter = ScriptToStoryboard(api_key="your_api_key")

    script = """
    Episode 1: A Fateful Encounter

    Scene 1: A city street at dawn; sunlight falls through the leaves in dappled patterns.
    The heroine, Lin Xiao, walks to school with her backpack on, music playing in her earphones.
    Suddenly a boy rushes around the corner and bumps into her, scattering her books across the ground.

    Scene 2: The boy hurriedly apologizes and crouches down to help pick up the books.
    Their eyes meet, and time seems to stand still.
    The boy introduces himself: "Hi, I'm Chen Mo. I just moved to this neighborhood."
    """

    storyboards = converter.parse_script(script, episode=1)
    converter.save_storyboards(storyboards, "storyboards_ep01.json")

    for sb in storyboards:
        print(f"\nScene {sb.scene}: {sb.shot_type}")
        print(f"Description: {sb.description}")
        print(f"Prompt: {sb.prompt_positive[:100]}...")

3.2 Batch Image Generation Script

"""
AI comic drama batch image generation
Reads storyboard data and calls the ComfyUI API to generate images in batch
"""

import json
import time
import os
# comfyui_client.py / storyboard_parser.py are the scripts from sections 2.3 and 3.1
from comfyui_client import ComfyUIClient
from storyboard_parser import Storyboard

class BatchImageGenerator:
    """批量图片生成器"""

    def __init__(self, comfyui_url: str = "127.0.0.1:8188"):
        self.client = ComfyUIClient(comfyui_url)

    def load_workflow_template(self, template_path: str) -> dict:
        """加载工作流模板"""
        with open(template_path, 'r') as f:
            return json.load(f)

    def create_workflow_from_storyboard(self, template: dict,
                                         storyboard: Storyboard) -> dict:
        """根据分镜数据创建工作流"""
        import copy
        workflow = copy.deepcopy(template)

        # 设置正向Prompt
        workflow["6"]["inputs"]["text"] = storyboard.prompt_positive
        # 设置反向Prompt
        workflow["7"]["inputs"]["text"] = storyboard.prompt_negative
        # 设置随机种子
        workflow["3"]["inputs"]["seed"] = int(time.time() * 1000) % (2**32)

        return workflow

    def generate_episode(self, storyboards: list, template_path: str,
                         output_dir: str, max_retries: int = 3) -> list:
        """批量生成一集的所有分镜图片"""
        template = self.load_workflow_template(template_path)
        results = []
        total = len(storyboards)

        for i, sb in enumerate(storyboards):
            print(f"\n[{i+1}/{total}] 生成场景 {sb.scene}...")

            workflow = self.create_workflow_from_storyboard(template, sb)

            for attempt in range(max_retries):
                try:
                    saved = self.client.generate_and_save(
                        workflow,
                        output_dir=output_dir,
                        prefix=f"ep{sb.episode:02d}_sc{sb.scene:02d}"
                    )
                    results.extend(saved)
                    print(f"  成功生成 {len(saved)} 张图片")
                    break
                except Exception as e:
                    print(f"  第{attempt+1}次尝试失败: {e}")
                    if attempt == max_retries - 1:
                        print(f"  场景 {sb.scene} 生成失败,跳过")
                    time.sleep(2)

        return results

    def generate_with_retry(self, workflow: dict, output_dir: str,
                            prefix: str, target_count: int = 4,
                            quality_threshold: float = 0.8) -> list:
        """生成多张候选图并选择最佳"""
        saved = []
        for i in range(target_count):
            workflow_copy = json.loads(json.dumps(workflow))
            # 修改种子以生成不同结果
            import random
            workflow_copy["3"]["inputs"]["seed"] = random.randint(0, 2**32 - 1)

            try:
                files = self.client.generate_and_save(
                    workflow_copy, output_dir, f"{prefix}_v{i}"
                )
                saved.extend(files)
            except Exception as e:
                print(f"  变体{i}生成失败: {e}")

        return saved


# Usage example
if __name__ == "__main__":
    # Load storyboard data
    with open("storyboards_ep01.json", 'r', encoding='utf-8') as f:
        storyboards_data = json.load(f)
    storyboards = [Storyboard(**sb) for sb in storyboards_data]

    # Batch generate
    generator = BatchImageGenerator("127.0.0.1:8188")
    results = generator.generate_episode(
        storyboards=storyboards,
        template_path="workflow_api.json",
        output_dir="output/episode01"
    )

    print(f"\nDone: {len(results)} images generated")

4. Automated Voice-over Scripts

4.1 TTS API Integration

"""
AI comic drama automated voice-over script
Supports several TTS engines: Edge TTS, Aliyun TTS, Tencent Cloud TTS
"""

import asyncio
import json
import os
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class DialogueLine:
    """对话行"""
    character: str      # 角色名
    text: str           # 对话文本
    voice_id: str       # 语音ID
    emotion: str        # 情绪
    output_file: str    # 输出文件路径


class EdgeTTSEngine:
    """Edge TTS引擎(免费)"""

    def __init__(self):
        self.voice_map = {
            "female_young": "zh-CN-XiaoxiaoNeural",
            "female_mature": "zh-CN-XiaoyiNeural",
            "male_young": "zh-CN-YunxiNeural",
            "male_mature": "zh-CN-YunjianNeural",
            "narrator": "zh-CN-YunyangNeural"
        }

    async def synthesize(self, text: str, voice_id: str,
                         output_file: str, rate: str = "+0%",
                         pitch: str = "+0Hz") -> bool:
        """合成语音"""
        import edge_tts

        communicate = edge_tts.Communicate(
            text=text,
            voice=voice_id,
            rate=rate,
            pitch=pitch
        )
        await communicate.save(output_file)
        return os.path.exists(output_file)

    async def batch_synthesize(self, dialogues: List[DialogueLine],
                               output_dir: str) -> List[str]:
        """批量合成语音"""
        os.makedirs(output_dir, exist_ok=True)
        results = []

        for i, dialogue in enumerate(dialogues):
            voice = self.voice_map.get(dialogue.voice_id, dialogue.voice_id)
            output_file = os.path.join(output_dir, dialogue.output_file)

            # Adjust rate and pitch based on the emotion
            rate, pitch = self._get_emotion_params(dialogue.emotion)

            try:
                await self.synthesize(
                    dialogue.text, voice, output_file, rate, pitch
                )
                results.append(output_file)
                print(f"[{i+1}/{len(dialogues)}] {dialogue.character}: "
                      f"{dialogue.text[:30]}... -> {output_file}")
            except Exception as e:
                print(f"[{i+1}/{len(dialogues)}] 合成失败: {e}")

        return results

    def _get_emotion_params(self, emotion: str) -> tuple:
        """根据情绪获取语速和音调参数"""
        params = {
            "happy": ("+10%", "+2Hz"),
            "sad": ("-10%", "-2Hz"),
            "angry": ("+15%", "+3Hz"),
            "surprised": ("+5%", "+5Hz"),
            "fearful": ("-5%", "-1Hz"),
            "calm": ("-5%", "0Hz"),
            "excited": ("+15%", "+3Hz"),
        }
        return params.get(emotion.lower(), ("+0%", "+0Hz"))


class AliyunTTSEngine:
    """阿里云TTS引擎(付费,质量更高)"""

    def __init__(self, api_key: str, app_key: str):
        self.api_key = api_key
        self.app_key = app_key
        self.api_url = "https://nls-gateway-cn-shanghai.aliyuncs.com/stream/v1/tts"

    def synthesize(self, text: str, voice: str = "zhiyan_emo",
                   output_file: str = "output.wav") -> bool:
        """调用阿里云TTS API"""
        import requests

        params = {
            "appkey": self.app_key,
            "token": self.api_key,
            "text": text,
            "voice": voice,
            "format": "wav",
            "sample_rate": 16000
        }

        response = requests.post(self.api_url, json=params)
        if response.status_code == 200:
            with open(output_file, 'wb') as f:
                f.write(response.content)
            return True
        return False


# Usage example
async def main():
    # Use Edge TTS (free)
    engine = EdgeTTSEngine()

    dialogues = [
        DialogueLine(
            character="Lin Xiao",
            text="Who are you? Why did you bump into me?",
            voice_id="female_young",
            emotion="angry",
            output_file="line_001_linxiao.wav"
        ),
        DialogueLine(
            character="Chen Mo",
            text="I'm so sorry! I was in a hurry and didn't see you.",
            voice_id="male_young",
            emotion="surprised",
            output_file="line_002_chenmo.wav"
        ),
        DialogueLine(
            character="Narrator",
            text="And so the gears of fate began to turn.",
            voice_id="narrator",
            emotion="calm",
            output_file="line_003_narrator.wav"
        ),
    ]

    results = await engine.batch_synthesize(dialogues, "output/audio")
    print(f"\n配音完成,共 {len(results)} 个文件")

if __name__ == "__main__":
    asyncio.run(main())

5. A Complete Automation Pipeline

5.1 End-to-End Automation Script

"""
AI comic drama end-to-end pipeline
Full automation from script text to finished video
"""

import json
import os
import subprocess
import asyncio
from dataclasses import dataclass
from typing import List
# The modules below are the scripts from the earlier sections, saved locally
from script_to_storyboard import ScriptToStoryboard
from batch_generator import BatchImageGenerator
from tts_engine import EdgeTTSEngine, DialogueLine

@dataclass
class PipelineConfig:
    """流水线配置"""
    project_name: str
    script_file: str
    output_dir: str
    comfyui_url: str
    workflow_template: str
    llm_api_key: str
    fps: int = 24
    resolution: tuple = (1080, 1920)  # portrait orientation


class AIComicPipeline:
    """AI漫剧自动化流水线"""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.output_dir = config.output_dir
        self.setup_directories()

    def setup_directories(self):
        """创建输出目录结构"""
        dirs = [
            self.output_dir,
            f"{self.output_dir}/images",
            f"{self.output_dir}/audio",
            f"{self.output_dir}/video",
            f"{self.output_dir}/final",
            f"{self.output_dir}/metadata"
        ]
        for d in dirs:
            os.makedirs(d, exist_ok=True)

    def step1_generate_storyboards(self) -> list:
        """步骤1:剧本转分镜"""
        print("=" * 50)
        print("步骤1:剧本转分镜")
        print("=" * 50)

        with open(self.config.script_file, 'r', encoding='utf-8') as f:
            script = f.read()

        converter = ScriptToStoryboard(api_key=self.config.llm_api_key)
        storyboards = converter.parse_script(script)

        # Persist the storyboard data
        sb_file = f"{self.output_dir}/metadata/storyboards.json"
        converter.save_storyboards(storyboards, sb_file)

        print(f"Generated {len(storyboards)} storyboard shots")
        return storyboards

    def step2_generate_images(self, storyboards: list) -> list:
        """步骤2:批量生成画面"""
        print("\n" + "=" * 50)
        print("步骤2:批量生成画面")
        print("=" * 50)

        generator = BatchImageGenerator(self.config.comfyui_url)
        results = generator.generate_episode(
            storyboards=storyboards,
            template_path=self.config.workflow_template,
            output_dir=f"{self.output_dir}/images"
        )

        print(f"生成了 {len(results)} 张图片")
        return results

    async def step3_generate_audio(self, storyboards: list) -> list:
        """步骤3:批量生成配音"""
        print("\n" + "=" * 50)
        print("步骤3:批量生成配音")
        print("=" * 50)

        engine = EdgeTTSEngine()
        dialogues = []

        for i, sb in enumerate(storyboards):
            if sb.dialogue:
                dialogues.append(DialogueLine(
                    character="narrator",
                    text=sb.dialogue,
                    voice_id="narrator",
                    emotion=sb.emotion,
                    output_file=f"line_{i:03d}.wav"
                ))

        results = await engine.batch_synthesize(
            dialogues, f"{self.output_dir}/audio"
        )

        print(f"生成了 {len(results)} 个配音文件")
        return results

    def step4_compose_video(self, storyboards: list) -> str:
        """步骤4:合成视频"""
        print("\n" + "=" * 50)
        print("步骤4:合成视频")
        print("=" * 50)

        # 使用FFmpeg合成视频
        # 这里简化处理,实际项目中需要更复杂的合成逻辑
        output_file = f"{self.output_dir}/final/{self.config.project_name}.mp4"

        # Build the file list for FFmpeg's concat demuxer
        images_dir = f"{self.output_dir}/images"
        image_files = sorted([
            f for f in os.listdir(images_dir) if f.endswith('.png')
        ])

        with open(f"{self.output_dir}/metadata/filelist.txt", 'w') as f:
            for img in image_files:
                duration = 3  # show each image for 3 seconds
                f.write(f"file '{images_dir}/{img}'\n")
                f.write(f"duration {duration}\n")
            # the concat demuxer requires the last file repeated
            if image_files:
                f.write(f"file '{images_dir}/{image_files[-1]}'\n")

        # FFmpeg command
        cmd = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0",
            "-i", f"{self.output_dir}/metadata/filelist.txt",
            "-vf", f"scale={self.config.resolution[0]}:{self.config.resolution[1]}",
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            "-r", str(self.config.fps),
            output_file
        ]

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print(f"视频已生成: {output_file}")
            return output_file
        else:
            print(f"视频生成失败: {result.stderr}")
            return ""

    def run(self):
        """运行完整流水线"""
        print(f"\n{'='*50}")
        print(f"AI漫剧自动化流水线: {self.config.project_name}")
        print(f"{'='*50}\n")

        # Step 1: script to storyboards
        storyboards = self.step1_generate_storyboards()

        # Step 2: batch image generation
        images = self.step2_generate_images(storyboards)

        # Step 3: batch voice-over generation
        asyncio.run(self.step3_generate_audio(storyboards))

        # Step 4: video composition
        video = self.step4_compose_video(storyboards)

        print(f"\n{'='*50}")
        print("流水线执行完成!")
        print(f"{'='*50}")
        print(f"分镜数: {len(storyboards)}")
        print(f"图片数: {len(images)}")
        print(f"视频: {video}")


# Usage example
if __name__ == "__main__":
    config = PipelineConfig(
        project_name="my_comic_ep01",
        script_file="scripts/episode01.txt",
        output_dir="output/project01",
        comfyui_url="127.0.0.1:8188",
        workflow_template="workflows/comic_workflow_api.json",
        llm_api_key="your_api_key"
    )

    pipeline = AIComicPipeline(config)
    pipeline.run()
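Step 4 above shows every image for a fixed 3 seconds. In practice each shot usually needs to last as long as its voice line; for the PCM WAV files produced in step 3, the duration can be read from the file header with the standard-library wave module. A minimal sketch (the path in the comment is illustrative):

```python
import wave
import contextlib

def audio_duration_seconds(path: str) -> float:
    """Duration of a PCM WAV file in seconds (frames / frame rate)."""
    with contextlib.closing(wave.open(path, "rb")) as w:
        return w.getnframes() / float(w.getframerate())

# The per-shot duration could then replace the fixed `duration = 3`, e.g.
# duration = audio_duration_seconds(f"audio/line_{i:03d}.wav") + 0.5
```

A small padding (the 0.5 s above) between lines keeps the pacing from feeling rushed.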

5.2 Task Queue Management

"""
AI comic drama task queue manager
Supports concurrent tasks, resumable runs, and error retries
"""

import json
import time
import os
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class Task:
    """任务"""
    id: str
    type: str           # storyboard/image/audio/video
    params: dict
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[dict] = None
    error: Optional[str] = None
    retries: int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)
    completed_at: Optional[float] = None

class TaskQueue:
    """任务队列管理器"""

    def __init__(self, state_file: str = "task_queue.json"):
        self.state_file = state_file
        self.tasks: List[Task] = []
        self._load_state()

    def _load_state(self):
        """加载任务状态"""
        if os.path.exists(self.state_file):
            with open(self.state_file, 'r') as f:
                data = json.load(f)
                self.tasks = [
                    Task(
                        id=t['id'],
                        type=t['type'],
                        params=t['params'],
                        status=TaskStatus(t['status']),
                        result=t.get('result'),
                        error=t.get('error'),
                        retries=t.get('retries', 0),
                        max_retries=t.get('max_retries', 3),
                        created_at=t.get('created_at', time.time()),
                        completed_at=t.get('completed_at')
                    )
                    for t in data['tasks']
                ]

    def _save_state(self):
        """保存任务状态"""
        data = {
            'tasks': [
                {
                    'id': t.id,
                    'type': t.type,
                    'params': t.params,
                    'status': t.status.value,
                    'result': t.result,
                    'error': t.error,
                    'retries': t.retries,
                    'max_retries': t.max_retries,
                    'created_at': t.created_at,
                    'completed_at': t.completed_at
                }
                for t in self.tasks
            ]
        }
        with open(self.state_file, 'w') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def add_task(self, task_type: str, params: dict, task_id: str = None) -> Task:
        """添加任务"""
        if task_id is None:
            task_id = f"{task_type}_{int(time.time())}"
        task = Task(id=task_id, type=task_type, params=params)
        self.tasks.append(task)
        self._save_state()
        return task

    def get_pending_tasks(self) -> List[Task]:
        """获取待执行的任务"""
        return [t for t in self.tasks if t.status == TaskStatus.PENDING]

    def get_progress(self) -> dict:
        """获取进度"""
        total = len(self.tasks)
        completed = sum(1 for t in self.tasks if t.status == TaskStatus.COMPLETED)
        failed = sum(1 for t in self.tasks if t.status == TaskStatus.FAILED)
        running = sum(1 for t in self.tasks if t.status == TaskStatus.RUNNING)
        pending = sum(1 for t in self.tasks if t.status == TaskStatus.PENDING)

        return {
            'total': total,
            'completed': completed,
            'failed': failed,
            'running': running,
            'pending': pending,
            'progress': completed / total * 100 if total > 0 else 0
        }
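The manager above persists state but does not yet execute anything. A minimal dispatch loop might look like the sketch below; it operates on plain task dicts, `handlers` is an assumed mapping from task type to a callable, and a real version would also persist state after each status change:

```python
import time

def run_pending(tasks: list, handlers: dict, max_retries: int = 3) -> None:
    """Run every pending task dict in place.
    Each task looks like {'type': ..., 'params': ..., 'status': ...};
    handlers maps a task type to a callable taking the params dict."""
    for task in tasks:
        if task["status"] != "pending":
            continue  # resume support: finished/failed tasks are skipped
        handler = handlers.get(task["type"])
        if handler is None:
            task["status"] = "skipped"
            continue
        for attempt in range(max_retries):
            try:
                task["result"] = handler(task["params"])
                task["status"] = "completed"
                break
            except Exception as e:
                task["error"] = str(e)
                if attempt == max_retries - 1:
                    task["status"] = "failed"
                else:
                    time.sleep(2 ** attempt)  # exponential backoff
```

The same loop works with the Task dataclass above by swapping dict access for attribute access.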

6. Best Practices and Caveats

6.1 Error-Handling Strategy

# Error-handling best practice: retry with exponential backoff
import time
from functools import wraps

def retry(max_retries=3, delay=1, backoff=2):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    wait_time = delay * (backoff ** attempt)
                    print(f"  重试 {attempt+1}/{max_retries}, "
                          f"等待 {wait_time}s: {e}")
                    time.sleep(wait_time)
        return wrapper
    return decorator

# Usage example
@retry(max_retries=3, delay=2, backoff=2)
def generate_image(workflow):
    """Image generation with retries"""
    client = ComfyUIClient()
    return client.generate_and_save(workflow)

6.2 Performance Optimization Tips

| Optimization | Method | Expected gain |
| --- | --- | --- |
| Concurrent requests | async IO or multithreading | 2-5x |
| Batching | fewer API round-trips | 1.5-3x |
| Caching | cache results for repeated prompts | 20-50% |
| Model preloading | avoid reloading models | 30%+ |
| Resolution strategy | low-res previews first, then hi-res finals | 2-3x |
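The first row (concurrent requests) can be sketched with the standard-library ThreadPoolExecutor. Note that a single local ComfyUI instance serializes GPU work internally, so threading mainly pays off against hosted APIs or multiple backend instances; `generate_fn` stands in for any of the generation calls above:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def generate_concurrently(jobs, generate_fn, max_workers=4):
    """Run generate_fn over jobs with a bounded thread pool.
    Failures are reported but do not abort the remaining jobs."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(generate_fn, job): i for i, job in enumerate(jobs)}
        for fut in as_completed(futures):
            try:
                results.append(fut.result())
            except Exception as e:
                print(f"job {futures[fut]} failed: {e}")
    return results
```

Keep max_workers modest: most TTS and LLM APIs enforce rate limits, and exceeding them turns concurrency into a stream of retries.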

6.3 Security Notes

[!WARNING]

  • Never hard-code API keys in scripts; use environment variables or a config file
  • Rotate API keys regularly
  • Set API spending limits to guard against runaway costs
  • Back up important workflows and configuration files
  • Keep scripts under version control

# Safe API-key management
import os
from dotenv import load_dotenv

load_dotenv()  # load environment variables from a .env file

API_KEY = os.getenv("LLM_API_KEY")
COMFYUI_URL = os.getenv("COMFYUI_URL", "127.0.0.1:8188")

7. Summary

Key points of API automation

  1. The ComfyUI API is the foundation: mastering its calls is the first step toward automation
  2. Layered data structures: script → storyboard → prompt → image; a clear data flow is the backbone of automation
  3. Error handling is critical: network requests and API calls can fail, so solid error handling and retry logic are a must
  4. Modular design: wrap each function (script parsing, image generation, voice-over) in its own module
  5. Automate incrementally: automate single stages first, then chain the full pipeline

[!TIP] Don't aim for full automation from day one. Run the whole process by hand first to understand each stage and its failure modes, then automate step by step, starting with the most time-consuming stage (usually batch image generation). Let real needs decide how far to automate; over-automation just adds maintenance cost.

Related tutorials