Skip to main content

TaskValidationMode

class TaskValidationMode(Enum):
Validation modes for different use cases.

validate_task_content

def validate_task_content(
    content: str,
    task_id: str = 'unknown',
    min_length: int = 1,
    mode: TaskValidationMode = TaskValidationMode.INPUT,
    check_failure_patterns: bool = True
):
Unified validation for task content and results to avoid silent failures. Performs comprehensive checks to ensure content meets quality standards. Parameters:
  • content (str): The task content or result to validate.
  • task_id (str): Task ID for logging purposes. (default: :obj:"unknown")
  • min_length (int): Minimum content length after stripping whitespace. (default: :obj:1)
  • mode (TaskValidationMode): Validation mode - INPUT for task content, OUTPUT for task results. (default: :obj:TaskValidationMode.INPUT)
  • check_failure_patterns (bool): Whether to check for failure indicators in the content. Only effective in OUTPUT mode. (default: :obj:True)
Returns: bool: True if content passes validation, False otherwise.

is_task_result_insufficient

def is_task_result_insufficient(task: 'Task'):
Check if a task result is insufficient and should be treated as failed. This is a convenience wrapper around validate_task_content for backward compatibility and semantic clarity when checking task results. Parameters:
  • task (Task): The task to check.
Returns: bool: True if the result is insufficient, False otherwise.

parse_response

def parse_response(response: str, task_id: Optional[str] = None):
Parse Tasks from a response. Parameters:
  • response (str): The model response.
  • task_id (str, optional): a parent task id, the default value is “0”
Returns: List[Task]: A list of tasks which is :obj:Task instance.

TaskState

class TaskState(str, Enum):

states

def states(cls):

Task

class Task(BaseModel):
Task is specific assignment that can be passed to a agent. Parameters:
  • content (str): string content for task.
  • id (str): An unique string identifier for the task. This should ideally be provided by the provider/model which created the task. (default: :obj:uuid.uuid4())
  • state (TaskState): The state which should be OPEN, RUNNING, DONE or DELETED. (default: :obj:TaskState.FAILED)
  • type (Optional[str]): task type. (default: :obj:None)
  • parent (Optional[Task]): The parent task, None for root task. (default: :obj:None)
  • subtasks (List[Task]): The childrent sub-tasks for the task. (default: :obj:[])
  • result (Optional[str]): The answer for the task. (default: :obj:"")
  • failure_count (int): The failure count for the task. (default: :obj:0)
  • assigned_worker_id (Optional[str]): The ID of the worker assigned to this task. (default: :obj:None)
  • dependencies (List[Task]): The dependencies for the task. (default: :obj:[])
  • additional_info (Optional[Dict[str, Any]]): Additional information for the task. (default: :obj:None)
  • image_list (Optional[List[Union[Image.Image, str]]]): Optional list of PIL Image objects or image URLs (strings) associated with the task. (default: :obj:None)
  • image_detail (Literal["auto", "low", "high"]): Detail level of the images associated with the task. (default: :obj:auto)
  • video_bytes (Optional[bytes]): Optional bytes of a video associated with the task. (default: :obj:None)
  • video_detail (Literal["auto", "low", "high"]): Detail level of the videos associated with the task. (default: :obj:auto)

repr

def __repr__(self):
Return a string representation of the task.

from_message

def from_message(cls, message: BaseMessage):
Create a task from a message. Parameters:
  • message (BaseMessage): The message to the task.
Returns: Task

to_message

def to_message():
Convert a Task to a Message.

reset

def reset(self):
Reset Task to initial state.

update_result

def update_result(self, result: str):
Set task result and mark the task as DONE. Parameters:
  • result (str): The task result.

set_id

def set_id(self, id: str):
Set the id of the task. Parameters:
  • id (str): The id of the task.

set_state

def set_state(self, state: TaskState):
Recursively set the state of the task and its subtasks. Parameters:
  • state (TaskState): The giving state.

add_subtask

def add_subtask(self, task: 'Task'):
Add a subtask to the current task. Parameters:
  • task (Task): The subtask to be added.

remove_subtask

def remove_subtask(self, id: str):
Remove a subtask from the current task. Parameters:
  • id (str): The id of the subtask to be removed.

get_running_task

def get_running_task(self):
Get RUNNING task.

to_string

def to_string(self, indent: str = '', state: bool = False):
Convert task to a string. Parameters:
  • indent (str): The ident for hierarchical tasks.
  • state (bool): Include or not task state.
Returns: str: The printable task string.

get_result

def get_result(self, indent: str = ''):
Get task result to a string. Parameters:
  • indent (str): The ident for hierarchical tasks.
Returns: str: The printable task string.

decompose

def decompose(
    self,
    agent: 'ChatAgent',
    prompt: Optional[str] = None,
    task_parser: Callable[[str, str], List['Task']] = parse_response
):
Decompose a task to a list of sub-tasks. Automatically detects streaming or non-streaming based on agent configuration. Parameters:
  • agent (ChatAgent): An agent that used to decompose the task.
  • prompt (str, optional): A prompt to decompose the task. If not provided, the default prompt will be used.
  • task_parser (Callable[[str, str], List[Task]], optional): A function to extract Task from response. If not provided, the default parse_response will be used.
Returns: Union[List[Task], Generator[List[Task], None, None]]: If agent is configured for streaming, returns a generator that yields lists of new tasks as they are parsed. Otherwise returns a list of all tasks.

_decompose_streaming

def _decompose_streaming(
    self,
    response: 'StreamingChatAgentResponse',
    task_parser: Callable[[str, str], List['Task']]
):
Handle streaming response for task decomposition. Parameters:
  • response: Streaming response from agent
  • task_parser: Function to parse tasks from response
  • Yields: List[Task]: New tasks as they are parsed from streaming response

_decompose_non_streaming

def _decompose_non_streaming(self, response, task_parser: Callable[[str, str], List['Task']]):
Handle non-streaming response for task decomposition. Parameters:
  • response: Regular response from agent
  • task_parser: Function to parse tasks from response
Returns: List[Task]: All parsed tasks

_parse_partial_tasks

def _parse_partial_tasks(self, response: str):
Parse tasks from potentially incomplete response. Parameters:
  • response: Partial response content
Returns: List[Task]: Tasks parsed from complete <task>``</task> blocks

compose

def compose(
    self,
    agent: 'ChatAgent',
    template: TextPrompt = TASK_COMPOSE_PROMPT,
    result_parser: Optional[Callable[[str], str]] = None
):
compose task result by the sub-tasks. Parameters:
  • agent (ChatAgent): An agent that used to compose the task result.
  • template (TextPrompt, optional): The prompt template to compose task. If not provided, the default template will be used.
  • result_parser (Callable[[str, str], List[Task]], optional): A function to extract Task from response.

get_depth

def get_depth(self):
Get current task depth.

TaskManager

class TaskManager:
TaskManager is used to manage tasks. Parameters:
  • task (Task): The root Task.

init

def __init__(self, task: Task):

gen_task_id

def gen_task_id(self):
Generate a new task id.

exist

def exist(self, task_id: str):
Check if a task with the given id exists.

current_task

def current_task(self):
Get the current task.

topological_sort

def topological_sort(tasks: List[Task]):
Sort a list of tasks by topological way. Parameters:
  • tasks (List[Task]): The giving list of tasks.
Returns: The sorted list of tasks.

set_tasks_dependence

def set_tasks_dependence(
    root: Task,
    others: List[Task],
    type: Literal['serial', 'parallel'] = 'parallel'
):
Set relationship between root task and other tasks. Two relationships are currently supported: serial and parallel. serial : root -> other1 -> other2 parallel: root -> other1 -> other2 Parameters:
  • root (Task): A root task.
  • others (List[Task]): A list of tasks.

add_tasks

def add_tasks(self, tasks: Union[Task, List[Task]]):
self.tasks and self.task_map will be updated by the input tasks.

evolve

def evolve(
    self,
    task: Task,
    agent: 'ChatAgent',
    template: Optional[TextPrompt] = None,
    task_parser: Optional[Callable[[str, str], List[Task]]] = None
):
Evolve a task to a new task. Evolve is only used for data generation. Parameters:
  • task (Task): A given task.
  • agent (ChatAgent): An agent that used to evolve the task.
  • template (TextPrompt, optional): A prompt template to evolve task. If not provided, the default template will be used.
  • task_parser (Callable, optional): A function to extract Task from response. If not provided, the default parser will be used.
Returns: Task: The created :obj:Task instance or None.