# Agentic RAG Data Synthesis Pipeline
2025-07-14
## 1. Overview
The Agentic RAG Data Synthesis Pipeline aims to automatically generate high-quality, verifiable Q&A pairs from raw textual contexts and to evaluate their quality, providing reliable data for downstream Agentic RAG training (including RL reward construction).
We support the following use cases:
- RL-based Agentic RAG training
- Automatically constructing high-quality Q&A pairs from raw text
The main stages of the pipeline include:
- Atomic Q&A generation: extract and generate questions, answers, refined answers, and optional answers from text.
- Quality evaluation: score generated answers against golden document answers (e.g., F1) for downstream training.
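For intuition, a single synthesized record might look like the following (hypothetical values; the full field list is described in Section 3):

```json
{
  "question": "In what year was the Eiffel Tower completed?",
  "answer": "The Eiffel Tower was completed in 1889.",
  "refined_answer": "1889",
  "golden_doc_answer": "1889"
}
```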
## 2. Quick Start
### Step 1: Install dataflow

```bash
pip install open-dataflow
```

### Step 2: Create a new dataflow workspace
```bash
mkdir run_dataflow
cd run_dataflow
```

### Step 3: Initialize Dataflow
```bash
dataflow init
```

You will see:

```
run_dataflow/pipelines/api_pipelines/agentic_rag_pipeline.py
```

### Step 4: Set your API key and api_url
For Linux and macOS:

```bash
export DF_API_KEY="sk-xxxxx"
```

For Windows:

```powershell
$env:DF_API_KEY = "sk-xxxxx"
```

In `agentic_rag_pipeline.py`, set `api_url` like:
```python
self.llm_serving = APILLMServing_request(
    api_url="https://api.openai.com/v1/chat/completions",
    model_name="gpt-4o-mini",
    max_workers=500
)
```

### Step 5: One-click run

```bash
python pipelines/api_pipelines/agentic_rag_pipeline.py
```

You can also run any other pipeline script as needed; the process is similar. Below, we introduce the operators used in the pipeline and how to configure them.
## 3. Data Flow and Pipeline Logic
### 1. Input Data
The pipeline input typically includes the following fields:
- `contents`: raw text content
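For example, a minimal `eval_test_data.jsonl` input might contain one record per line (hypothetical content):

```json
{"contents": "The Eiffel Tower, completed in 1889, is a wrought-iron lattice tower on the Champ de Mars in Paris."}
```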
These inputs can be stored in specified files (e.g., `json` or `jsonl`) and are managed and read via a `FileStorage` object. The example loads a default data path; in practice, you can adjust the paths to load your own data and cache locations:
```python
self.storage = FileStorage(
    first_entry_file_name="../example_data/AgenticRAGPipeline/eval_test_data.jsonl",
    cache_path="./agenticRAG_eval_cache",
    file_name_prefix="agentic_rag_eval",
    cache_type="jsonl",
)
```

### 2. Atomic Q&A Generation (AgenticRAGAtomicTaskGenerator)
The first step uses the Atomic Task Generator (`AgenticRAGAtomicTaskGenerator`) to generate questions, initial answers, refined answers, optional answers, and answers produced with and without document context.
Functionality:
- Generate questions and multiple forms of answers from textual context
- Produce structured fields for evaluation and training (questions, answers, refined answers, optional answers, LLM answers, golden document answers, etc.)

Input: raw text content (`prompts` or `contents`, as in the example)

Output: `question`, `answer`, `refined_answer`, `optional_answer`, `llm_answer`, `golden_doc_answer`, `identifier`, `candidate_tasks_str`, `llm_score`, `golden_doc_score`

For example:
```python
self.llm_serving = APILLMServing_request(
    api_url="https://api.openai.com/v1/chat/completions",
    model_name="gpt-4o-mini",
    max_workers=500
)

atomic_task_generator = AgenticRAGAtomicTaskGenerator(
    llm_serving=self.llm_serving
)

result = atomic_task_generator.run(
    storage=self.storage.step(),
    input_key="contents",
)
```

### 3. Q&A Quality Evaluation (AgenticRAGQAF1SampleEvaluator)
The second step uses the F1 Scorer (`AgenticRAGQAF1SampleEvaluator`) to compute the F1 score between the refined answer and the golden document answer. This score is used to build reward signals for RL training and to ensure training quality.
Functionality:
- Evaluate token-level overlap (F1) between `refined_answer` and `golden_doc_answer`

Input: `refined_answer`, `golden_doc_answer`

Output: `F1Score`
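Conceptually, the score is the standard token-level F1 used in QA evaluation. A minimal sketch (the exact tokenization and normalization inside `AgenticRAGQAF1SampleEvaluator` may differ):

```python
from collections import Counter

def token_f1(prediction: str, ground_truth: str) -> float:
    """Token-level F1 between two answer strings (SQuAD-style)."""
    pred_tokens = prediction.lower().split()
    gold_tokens = ground_truth.lower().split()
    if not pred_tokens or not gold_tokens:
        return float(pred_tokens == gold_tokens)
    # Count tokens shared by prediction and ground truth (with multiplicity)
    common = Counter(pred_tokens) & Counter(gold_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0.0
    precision = num_same / len(pred_tokens)
    recall = num_same / len(gold_tokens)
    return 2 * precision * recall / (precision + recall)

# e.g. token_f1("completed in 1889", "1889") == 0.5
```

In the pipeline, the evaluator is invoked as follows: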
```python
evaluator = AgenticRAGQAF1SampleEvaluator()

evaluator.run(
    storage=self.storage.step(),
    output_key="F1Score",
    input_prediction_key="refined_answer",
    input_ground_truth_key="golden_doc_answer"
)
```

### 4. Output Data
The final output includes:
- `question`: atomized question generated by the model
- `answer`: initial answer
- `refined_answer`: cleaned and optimized final answer
- `optional_answer`: a set of acceptable alternative answers
- `llm_answer`: answer generated by the LLM without document context (for evaluation)
- `golden_doc_answer`: golden-standard answer extracted/generated from the original document
- `identifier`: content identifier extracted from the input text
- `candidate_tasks_str`: JSON string of candidate tasks and conclusions
- `llm_score`/`golden_doc_score`: quality scores
- `F1Score`: F1 between `refined_answer` and `golden_doc_answer`
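Since the cache is written as `jsonl`, the scored records can be post-processed directly. A hypothetical filtering step for building RL training data (the cache file name below is an assumption based on the configured `cache_path` and `file_name_prefix`, and the F1 threshold is arbitrary):

```python
import pandas as pd

# Load the evaluated records from the pipeline's jsonl cache
# (the exact file name depends on FileStorage's step numbering)
df = pd.read_json("./agenticRAG_eval_cache/agentic_rag_eval_step2.jsonl", lines=True)

# Keep only Q&A pairs whose refined answer closely matches the golden answer
high_quality = df[df["F1Score"] >= 0.8]
high_quality.to_json("rl_training_data.jsonl", orient="records", lines=True)
```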
## 4. Pipeline Example
An example pipeline demonstrating AgenticRAG data processing with generation and evaluation steps:
```python
from dataflow.operators.agentic_rag import (
    AgenticRAGAtomicTaskGenerator,
    AgenticRAGQAF1SampleEvaluator,
)
from dataflow.utils.storage import FileStorage
from dataflow.serving import APILLMServing_request


class AgenticRAGEval_APIPipeline():
    def __init__(self, llm_serving=None):
        # Storage for the input data and the jsonl caches written by each step
        self.storage = FileStorage(
            first_entry_file_name="../example_data/AgenticRAGPipeline/eval_test_data.jsonl",
            cache_path="./agenticRAG_eval_cache",
            file_name_prefix="agentic_rag_eval",
            cache_type="jsonl",
        )
        # API-based LLM serving used by the atomic task generator
        self.llm_serving = APILLMServing_request(
            api_url="https://api.openai.com/v1/chat/completions",
            model_name="gpt-4o-mini",
            max_workers=500
        )
        # Step 1: atomic Q&A generation
        self.task_step1 = AgenticRAGAtomicTaskGenerator(
            llm_serving=self.llm_serving
        )
        # Step 2: F1 evaluation of refined answers against golden document answers
        self.task_step2 = AgenticRAGQAF1SampleEvaluator()

    def forward(self):
        self.task_step1.run(
            storage=self.storage.step(),
            input_key="contents",
        )
        self.task_step2.run(
            storage=self.storage.step(),
            output_key="F1Score",
            input_prediction_key="refined_answer",
            input_ground_truth_key="golden_doc_answer"
        )


if __name__ == "__main__":
    model = AgenticRAGEval_APIPipeline()
    model.forward()
```
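Run the script directly; intermediate and final results are cached as `jsonl` files under `./agenticRAG_eval_cache` (with the `agentic_rag_eval` file name prefix), as configured in `FileStorage`:

```bash
python pipelines/api_pipelines/agentic_rag_pipeline.py
```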
