CompositionTaskFilter
2025-10-09
📘 Overview
The CompositionTaskFilter is an operator designed to evaluate the feasibility and completeness of a composition task based on its constituent sub-tasks. It leverages a Large Language Model (LLM) service to judge whether a task is executable and subsequently filters out the tasks that are not viable.
__init__ function
    def __init__(self, llm_serving: LLMServingABC)

| Parameter | Type | Default | Description |
|---|---|---|---|
| llm_serving | LLMServingABC | Required | LLM serving object implementing the LLMServingABC interface. |
Prompt Template Descriptions
| Prompt Template Name | Main Purpose | Applicable Scenarios | Feature Description |
|---|---|---|---|
run function
    def run(self, storage: DataFlowStorage, input_composition_task_key: str, input_sub_tasks_keys: list[str], output_key: str = "runable_label")

| Parameter | Type | Default | Description |
|---|---|---|---|
| storage | DataFlowStorage | Required | DataFlow storage instance, responsible for reading and writing data. |
| input_composition_task_key | str | Required | Field name for the composition task. |
| input_sub_tasks_keys | list[str] | Required | List of field names for sub-tasks (e.g., atomic, parallel, subsequent tasks). |
| output_key | str | "runable_label" | Output field name for the executability label. |
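How these keys map onto a single row can be sketched as follows. This is a minimal illustration under stated assumptions, not the operator's actual implementation: `build_prompt`, `label_rows`, and `stub_judge` are hypothetical names, the prompt wording is invented, and the stub stands in for the call that the real operator delegates to its `llm_serving` object.

```python
from typing import Callable

Row = dict[str, object]


def build_prompt(row: Row, composition_key: str, sub_task_keys: list[str]) -> str:
    """Assemble one judging prompt from a row (hypothetical wording)."""
    sub_tasks = "\n".join(f"- {row[key]}" for key in sub_task_keys)
    return (
        f"Composition task:\n{row[composition_key]}\n"
        f"Sub-tasks:\n{sub_tasks}\n"
        "Reply 1 if the composition task is feasible and complete, else 0."
    )


def label_rows(
    rows: list[Row],
    judge: Callable[[str], int],
    composition_key: str,
    sub_task_keys: list[str],
    output_key: str = "runable_label",
) -> list[Row]:
    """Return copies of the rows with the judge's label stored under output_key."""
    labeled = []
    for row in rows:
        prompt = build_prompt(row, composition_key, sub_task_keys)
        # Original columns are copied unchanged; only the label column is added.
        labeled.append({**row, output_key: judge(prompt)})
    return labeled


def stub_judge(prompt: str) -> int:
    """Stand-in for the LLM call (assumption): always answers 'feasible'."""
    return 1


if __name__ == "__main__":
    rows = [
        {
            "composition_task": "Find the cheapest airline within budget.",
            "atom_task": "Search for round-trip flights within budget.",
            "subsequent_task": "Which airline offers the cheapest fare?",
        }
    ]
    labeled = label_rows(
        rows, stub_judge, "composition_task", ["atom_task", "subsequent_task"]
    )
    print(labeled[0]["runable_label"])
```

Passing the judge as a plain callable keeps the sketch independent of any particular serving backend; in practice the label would come from the LLM's response rather than a constant.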
🧠 Example Usage
    from typing import Optional

    from dataflow.operators.conversations import CompositionTaskFilter
    from dataflow.utils.storage import FileStorage
    from dataflow.serving import APILLMServing_request
    from dataflow.core import LLMServingABC


    class CompositionTaskFilterExample:
        def __init__(self, llm_serving: Optional[LLMServingABC] = None):
            # Input file and cache location for intermediate results.
            self.storage = FileStorage(
                first_entry_file_name="input.jsonl",
                cache_path="./cache_local",
                file_name_prefix="dataflow_cache_step",
                cache_type="jsonl",
            )
            # Use the injected serving object if one is given;
            # otherwise fall back to an API-backed one.
            if llm_serving is None:
                llm_serving = APILLMServing_request(
                    api_url="",  # set this to your API endpoint
                    model_name="gpt-4o",
                    max_workers=30,
                )
            self.llm_serving = llm_serving
            self.operator = CompositionTaskFilter(
                llm_serving=self.llm_serving
            )

        def forward(self):
            # Label each composition task and keep only the feasible rows.
            self.operator.run(
                storage=self.storage.step(),
                input_composition_task_key="composition_task",
                input_sub_tasks_keys=["atom_task", "subsequent_task"],
                output_key="runable_label",
            )


    if __name__ == "__main__":
        pl = CompositionTaskFilterExample()
        pl.forward()

🧾 Default Output Format
The operator filters the input data, returning only the rows corresponding to feasible and complete composition tasks. A new column is added to indicate the executability label.
| Field | Type | Description |
|---|---|---|
| original_columns | any | The original columns from the input data are preserved. |
| runable_label | int | The executability label generated by the LLM (1 for feasible). The output is filtered to only include rows where this is 1. |
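The filtering rule in the table above amounts to keeping rows whose label equals 1. A minimal sketch in plain Python, assuming rows are represented as dictionaries; `keep_runnable` is a hypothetical helper name and not part of the operator's API:

```python
Row = dict[str, object]


def keep_runnable(rows: list[Row], label_key: str = "runable_label") -> list[Row]:
    """Keep only rows whose label is exactly 1; all other columns pass through."""
    return [row for row in rows if row.get(label_key) == 1]


if __name__ == "__main__":
    labeled = [
        {"composition_task": "book flight", "runable_label": 1},
        {"composition_task": "underspecified task", "runable_label": 0},
    ]
    kept = keep_runnable(labeled)
    print([row["composition_task"] for row in kept])  # ['book flight']
```

Rows with a missing label are dropped as well, since `row.get` returns `None` for them; whether the real operator treats missing labels the same way is not stated in this page.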
Example Input:

    {
      "composition_task": "Find the airline offering the cheapest fare within a budget of $500 for a round-trip flight from New York to London, departing on November 15th and returning on November 22nd.",
      "atom_task": "Search for a round-trip flight from New York to London, departing on November 15th and returning on November 22nd, with a budget of $500 or less.",
      "subsequent_task": "What is the airline offering the cheapest fare within the budget for this round-trip flight?"
    }

Example Output:

    {
      "composition_task": "Find the airline offering the cheapest fare within a budget of $500 for a round-trip flight from New York to London, departing on November 15th and returning on November 22nd.",
      "atom_task": "Search for a round-trip flight from New York to London, departing on November 15th and returning on November 22nd, with a budget of $500 or less.",
      "subsequent_task": "What is the airline offering the cheapest fare within the budget for this round-trip flight?",
      "runable_label": 1
    }
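To try the usage example, records like the one above need to be saved to `input.jsonl`. The sketch below assumes the storage expects one JSON object per line, as the `cache_type="jsonl"` setting suggests; `write_jsonl` and `read_jsonl` are hypothetical helpers written with the standard library only, not DataFlow functions.

```python
import json
import tempfile
from pathlib import Path


def write_jsonl(records: list[dict], path: str) -> None:
    """Write each record as one JSON object per line."""
    with Path(path).open("w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")


def read_jsonl(path: str) -> list[dict]:
    """Read a JSON Lines file back into a list of dictionaries."""
    with Path(path).open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


if __name__ == "__main__":
    record = {
        "composition_task": "Find the airline offering the cheapest fare within the budget.",
        "atom_task": "Search for round-trip flights within the budget.",
        "subsequent_task": "Which airline offers the cheapest fare?",
    }
    # A temporary directory keeps the demo from touching real project files.
    with tempfile.TemporaryDirectory() as tmp_dir:
        target = str(Path(tmp_dir) / "input.jsonl")
        write_jsonl([record], target)
        print(read_jsonl(target) == [record])
```

The same `read_jsonl` helper can be pointed at the cached output file to inspect the `runable_label` column after a run; the exact output file name depends on the storage configuration.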
