Operator Reuse / Prompt Optimization
2026-02-05
1. Overview
Prompt Optimization is the core module of DataFlow-Agent for prompt engineering. Its design goal is to solve the problem of reusing generic operator logic.
This module adopts a single-node architecture: when a user proposes a new data-processing requirement, the Agent not only writes a Prompt that complies with the operator specifications but also automatically generates synthetic test data and internally builds and runs test scripts.
2. Core Features
2.1 Example-Based Generation
- Operator Code Analysis: The Agent automatically reads the source code of the target operator (`OP_NAME`) and extracts its parameter definitions.
- Prompt Migration: The system retrieves existing Prompt cases from the operator library and uses them as context to guide the LLM in generating a new Prompt class that conforms to the operator's interface specifications (e.g., the `__init__` parameter structure).
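The parameter-extraction step can be sketched with Python's standard `inspect` module. This is an illustrative sketch only: the class below is a stand-in, not the real operator, and `describe_operator` is a hypothetical helper, not part of DataFlow-Agent.

```python
import inspect

# Stand-in operator class, used only for this illustration; the real
# operator lives in the DataFlow operator library.
class ReasoningQuestionFilter:
    def __init__(self, system_prompt: str, llm_serving=None, prompt_template=None):
        self.prompt_template = prompt_template

def describe_operator(op_cls) -> dict:
    """Collect the __init__ parameter names (and source, when available)
    of an operator class -- the kind of context the Agent feeds the LLM."""
    sig = inspect.signature(op_cls.__init__)
    params = [name for name in sig.parameters if name != "self"]
    try:
        source = inspect.getsource(op_cls)
    except OSError:  # source may be unavailable (e.g. interactive session)
        source = "<source unavailable>"
    return {"name": op_cls.__name__, "params": params, "source": source}

info = describe_operator(ReasoningQuestionFilter)
print(info["params"])  # ['system_prompt', 'llm_serving', 'prompt_template']
```

A parameter list extracted this way is enough for the LLM to generate a Prompt class whose constructor matches the operator's expectations.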
2.2 Self-Verification Based on Synthetic Data
The generated Prompt does not merely remain as text; the Agent immediately tests it.
- Data Synthesis: The Agent does not require large-scale business data from the user for testing. Instead, it utilizes the LLM to analyze the operator logic and automatically generates a set of synthetic test data covering various edge cases, saving it as a temporary JSONL file.
- Subprocess Execution: The Agent internally and automatically builds a temporary Python test script and launches a subprocess to execute it, verifying whether the generated Prompt runs correctly and produces the expected results.
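A minimal sketch of this synthesize-then-verify flow, assuming nothing about the Agent's internals: the row contents, file names, and the stand-in test script below are all illustrative.

```python
import json
import pathlib
import subprocess
import sys
import tempfile

# Hypothetical synthetic rows; in DataFlow-Agent an LLM generates these
rows = [
    {"math_problem": "What is the ROI on a $100 investment returning $120?"},
    {"math_problem": ""},  # edge case: empty input
]

# 1. Save the synthetic data as a temporary JSONL file
workdir = pathlib.Path(tempfile.mkdtemp())
data_path = workdir / "prompt_test_data.jsonl"
data_path.write_text("\n".join(json.dumps(r) for r in rows), encoding="utf-8")

# 2. Build a temporary test script (a trivial stand-in: it only checks
#    that the JSONL is readable)
script = workdir / "test_prompt.py"
script.write_text(
    "import json\n"
    f"rows = [json.loads(l) for l in open({str(data_path)!r}, encoding='utf-8')]\n"
    "print('loaded ' + str(len(rows)) + ' rows')\n"
)

# 3. Run it in a subprocess and capture stdout/stderr
proc = subprocess.run([sys.executable, str(script)],
                      capture_output=True, text=True)
print(proc.stdout.strip())  # loaded 2 rows
```

Running the verification in a subprocess isolates failures: a broken generated Prompt crashes the child process and surfaces in `stderr` instead of taking down the Agent.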
2.3 Iterative Optimization
- Interactive Feedback: The system does not perform blind automatic retries. Users input modification suggestions after viewing the test results, the generated Prompt, and data previews on the frontend.
- Targeted Hot Update: Upon receiving feedback, the backend `PromptWriter` calls the `revise_with_feedback` method to make targeted modifications to the Prompt code while preserving the existing context, then automatically triggers a new round of the testing loop.
3. System Architecture
This function is defined in `dataflow_agent/workflow/wf_pipeline_prompt.py` as a single-node workflow; all generation and verification logic is highly cohesive within the `PromptWriter` Agent.
3.1 Core Node Process
The Prompt Writer Node is the only node in the graph, executing the following internal logic in sequence:
- Context Retrieval: Calls pre-tools to obtain the target operator's source code, the user's goal, and Prompt examples.
- Prompt Generation: Calls the LLM to generate the Prompt class code as Python, saves the generated code to the `state` object via the `update_state_result` method, and writes it to a local file so that subsequent test steps can depend on it.
- Test Data Synthesis: Calls the internal method `_build_test_data_by_llm` to generate synthetic test data based on the task description.
- Test Script Construction: Calls the internal method `_build_test_code` to generate a temporary test script from string templates.
- Subprocess Execution: Uses `subprocess` to run the test script and captures standard output (stdout) and standard error (stderr).
- Test Result Output: Scans and reads the test result file produced by the subprocess, updates the test results into `state.temp_data`, and completes the process.
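The six steps above can be sketched as a single function over a `state` dict. Every helper name here is hypothetical; the sketch only mirrors the step list, not the actual `PromptWriter` implementation.

```python
import subprocess
import sys

# Hypothetical helpers mirroring the six steps; illustrative only.
def fetch_operator_source(op_name):            # 1. context retrieval
    return f"class {op_name}: ..."

def generate_prompt_code(state):               # 2. prompt generation
    return "class GeneratedPrompt: ..."

def build_test_data(state):                    # 3. test data synthesis
    return "./pa_cache/prompt_test_data.jsonl"

def build_test_code(state):                    # 4. test script construction
    return "print('ok')"

def run_in_subprocess(code):                   # 5. subprocess execution
    p = subprocess.run([sys.executable, "-c", code],
                       capture_output=True, text=True)
    return p.stdout, p.stderr

def run_prompt_writer_node(state):
    state["op_source"] = fetch_operator_source(state["op_name"])
    state["prompt_code"] = generate_prompt_code(state)
    state["test_data_path"] = build_test_data(state)
    state["test_script"] = build_test_code(state)
    stdout, stderr = run_in_subprocess(state["test_script"])
    state["temp_data"] = {"stdout": stdout, "stderr": stderr}  # 6. results
    return state

state = run_prompt_writer_node({"op_name": "ReasoningQuestionFilter"})
print(state["temp_data"]["stdout"].strip())  # ok
```

Keeping every step inside one node means the graph has no branching to manage; iteration is driven externally by user feedback rather than by graph edges.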
3.2 Iterative Optimization Mechanism
The optimization process depends on frontend interaction:
- The user views the execution results in the UI.
- The user submits feedback.
- The frontend calls `_on_chat_submit`, triggering the Agent's `revise_with_feedback` interface.
- The Agent modifies the code based on the feedback and re-executes the validation phase described above (Test Data Synthesis -> Test Script Construction -> Subprocess Execution).
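The feedback loop can be sketched as below. `revise_with_feedback` is the method named in this doc, but the class body, the test predicate, and the loop driver are invented stand-ins for illustration.

```python
# Toy sketch of the revise-and-retest loop; not the real PromptWriter.
class PromptWriterSketch:
    def __init__(self, code: str):
        self.code = code

    def revise_with_feedback(self, feedback: str) -> str:
        # In the real Agent an LLM rewrites the Prompt code while keeping
        # the existing context; here we just record the revision.
        self.code += f"\n# revised: {feedback}"
        return self.code

    def run_tests(self) -> bool:
        # Stand-in for the subprocess-based validation phase
        return "revised" in self.code

writer = PromptWriterSketch("class MyPrompt: ...")
for feedback in ["cover empty-input edge case"]:
    writer.revise_with_feedback(feedback)
    if writer.run_tests():  # re-run validation after each revision
        break
print(writer.run_tests())  # True
```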
4. User Guide
This feature provides two modes of usage: Graphical Interface (Gradio UI) and Command Line Script.
4.1 Graphical Interface
The frontend code is located in `gradio_app/pages/PA_frontend.py` and provides a visual interactive experience, ideal for interactive exploration and rapid validation. To launch the web interface:

```shell
python gradio_app/app.py
```

Then visit http://127.0.0.1:7860 to start using it.
Initial Generation:
- Configure API information (URL, Key, Model).
- Fill in the task description and operator name.
- (Optional) Specify the output format, argument list, and file output root path.
- Click the "Generate Prompt Template" button.
- View the test data, test results, Prompt code, and test code generated by the Agent.
Multi-turn Optimization:
- If the results do not meet expectations, enter improvement suggestions in the dialogue box on the right.
- Click "Send Rewrite Instruction".
- View the updated code and test results.
- Repeat steps 1-3 until a satisfactory result is obtained.
Using the Generated Prompt:
- Get the generated Prompt file location from "Prompt File Path".
- Import the Prompt class into your operator.
- Specify `prompt_template` in the operator's `__init__()`.
4.2 Script Invocation and Explicit Configuration
For developers who need to integrate Prompt generation into automated pipelines, or who prefer code-based configuration, use `script/run_dfa_pipeline_prompt.py`.
1. Modify the Configuration
Open `script/run_dfa_pipeline_prompt.py` and edit the configuration section at the top of the file.
API Configuration
- `CHAT_API_URL`: URL of the LLM service
- `api_key`: Access key (read from the environment variable `DF_API_KEY`)
- `MODEL`: Model name; the default is `gpt-4o`
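A minimal sketch of this configuration block, assuming the key is read from the `DF_API_KEY` environment variable as described above; the URL is a placeholder, not a real endpoint.

```python
import os

# Sketch of the script's API configuration; CHAT_API_URL and MODEL are
# edited in the file, while the key comes from the environment.
CHAT_API_URL = "http://127.0.0.1:3000/v1/chat/completions"  # placeholder
MODEL = "gpt-4o"
api_key = os.environ.get("DF_API_KEY", "")

if not api_key:
    print("warning: DF_API_KEY is not set")
```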
Task Configuration
- `TASK_DESCRIPTION`: Describe, in natural language, the task you want this Prompt to complete.
  - Example: `"I want to write a filter prompt suitable for financial questions."`
- `OP_NAME`: Specify which operator will load and use the generated Prompt.
- `OUTPUT_FORMAT` (Optional): Specify the output format of the Prompt. If left blank, the Agent will generate it by imitating existing prompts.
- `ARGUMENTS` (Optional): Parameters required by the Prompt template, separated by commas, spaces, or newlines.
  - Example: `["min_len=10", "drop_na=true"]`
Environment Configuration
- `CACHE_DIR`: Result output directory. The generated Prompt files (`.py`), temporary test data, test code, etc., are all saved here.
- `DELETE_TEST_FILES`: Whether to automatically clean up temporary synthetic test data after running (`True`/`False`).
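The `DELETE_TEST_FILES` behavior might look roughly like this sketch; the file-name pattern and helper name are assumptions, not the script's actual code.

```python
import pathlib
import tempfile

def cleanup_test_files(cache_dir: str, delete: bool) -> list:
    """Remove temporary synthetic test data from the cache directory
    (sketch of DELETE_TEST_FILES; the glob pattern is an assumption)."""
    removed = []
    if delete:
        for p in pathlib.Path(cache_dir).glob("prompt_test_data*.jsonl"):
            p.unlink()
            removed.append(p.name)
    return removed

# Demonstration against a throwaway directory
cache = tempfile.mkdtemp()
(pathlib.Path(cache) / "prompt_test_data.jsonl").write_text("{}\n")
print(cleanup_test_files(cache, delete=True))  # ['prompt_test_data.jsonl']
```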
2. Run the Script
After completing the configuration, execute the following command in the terminal:
```shell
python script/run_dfa_pipeline_prompt.py
```
3. Result Output
After the script finishes, the console prints the generation process, and you can find the generated files in the `CACHE_DIR` directory.
4.3 Practical Case: Reusing ReasoningQuestionFilter to Write a Filter Prompt for Financial Questions
You can follow the walkthrough below; we also provide a Google Colab sample for running the program.
Suppose we want to reuse the `ReasoningQuestionFilter` operator in the system and turn it into a filter for financial-domain questions. Open the script and modify the configuration as follows:
```python
# ===== Example config (edit here) =====
# 1. Define the task
TASK_DESCRIPTION = "I want to write a filter prompt suitable for financial questions."

# 2. Specify the operator to reuse
OP_NAME = "ReasoningQuestionFilter"

# These two items only need to be provided if the operator has no preset
# prompts; otherwise they are generated by imitating existing prompts
OUTPUT_FORMAT = ""  # e.g. "Return JSON with keys: ..."
ARGUMENTS = []      # e.g. ["min_len=10", "drop_na=true"]

# Cache directory for storing test data and prompts
CACHE_DIR = "./pa_cache"
DELETE_TEST_FILES = False
```

Run the script. The terminal will output execution logs, and you can find the generated Prompt file `finance_question_filter_prompt20260209143556.py`, the test code `test_FinanceQuestionFilterPrompt.py`, and the test data in the `CACHE_DIR` directory. The content of the generated Prompt is as follows:
```python
__all__ = ['FinanceQuestionFilterPrompt']

from dataflow.core.prompt import DIYPromptABC


class FinanceQuestionFilterPrompt(DIYPromptABC):
    def __init__(self):
        pass

    def build_prompt(self, question: str) -> str:
        prompt = f"""
# Role
You are a review assistant for financial questions.
# Task
Your task is to check whether the given financial question meets the following criteria:
0. First, confirm that the input contains only one clear financial question (with no extra instructions such as "rewrite" or "translate", and no provided answer); if not, output judgement_test=false.
1. Check spelling, grammar, and formatting (e.g., currency symbols, percentage notation) without interpreting semantics.
2. For each minimal premise (one that cannot be decomposed further), verify whether it violates common sense, financial-domain facts, or the task requirements (e.g., a "negative interest rate" may be invalid in some cases); if invalid, fail.
3. Check for any contradiction between premises or within the reasoning, or whether the final result is clearly unreasonable or unsolvable; if so, fail.
4. If all of the above pass, check whether there is enough information to complete the task; a missing necessary condition means fail, while redundant details are acceptable.
# Output Format
After completing these steps, the output must be in the format:
{{
    "judgement_test": true/false,
    "error_type": "<error description or null>"
}}
You may include your reasoning process, but the final output must be the JSON format above.
Here is the content to be evaluated:
-------------------------------
{question}
-------------------------------
"""
        return prompt
```

The test code `test_FinanceQuestionFilterPrompt.py` generated by the Agent is as follows:
"""
Auto-generated by prompt_writer
"""
from dataflow.pipeline import PipelineABC
from dataflow.utils.storage import FileStorage
from dataflow.serving import APILLMServing_request, LocalModelLLMServing_vllm
try:
from dataflow.operators.reasoning.filter.reasoning_question_filter import ReasoningQuestionFilter
except Exception:
from dataflow.operators.reasoning import ReasoningQuestionFilter
from finance_question_filter_prompt20260209143556 import FinanceQuestionFilterPrompt
class RecommendPipeline(PipelineABC):
def __init__(self):
super().__init__()
# -------- FileStorage --------
self.storage = FileStorage(
first_entry_file_name="./pa_cache/prompt_test_data.jsonl",
cache_path="./pa_cache",
file_name_prefix="dataflow_cache_step",
cache_type="jsonl",
)
# -------- LLM Serving (Remote) --------
self.llm_serving = APILLMServing_request(
api_url="http://123.129.219.111:3000/v1/chat/completions",
key_name_of_api_key="DF_API_KEY",
model_name="gpt-4o",
max_workers=100,
)
self.reasoning_question_filter = ReasoningQuestionFilter(system_prompt='You are a helpful assistant.', llm_serving=self.llm_serving, prompt_template=FinanceQuestionFilterPrompt())
def forward(self):
self.reasoning_question_filter.run(
storage=self.storage.step(), input_key='math_problem'
)
if __name__ == "__main__":
pipeline = RecommendPipeline()
pipeline.compile()
pipeline.forward()
