图像生成流水线(API版)
1325 字约 4 分钟
2026-02-15
1. 概述
图像生成流水线的核心功能是根据用户提供的文本生成目标图片,为后续图像理解、图像编辑等任务提供图片数据。
本版本使用云端API模型进行文本到图片生成,目前支持的API模型包括:
- OpenAI格式:
dall-e-2,dall-e-3,gpt-image-1 - Gemini格式:
gemini-2.5-flash-image,gemini-3-pro-image-preview等
不同模型的具体功能、详细用法请参考各模型的官方API文档。
💡 提示:如果要使用本地GPU模型进行文本到图片生成,请查看 图像生成流水线(GPU版)
2. 快速开始
第一步:创建新的 DataFlow 工作文件夹
mkdir run_dataflow_mm
cd run_dataflow_mm第二步:配置 API KEY 和 BASE URL
通过设置环境变量来配置API KEY和BASE URL:
# 设置API密钥(必需)
export DF_API_KEY=<your_api_key>
# 设置API基础URL(可选)
# 如果不设置,将根据选用的API格式使用默认URL:
# - Gemini格式:https://generativelanguage.googleapis.com
# - OpenAI格式:https://api.openai.com/v1
export DF_BASE_URL=<your_base_url> # 可选第三步:准备文本数据
我们使用 jsonl 文件来保存文本数据,每行一个样本。下面是一个简单的输入数据样例:
{"conversations": [{"content": "a fox darting between snow-covered pines at dusk", "role": "user"}]}
{"conversations": [{"content": "a kite surfer riding emerald waves under a cloudy sky", "role": "user"}]}conversations 包含图片生成描述的对话列表,content 字段是其中的文本提示词。
第四步:运行流水线
- 基本用法
python dataflow/statics/pipelines/api_pipelines/text_to_image_generation_api_pipeline.py \
--first_entry_file_name <your_input_text_file_path>生成的文件会默认保存在 ./cache_local/text2image_api 文件夹内。
- 命令行参数说明
本流水线支持以下命令行参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
--api_format | str | gemini | API格式类型,可选 openai 或 gemini |
--model_name | str | gemini-3-pro-image-preview | 模型名称,可选 dall-e-2, dall-e-3, gpt-image-1, gemini-2.5-flash-image, gemini-3-pro-image-preview 等 |
--batch_size | int | 4 | 批次大小,控制每次处理的样本数量 |
--first_entry_file_name | str | None | 输入数据文件路径( jsonl 格式) |
--cache_path | str | ./cache_local/text2image_api | 缓存路径,用于存储中间结果和最终生成的图片 |
3. 数据流与流水线逻辑
1. 输入数据
该流程的输入数据包括以下字段:
- conversations:对话格式数据,包含文本提示词。
这些输入数据存储在 jsonl 文件中,并通过 FileStorage 对象进行管理和读取:
self.storage = FileStorage(
first_entry_file_name="<your_jsonl_file_path>",
cache_path="./cache_local/text2image_api",
file_name_prefix="dataflow_cache_step",
cache_type="jsonl"
)2. 文本到图像生成(PromptedImageGenerator)
流程的核心步骤是使用提示式图像生成器(PromptedImageGenerator)结合云端API服务为每个文本提示词生成对应的图像。
功能:
- 利用云端API模型根据文本提示词生成图像
- 支持多种API格式(OpenAI、Gemini等)
- 可配置批次大小和生成参数
- 自动保存生成的图像到指定路径
输入:对话格式数据(包含文本提示词)
输出:生成的图像文件路径
API服务配置:
self.serving = APIImageGenServing(
api_url=api_url, # API服务地址
image_io=ImageIO(save_path=image_save_path), # 图像保存路径
Image_gen_task="text2image", # 任务类型:文本到图像
batch_size=4, # 批次大小
api_format="gemini", # API格式:gemini 或 openai
model_name="gemini-3-pro-image-preview", # 模型名称
api_key=api_key, # API密钥
)算子初始化:
self.text_to_image_generator = PromptedImageGenerator(
t2i_serving=self.serving, # 文本到图像服务
save_interval=10 # 保存间隔
)算子运行:
self.text_to_image_generator.run(
storage=self.storage.step(),
input_conversation_key="conversations", # 输入对话字段
output_image_key="images", # 输出图像字段
)3. 输出数据
最终,流水线生成的输出数据将包含以下内容:
- conversations:原始对话数据(包含文本提示词)
- images:生成的图像文件路径列表
输出数据示例:
{"conversations":[{"content":"a fox darting between snow-covered pines at dusk","role":"user"}],"images":["./cache_local/text2image_api/sample0_condition0/sample0_condition0_0.png"]}4. 流水线示例
下面给出使用云端API的文本到图片生成流水线示例:
import os
import argparse
from pathlib import Path
from dataflow.operators.core_vision import PromptedImageGenerator
from dataflow.serving.api_image_gen_serving import APIImageGenServing
from dataflow.utils.storage import FileStorage
from dataflow.io import ImageIO
class ImageGenerationAPIPipeline():
"""
Text to Image Generation API Pipeline
Supported Models:
OpenAI format (api_format="openai"): dall-e-2, dall-e-3, gpt-image-1
Gemini format (api_format="gemini"): gemini-2.5-flash-image, gemini-3-pro-image-preview, etc.
"""
def __init__(
self,
api_format="gemini",
model_name="gemini-3-pro-image-preview",
batch_size=4,
first_entry_file_name=None,
cache_path="./cache_local/text2image_api",
):
current_file = Path(__file__).resolve()
project_root = current_file.parent.parent.parent.parent.parent
if first_entry_file_name is None:
data_file = project_root / "dataflow" / "example" / "image_gen" / "text2image" / "prompts.jsonl"
first_entry_file_name = str(data_file)
# -------- 存储配置 --------
self.storage = FileStorage(
first_entry_file_name=first_entry_file_name,
cache_path=cache_path,
file_name_prefix="dataflow_cache_step",
cache_type="jsonl"
)
# -------- API 配置 --------
api_key = os.environ.get("DF_API_KEY")
api_url = os.environ.get("DF_BASE_URL")
if api_key is None:
raise ValueError("API key is required. Please set it via environment variable DF_API_KEY")
if api_url is None:
if api_format == "gemini":
api_url = "https://generativelanguage.googleapis.com"
else:
api_url = "https://api.openai.com/v1"
image_save_path = str(project_root / "cache_local" / "text2image_api")
# -------- 图像生成 API 服务 --------
self.serving = APIImageGenServing(
api_url=api_url,
image_io=ImageIO(save_path=image_save_path),
Image_gen_task="text2image",
batch_size=batch_size,
api_format=api_format,
model_name=model_name,
api_key=api_key,
)
# -------- 文本到图像生成算子 --------
self.text_to_image_generator = PromptedImageGenerator(
t2i_serving=self.serving,
save_interval=10
)
def forward(self):
# 调用 PromptedImageGenerator 生成图像
self.text_to_image_generator.run(
storage=self.storage.step(),
input_conversation_key="conversations",
output_image_key="images",
)
if __name__ == "__main__":
# -------- 命令行参数解析 --------
parser = argparse.ArgumentParser(description="Cloud API Image Generation Pipeline")
parser.add_argument('--api_format', choices=['openai', 'gemini'], default='gemini',
help='API format type: openai (OpenAI DALL-E) or gemini (Google Gemini)')
parser.add_argument('--model_name', type=str, default='gemini-3-pro-image-preview',
help='Model name')
parser.add_argument('--batch_size', type=int, default=4, help='Batch size')
parser.add_argument('--first_entry_file_name', type=str, default=None,
help='Input data file path (default uses example_data)')
parser.add_argument('--cache_path', type=str, default="./cache_local/text2image_api",
help='Cache path')
args = parser.parse_args()
if not os.environ.get("DF_API_KEY"):
parser.error("Environment variable DF_API_KEY is not set. Please use export DF_API_KEY=your_api_key to set it")
# -------- 流水线入口 --------
model = ImageGenerationAPIPipeline(
api_format=args.api_format,
model_name=args.model_name,
batch_size=args.batch_size,
first_entry_file_name=args.first_entry_file_name,
cache_path=args.cache_path,
)
model.forward()
