Code Data Synthesis Pipeline
2025-11-21
1. Overview
The code pipeline is designed to process different types of code data, including pretraining code corpora, instruction fine-tuning data, and general code generation datasets. Functionally, it can be divided into three categories:
- Pretraining Code Filtering Pipeline: Applies heuristic and statistical filters to raw pretraining code, removing auto-generated, low‑quality, or unexpected file types to obtain a high‑quality pretraining corpus.
- Code SFT Synthesis Pipeline: Uses existing code as seeds, generates natural‑language instructions from code, then regenerates code from the instructions, combined with quality evaluation and sandbox execution to construct instruction–code pairs for code instruction fine‑tuning.
- Code Generation Dataset Pipeline: Takes dialogue messages or templates as input, enhances them into high‑quality instructions, generates corresponding code, and filters samples via quality scoring and sandbox execution to build datasets for code generation tasks.
All three pipelines follow the same design principle of unified data storage + composable operators:
- Unified storage (FileStorage): All intermediate results are written to cache files, which makes debugging and resume‑from‑checkpoint easier.
- Decoupled operators: Each step is an independent Operator that you can add, remove, reorder, or replace as needed.
- Multiple use cases:
  - Pretraining code corpus cleaning (CPU‑only, no LLM required)
  - API‑based synthesis of instruction–code pairs for SFT
  - Enhanced instruction & code generation for code‑centric datasets
The following sections describe the inputs, core operators, and logic for each pipeline.
2. Quick Start
Step 1: Install DataFlow
pip install open-dataflow
Step 2: Create a new working directory
mkdir run_dataflow
cd run_dataflow
Step 3: Initialize DataFlow
dataflow init
After initialization, the run_dataflow directory will contain several example pipelines (paths may differ slightly). For this document, the relevant ones are:
run_dataflow/pipelines/api_pipelines/code_gen_dataset_pipeline.py
run_dataflow/pipelines/api_pipelines/code_code_to_sft_data_pipeline.py
run_dataflow/pipelines/cpu_pipelines/code_pt_filter.py
Step 4: Configure API key and LLM serving (API pipelines only)
For Linux and macOS:
export DF_API_KEY="sk-xxxxx"
For Windows (PowerShell):
$env:DF_API_KEY = "sk-xxxxx"
In code‑based API pipelines, APILLMServing_request is typically configured as:
self.llm_serving = APILLMServing_request(
    api_url="https://api.openai.com/v1/chat/completions",
    model_name="gpt-4o",
    max_workers=10,  # or 100, depending on the pipeline
)
Step 5: Run the example pipelines
From the run_dataflow directory you can run any of the following:
# 1) Pretraining code filtering (CPU)
python pipelines/cpu_pipelines/code_pt_filter.py
# 2) Synthesize SFT instruction–code pairs from existing code
python pipelines/api_pipelines/code_code_to_sft_data_pipeline.py
# 3) Generate a code dataset from raw conversations / templates
python pipelines/api_pipelines/code_gen_dataset_pipeline.py
3. Data Flow and Pipeline Logic
1. Common Inputs and FileStorage
All three pipelines use FileStorage to manage input and cached data, differing only in their default input files:
- PTCodeFilter_CPUPipeline
first_entry_file_name="../example_data/CodePipeline/code_input.jsonl"
- CodeSFTSynthesis_APIPipeline
first_entry_file_name="../example_data/CodePipeline/code_synthesis_input.jsonl"
- CodeGenDataset_APIPipeline
first_entry_file_name="../example_data/CodePipeline/raw_code.jsonl"
In real‑world usage, you only need to replace these example paths with your own JSON/JSONL files; the rest of the operators can remain unchanged.
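To swap in your own data, the input only needs to be a JSON Lines file whose field names match the input keys the operators read. The following is a minimal sketch using only the standard library. It assumes a `raw_code` field, as used by the SFT synthesis pipeline; the file name `my_code_input.jsonl`, the helper names, and the sample snippets are placeholders, not part of DataFlow.

```python
import json
import tempfile
from pathlib import Path


def write_jsonl(records, path):
    """Write one JSON object per line (the JSONL layout the pipelines read)."""
    path = Path(path)
    with path.open("w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return path


def read_jsonl(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    with Path(path).open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


# Placeholder seed snippets; replace these with your own code samples.
records = [
    {"raw_code": "def add(a, b):\n    return a + b\n"},
    {"raw_code": "print('hello')\n"},
]

# A temporary directory keeps the sketch side-effect free; use any path you like.
out_dir = Path(tempfile.mkdtemp())
input_path = write_jsonl(records, out_dir / "my_code_input.jsonl")
loaded = read_jsonl(input_path)
```

The resulting path can then be passed as first_entry_file_name when constructing FileStorage.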
2. Pretraining Code Filtering Pipeline (PTCodeFilter_CPUPipeline)
This pipeline (in pipelines/cpu_pipelines/code_pt_filter.py) is intended for multi‑dimensional filtering and quality assessment of large‑scale code corpora without using any LLMs.
The default input typically provides:
- lines: Code split into lines, used for line‑level and length‑related filters.
- text: Full code text, used for composition analysis, encoded‑data detection, and document‑level quality checks.
- dataframe: Structured metadata such as file path and language type, used for file‑type filtering.
The main operators are:
- Auto‑generated code filter: CodeAutoGeneratedFilter
  - Parameters: min_score=1.0, max_score=1.0
  - Input: lines
  - Output key: autogen_filter_label
  - Function: Detects and filters code that appears to be automatically generated (e.g., large boilerplate sections, generator markers).
- Code length filter: CodeLengthSampleFilter
  - Parameters: min_score=1.0, max_score=1.0
  - Input: lines
  - Output key: length_filter_label
  - Function: Filters samples that are unusually short or long based on line/character counts.
- Text composition filter: CodeTextCompositionFilter
  - Parameters: min_score=1.0, max_score=1.0
  - Input: text
  - Output key: text_composition_filter_label
  - Function: Filters samples whose character composition is abnormal (e.g., too much non‑code content).
- Encoded‑data filter: CodeEncodedDataFilter
  - Parameters: min_score=1.0, max_score=1.0
  - Input: text
  - Output key: encoded_data_filter_label
  - Function: Detects and filters large blocks of encoded data, such as Base64 or long hexadecimal blobs.
- Document quality filter: CodeDocumentQualityFilter
  - Parameters: min_score=1.0, max_score=1.0
  - Thresholds (example configuration):
    - min_num_chars=100, max_num_chars=100000
    - min_num_words=10, max_num_words=50000
    - max_frac_duplicate_lines=0.3
    - max_frac_duplicate_2gram~5gram=0.3
    - max_frac_curly_bracket=0.1
    - max_frac_all_caps_words=0.3
    - min_entropy_unigram=2.0
  - Input: text
  - Output key: doc_quality_filter_label
  - Function: Combines length, redundancy, and entropy metrics to filter low‑quality code documents.
- File‑type content filter: CodeFileTypeContentFilter
  - Input: dataframe
  - Output key: file_type_filter_label
  - Function: Filters out files whose type or content does not match the desired set (e.g., non‑code or unwanted languages).
- Generic score filter (optional): CodeGenericScoreFilter
  - In the example code this is commented out; you can aggregate multiple scores into a single field (e.g., quality_score) and then apply this operator for a unified threshold.
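To make the document-level thresholds more concrete, here is a rough sketch of how a few of the metrics above could be computed. It is an assumed interpretation, not the implementation inside CodeDocumentQualityFilter: the function names are hypothetical, tokens are taken to be whitespace-separated, and only the character-count, duplicate-line, and unigram-entropy thresholds from the example configuration are covered.

```python
import math
from collections import Counter


def frac_duplicate_lines(text: str) -> float:
    """Fraction of non-empty lines that repeat an earlier line."""
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        return 0.0
    counts = Counter(lines)
    duplicates = sum(c - 1 for c in counts.values())
    return duplicates / len(lines)


def unigram_entropy(text: str) -> float:
    """Shannon entropy (in bits) of the whitespace-separated token distribution."""
    tokens = text.split()
    if not tokens:
        return 0.0
    total = len(tokens)
    counts = Counter(tokens)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


def passes_quality(
    text: str,
    min_num_chars: int = 100,
    max_num_chars: int = 100000,
    max_frac_duplicate_lines: float = 0.3,
    min_entropy_unigram: float = 2.0,
) -> bool:
    """Apply a subset of the example thresholds (assumed semantics)."""
    if not (min_num_chars <= len(text) <= max_num_chars):
        return False
    if frac_duplicate_lines(text) > max_frac_duplicate_lines:
        return False
    return unigram_entropy(text) >= min_entropy_unigram


# A highly repetitive file versus a small but varied one.
repetitive = "x = 1\n" * 50
varied = "\n".join(
    f"def func_{i}(arg_{i}):\n    return arg_{i} * {i}" for i in range(10)
)
repetitive_ok = passes_quality(repetitive)
varied_ok = passes_quality(varied)
```

In this sketch the repetitive sample is rejected by the duplicate-line check, while the varied sample clears all three thresholds.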
3. Code SFT Synthesis Pipeline (CodeSFTSynthesis_APIPipeline)
This pipeline (in pipelines/api_pipelines/code_code_to_sft_data_pipeline.py) is used to synthesize high‑quality instruction–code pairs from existing code, suitable for instruction fine‑tuning of code models.
3.1 Input and storage
- Default input file: ../example_data/CodePipeline/code_synthesis_input.jsonl
- Typical fields:
  - raw_code: Raw code snippets that serve as seeds for synthesis.
FileStorage writes each step’s outputs to separate cache files, enabling debugging and incremental reruns.
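The idea of one cache file per step can be illustrated without DataFlow installed. The sketch below is a hypothetical stand-in, not FileStorage itself: the `StepCache` class, its file naming, and the two toy operators are all assumptions made for illustration. Each operator reads the previous step's output and writes its own file, so any intermediate result can be inspected afterwards.

```python
import json
import tempfile
from pathlib import Path


class StepCache:
    """Hypothetical step-wise cache; not DataFlow's FileStorage."""

    def __init__(self, first_entry_file, cache_dir, prefix="cache_step"):
        self.first_entry_file = Path(first_entry_file)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.prefix = prefix
        self.step_index = 0

    def _path(self, index):
        return self.cache_dir / f"{self.prefix}_{index}.jsonl"

    def read(self):
        # Step 0 reads the original input; later steps read the latest cache file.
        src = self.first_entry_file if self.step_index == 0 else self._path(self.step_index)
        with src.open(encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def write(self, rows):
        # Every write goes to a new file, so earlier steps stay on disk.
        self.step_index += 1
        with self._path(self.step_index).open("w", encoding="utf-8") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")


def add_line_count(rows):
    """Toy operator: annotate each sample with its number of lines."""
    return [{**r, "num_lines": r["raw_code"].count("\n") + 1} for r in rows]


def keep_multiline(rows):
    """Toy operator: drop single-line samples."""
    return [r for r in rows if r["num_lines"] > 1]


def run_pipeline(cache, operators):
    """Run operators in order; each one reads the previous step's output."""
    for op in operators:
        cache.write(op(cache.read()))
    return cache.read()


work_dir = Path(tempfile.mkdtemp())
seed = work_dir / "input.jsonl"
seed.write_text(
    json.dumps({"raw_code": "x = 1"}) + "\n"
    + json.dumps({"raw_code": "def f():\n    return 1"}) + "\n",
    encoding="utf-8",
)
cache = StepCache(seed, work_dir / "cache")
result = run_pipeline(cache, [add_line_count, keep_multiline])
cache_files = sorted(p.name for p in (work_dir / "cache").iterdir())
```

Because the operators are plain callables in a list, reordering or removing a step only changes the list passed to `run_pipeline`.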
3.2 Main operators and data flow
- Code → Instruction: CodeCodeToInstructionGenerator
  - Input: raw_code
  - Output key: generated_instruction
  - Function: Uses an LLM to generate natural‑language instructions or task descriptions from code, yielding human‑readable SFT instructions that are tightly aligned with the code.
- Instruction → Code: CodeInstructionToCodeGenerator
  - Input: generated_instruction
  - Output key: generated_code
  - Function: Regenerates code from the instructions, both validating instruction clarity and producing instruction–code pairs.
- Quality evaluation: CodeQualitySampleEvaluator
  - Inputs: generated_instruction, generated_code
  - Function: Uses an LLM to evaluate consistency, clarity, and executability, and outputs quality scores and feedback.
- Score‑based filtering: CodeQualityScoreFilter
  - Parameters: min_score=0.0, max_score=10.0
  - Inputs: generated_instruction, generated_code
  - Output key: quality_score_filter_label
  - Function: Tags or filters out low‑quality samples based on the evaluation scores.
- Sandbox execution: CodeSandboxSampleEvaluator
  - Parameters: language='python'
  - Input: generated_code
  - Function: Executes code in an isolated environment to detect syntax errors or obvious runtime issues.
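As a rough illustration of the sandbox step, the sketch below runs a snippet in a separate Python process with a timeout and reports whether it exited cleanly. This is an assumption about the general technique, not how CodeSandboxSampleEvaluator is implemented; the function name and the returned dictionary keys are hypothetical. A plain subprocess is also not real isolation, so untrusted code would still need a container or similar boundary.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_python_snippet(code: str, timeout: float = 10.0) -> dict:
    """Run code in a separate Python process and report whether it succeeded."""
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "snippet.py"
        script.write_text(code, encoding="utf-8")
        try:
            proc = subprocess.run(
                [sys.executable, str(script)],
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=tmp,  # keep any files the snippet writes inside the temp dir
            )
        except subprocess.TimeoutExpired:
            return {"passed": False, "stdout": "", "stderr": "timeout"}
    return {
        "passed": proc.returncode == 0,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
    }


ok = run_python_snippet("print(2 + 3)")
syntax_error = run_python_snippet("def broken(:\n    pass")
runtime_error = run_python_snippet("1 / 0")
```

Both a syntax error and an uncaught runtime exception produce a non-zero exit code, so a single `passed` flag covers the two failure modes mentioned above.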
4. Code Generation Dataset Pipeline (CodeGenDataset_APIPipeline)
This pipeline (in pipelines/api_pipelines/code_gen_dataset_pipeline.py) is geared towards constructing high‑quality “instruction + code” datasets from raw conversations or template messages.
4.1 Input and storage
- Default input file: ../example_data/CodePipeline/raw_code.jsonl
- Typical fields:
  - input: Raw instruction, templates, or coarse‑grained task descriptions used as seeds for instruction enhancement.
4.2 Main operators and data flow
- Instruction enhancement: CodeEnhancementInstructionGenerator
  - Input: input
  - Output key: generated_instruction
  - Function: Converts rough inputs (e.g., chat history, short prompts) into clear, structured, and high‑quality instructions suitable for code generation.
- Instruction → Code: CodeInstructionToCodeGenerator
  - Input: generated_instruction
  - Output key: generated_code
  - Function: Generates code from instructions to form instruction–code samples.
- Quality evaluation: CodeQualitySampleEvaluator
  - Inputs: generated_instruction, generated_code
  - Function: Evaluates instruction–code consistency, completeness, and overall quality.
- High‑score filtering: CodeQualityScoreFilter
  - Parameters: min_score=7.0, max_score=10.0
  - Inputs: generated_instruction, generated_code
  - Output keys:
    - quality_score: Numeric quality score
    - quality_feedback: Textual feedback from the LLM
    - quality_score_filter_label: Pass/fail label
  - Function: Keeps only high‑quality samples that are directly usable for training or evaluation.
- Sandbox execution: CodeSandboxSampleEvaluator
  - Parameters: language='python'
  - Input: generated_code
  - Function: Verifies that generated code is syntactically and (in simple cases) semantically valid.
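The high-score filtering step amounts to a range check on the evaluator's score. The sketch below shows that logic in isolation, assuming rows are dictionaries with a numeric `quality_score`; the function name is hypothetical, and encoding the pass/fail label as 1 or 0 is an assumption, since the label format is not specified here.

```python
def label_by_score(
    rows,
    min_score=7.0,
    max_score=10.0,
    score_key="quality_score",
    label_key="quality_score_filter_label",
):
    """Attach a pass/fail label (assumed 1/0) based on a closed score range."""
    labeled = []
    for row in rows:
        score = row.get(score_key)
        # Missing or non-numeric scores are treated as failures.
        passed = isinstance(score, (int, float)) and min_score <= score <= max_score
        labeled.append({**row, label_key: 1 if passed else 0})
    return labeled


samples = [
    {"generated_instruction": "Reverse a string.", "quality_score": 9},
    {"generated_instruction": "Sort a list.", "quality_score": 6.5},
    {"generated_instruction": "Parse a date.", "quality_score": None},
]
labeled = label_by_score(samples)
kept = [row for row in labeled if row["quality_score_filter_label"] == 1]
```

With min_score=0.0 and max_score=10.0, as in the SFT synthesis pipeline, the same check would accept every sample that has a valid score in that range.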
4. Pipeline Example
The following example shows how to initialize a code SFT synthesis pipeline and sequentially run the main operators.
# Import paths follow the layout of the generated example pipelines;
# adjust them if your installed DataFlow version differs.
from dataflow.core import LLMServingABC
from dataflow.operators.code import (
    CodeCodeToInstructionGenerator,
    CodeInstructionToCodeGenerator,
    CodeQualitySampleEvaluator,
    CodeQualityScoreFilter,
    CodeSandboxSampleEvaluator,
)
from dataflow.serving import APILLMServing_request
from dataflow.utils.storage import FileStorage


class CodeSFTSynthesis_APIPipeline:
    def __init__(self, llm_serving: LLMServingABC | None = None):
        # Input file plus per-step cache files under ./cache
        self.storage = FileStorage(
            first_entry_file_name="../example_data/CodePipeline/code_synthesis_input.jsonl",
            cache_path="./cache",
            file_name_prefix="dataflow_cache_step",
            cache_type="jsonl",
        )

        # Use the caller's serving backend if given; otherwise call the API directly
        self.llm_serving = llm_serving or APILLMServing_request(
            api_url="https://api.openai.com/v1/chat/completions",
            model_name="gpt-4o",
            max_workers=100,
        )

        self.instruction_synthesizer_step1 = CodeCodeToInstructionGenerator(llm_serving=self.llm_serving)
        self.code_generator_step2 = CodeInstructionToCodeGenerator(llm_serving=self.llm_serving)
        self.pair_evaluator_step3 = CodeQualitySampleEvaluator(llm_serving=self.llm_serving)
        self.score_filter_step4 = CodeQualityScoreFilter(
            llm_serving=self.llm_serving,
            min_score=0.0,
            max_score=10.0,
        )
        self.sandbox_evaluator_step5 = CodeSandboxSampleEvaluator(language="python")

    def forward(self):
        # Step 1: code -> instruction
        self.instruction_synthesizer_step1.run(
            storage=self.storage.step(),
            input_key="raw_code",
            output_key="generated_instruction",
        )
        # Step 2: instruction -> code
        self.code_generator_step2.run(
            storage=self.storage.step(),
            input_key="generated_instruction",
            output_key="generated_code",
        )
        # Step 3: LLM-based quality evaluation of each instruction-code pair
        self.pair_evaluator_step3.run(
            storage=self.storage.step(),
            input_instruction_key="generated_instruction",
            input_code_key="generated_code",
        )
        # Step 4: label samples by quality score
        self.score_filter_step4.run(
            storage=self.storage.step(),
            input_instruction_key="generated_instruction",
            input_code_key="generated_code",
            output_key="quality_score_filter_label",
        )
        # Step 5: execute the generated code in the sandbox
        self.sandbox_evaluator_step5.run(
            storage=self.storage.step(),
            input_key="generated_code",
        )


if __name__ == "__main__":
    pl = CodeSFTSynthesis_APIPipeline()
    pl.forward()
