代码数据合成流水线
2198 字约 7 分钟
2025-11-23
1. 概述
代码流水线旨在处理不同来源和用途的代码数据,包括预训练代码语料、指令微调数据以及通用代码生成数据集。根据功能,可以划分为三类:
- 预训练代码数据过滤流水线:对原始预训练代码进行启发式和统计特征过滤,去除自动生成、质量较差或类型不符合预期的代码,得到高质量预训练代码语料。
- 代码 SFT 数据合成流水线:以现有代码为种子,反向生成自然语言指令,再根据指令生成代码,并结合质量评估和沙盒执行,构建适用于代码指令微调的指令-代码对数据。
- 代码生成数据集合成流水线:以对话消息或模板为输入,增强生成高质量指令,再合成对应代码,并通过质量打分和沙盒执行筛选高质量「指令 + 代码」样本,构建代码生成任务的数据集。
这些流水线都遵循「统一的数据存储 + 可组合算子」设计:
- 统一存储 (
FileStorage):所有中间结果都写入缓存文件,便于断点续跑和调试。 - 算子解耦:每一步都是一个独立的
Operator,你可以增删、重排、替换。 - 多场景覆盖:
- 预训练代码语料清洗(CPU,无需大模型)
- 基于 API 的代码指令-代码对合成
- 面向代码生成任务的指令 & 代码增强
下文将分别介绍三条流水线的输入格式、主要算子以及核心逻辑。
2. 快速开始
第一步:安装 dataflow 环境
pip install open-dataflow第二步:创建新的 dataflow 工作文件夹
mkdir run_dataflow
cd run_dataflow第三步:初始化 DataFlow
dataflow init初始化后,run_dataflow 目录下会生成示例流水线文件(路径略有不同,以下为与本说明对应的三个示例):
run_dataflow/pipelines/api_pipelines/code_gen_dataset_pipeline.py
run_dataflow/pipelines/api_pipelines/code_code_to_sft_data_pipeline.py
run_dataflow/pipelines/cpu_pipelines/code_pt_filter.py第四步:配置 API Key 以及 LLM Serving(仅 API 流水线需要)
对于 Linux 和 macOS:
export DF_API_KEY="sk-xxxxx"对于 Windows(PowerShell):
$env:DF_API_KEY = "sk-xxxxx"在代码类 API 流水线中,通常会使用如下方式构造 APILLMServing_request:
self.llm_serving = APILLMServing_request(
api_url="https://api.openai.com/v1/chat/completions",
model_name="gpt-4o",
max_workers=10, # 或 100,视具体流水线而定
)第五步:运行示例流水线
在 run_dataflow 目录下,你可以按需运行任意一条流水线:
# 1) 预训练代码过滤(CPU)
python pipelines/cpu_pipelines/code_pt_filter.py
# 2) 从代码合成 SFT 指令-代码对
python pipelines/api_pipelines/code_code_to_sft_data_pipeline.py
# 3) 从原始对话 / 模板生成代码数据集
python pipelines/api_pipelines/code_gen_dataset_pipeline.py3. 数据流与流水线逻辑
3.1 通用输入与 FileStorage
三条流水线都使用 FileStorage 来管理输入与缓存数据,只是默认输入文件不同:
- PTCodeFilter_CPUPipeline
first_entry_file_name="../example_data/CodePipeline/code_input.jsonl"
- CodeSFTSynthesis_APIPipeline
first_entry_file_name="../example_data/CodePipeline/code_synthesis_input.jsonl"
- CodeGenDataset_APIPipeline
first_entry_file_name="../example_data/CodePipeline/raw_code.jsonl"
在实际项目中,你只需将这些示例路径替换为自己的 JSON/JSONL 文件路径即可,无需修改算子调用方式。
3.2 预训练代码过滤流水线:PTCodeFilter_CPUPipeline
该流水线位于 pipelines/cpu_pipelines/code_pt_filter.py,主要用于在不依赖大模型的情况下,对大规模代码语料进行多维度过滤与质量评估。
默认输入文件中通常包含以下字段:
lines:按行拆分后的代码列表,用于行级和长度相关过滤。text:完整的代码文本,用于文本组成、编码数据检测和文档质量评估。dataframe:包含文件路径、语言类型等结构化信息,用于文件类型过滤。
流水线中主要算子及其作用如下:
自动生成代码过滤:
CodeAutoGeneratedFilter- 参数:
min_score=1.0, max_score=1.0 - 输入:
lines - 输出键:
autogen_filter_label - 功能:识别并过滤带有明显自动生成痕迹的代码(如大段模板化注释、生成器标记等)。
- 参数:
代码长度过滤:
CodeLengthSampleFilter- 参数:
min_score=1.0, max_score=1.0 - 输入:
lines - 输出键:
length_filter_label - 功能:根据行数、字符数等长度信息过滤异常短/长的样本。
- 参数:
文本组成过滤:
CodeTextCompositionFilter- 参数:
min_score=1.0, max_score=1.0 - 输入:
text - 输出键:
text_composition_filter_label - 功能:过滤文本组成异常的样本,例如非代码字符比例过高等。
- 参数:
编码数据过滤:
CodeEncodedDataFilter- 参数:
min_score=1.0, max_score=1.0 - 输入:
text - 输出键:
encoded_data_filter_label - 功能:识别并过滤包含大量编码数据(如 Base64、大块十六进制字符串等)的样本。
- 参数:
文档质量过滤:
CodeDocumentQualityFilter- 参数:
min_score=1.0, max_score=1.0 - 阈值配置:
min_num_chars=100,max_num_chars=100000min_num_words=10,max_num_words=50000max_frac_duplicate_lines=0.3max_frac_duplicate_2gram~5gram=0.3max_frac_curly_bracket=0.1max_frac_all_caps_words=0.3min_entropy_unigram=2.0
- 输入:
text - 输出键:
doc_quality_filter_label - 功能:综合考虑长度、重复度、熵等指标,过滤整体质量较差的代码文档。
- 参数:
文件类型内容过滤:
CodeFileTypeContentFilter- 输入:
dataframe - 输出键:
file_type_filter_label - 功能:基于文件扩展名、语法特征等信息过滤不感兴趣或不符合预期类型的文件。
- 输入:
通用分数过滤(可选):
CodeGenericScoreFilter- 在示例代码中默认被注释掉,你可以将多路过滤分数汇总到某个字段(如
quality_score),再通过该算子做统一阈值过滤。
- 在示例代码中默认被注释掉,你可以将多路过滤分数汇总到某个字段(如
3.3 代码 SFT 合成流水线:CodeSFTSynthesis_APIPipeline
该流水线位于 pipelines/api_pipelines/code_code_to_sft_data_pipeline.py,目标是从已有代码出发,自动合成高质量的指令-代码对数据集,适用于代码指令微调(Code SFT)场景。
3.3.1 输入与存储
- 默认输入文件:
../example_data/CodePipeline/code_synthesis_input.jsonl - 典型字段:
raw_code:原始代码片段,是整个流水线的核心输入。
FileStorage 会将每一步的输出缓存为中间文件,方便断点调试与增量运行。
3.3.2 主要算子与数据流
Code → Instruction:
CodeCodeToInstructionGenerator- 输入:
raw_code - 输出键:
generated_instruction - 功能:使用 LLM 从代码反向生成自然语言指令/任务描述,使得指令与代码强相关、可用于人类可读的 SFT 数据。
- 输入:
Instruction → Code:
CodeInstructionToCodeGenerator- 输入:
generated_instruction - 输出键:
generated_code - 功能:根据生成的指令重新生成代码,用于校验指令是否充分、明确,并产生一对(指令,代码)。
- 输入:
质量评估:
CodeQualitySampleEvaluator- 输入:
generated_instruction与generated_code - 功能:使用 LLM 评估指令与代码的一致性、清晰程度以及可执行性,生成质量分数和反馈信息。
- 输入:
分数过滤:
CodeQualityScoreFilter- 参数:
min_score=0.0,max_score=10.0 - 输入:
generated_instruction,generated_code - 输出键:
quality_score_filter_label - 功能:基于质量评估结果对样本进行打标或筛选,去除明显低质量样本。
- 参数:
沙盒执行评估:
CodeSandboxSampleEvaluator- 参数:
language='python' - 输入:
generated_code - 功能:在隔离环境中尝试执行代码,检查是否存在语法错误或明显运行时问题,为后续进一步筛选提供依据。
- 参数:
3.4 代码生成数据集流水线:CodeGenDataset_APIPipeline
该流水线位于 pipelines/api_pipelines/code_gen_dataset_pipeline.py,更偏向从原始对话 / 模板消息出发,增强指令并生成高质量代码数据集。
3.4.1 输入与存储
- 默认输入文件:
../example_data/CodePipeline/raw_code.jsonl - 典型字段:
input:原始指令或粗粒度任务说明,用于生成更高质量的指令。
3.4.2 主要算子与数据流
指令增强:
CodeEnhancementInstructionGenerator- 输入:
input - 输出键:
generated_instruction - 功能:基于粗粒度输入(如聊天记录、简短提示),生成结构化、清晰、可直接用于 CodeGen 任务的高质量指令。
- 输入:
Instruction → Code:
CodeInstructionToCodeGenerator- 输入:
generated_instruction - 输出键:
generated_code - 功能:根据指令生成对应代码,实现「指令 + 代码」样本对。
- 输入:
质量评估:
CodeQualitySampleEvaluator- 输入:
generated_instruction,generated_code - 功能:评估指令与代码的一致性、完整性和合理性。
- 输入:
高分样本过滤:
CodeQualityScoreFilter- 参数:
min_score=7.0,max_score=10.0 - 输入:
generated_instruction,generated_code - 输出键:
quality_score:质量分数quality_feedback:来自 LLM 的文字反馈quality_score_filter_label:是否通过过滤的标记
- 功能:仅保留质量分数较高的样本,便于直接作为训练/评测数据使用。
- 参数:
沙盒执行评估:
CodeSandboxSampleEvaluator- 参数:
language='python' - 输入:
generated_code - 功能:验证生成代码在语法和简单运行层面的可行性。
- 参数:
4. 流水线示例代码
下面给出示例代码,演示如何使用多个算子进行代码数据处理。该示例展示了如何初始化一个代码数据处理流水线,并且顺序执行各个过滤和清理步骤。
class CodeSFTSynthesis_APIPipeline:
def __init__(self, llm_serving: LLMServingABC | None = None):
self.storage = FileStorage(
first_entry_file_name="../example_data/CodePipeline/code_synthesis_input.jsonl",
cache_path="./cache",
file_name_prefix="dataflow_cache_step",
cache_type="jsonl",
)
self.llm_serving = llm_serving or APILLMServing_request(
api_url="https://api.openai.com/v1/chat/completions",
model_name="gpt-4o",
max_workers=100,
)
self.instruction_synthesizer_step1 = CodeCodeToInstructionGenerator(llm_serving=self.llm_serving)
self.code_generator_step2 = CodeInstructionToCodeGenerator(llm_serving=self.llm_serving)
self.pair_evaluator_step3 = CodeQualitySampleEvaluator(llm_serving=self.llm_serving)
self.score_filter_step4 = CodeQualityScoreFilter(
llm_serving=self.llm_serving, min_score=0.0, max_score=10.0
)
self.sandbox_evaluator_step5 = CodeSandboxSampleEvaluator(language="python")
def forward(self):
self.instruction_synthesizer_step1.run(
storage=self.storage.step(),
input_key="raw_code",
output_key="generated_instruction",
)
self.code_generator_step2.run(
storage=self.storage.step(),
input_key="generated_instruction",
output_key="generated_code",
)
self.pair_evaluator_step3.run(
storage=self.storage.step(),
input_instruction_key="generated_instruction",
input_code_key="generated_code",
)
self.score_filter_step4.run(
storage=self.storage.step(),
input_instruction_key="generated_instruction",
input_code_key="generated_code",
output_key="quality_score_filter_label",
)
self.sandbox_evaluator_step5.run(
storage=self.storage.step(),
input_key="generated_code",
)
if __name__ == "__main__":
pl = CodeSFTSynthesis_APIPipeline()
pl.forward()
