Skip to main content

is_generic_role_name

def is_generic_role_name(role_name: str):
Check if a role name is generic and should trigger fallback logic. Generic role names are common, non-specific identifiers that don’t provide meaningful information about an agent’s actual purpose. When a role name is generic, fallback logic should be used to find a more specific identifier (e.g., from LLM-generated agent_title or description). Parameters:
  • role_name (str): The role name to check (will be converted to lowercase for case-insensitive comparison).
Returns: bool: True if the role name is generic, False otherwise.

WorkflowMetadata

class WorkflowMetadata(BaseModel):
Pydantic model for workflow metadata tracking. This model defines the formal schema for workflow metadata that tracks versioning, timestamps, and contextual information about saved workflows. Used to maintain workflow history and enable proper version management.

WorkflowConfig

class WorkflowConfig(BaseModel):
Configuration for workflow memory management. Centralizes all workflow-related configuration options to avoid scattered settings across multiple files and methods.

WorkerConf

class WorkerConf(BaseModel):
The configuration of a worker.

TaskResult

class TaskResult(BaseModel):
The result of a task.

QualityEvaluation

class QualityEvaluation(BaseModel):
Quality evaluation result for a completed task.

.. deprecated::
   Use :class:`TaskAnalysisResult` instead. This class is kept for backward compatibility.

TaskAssignment

class TaskAssignment(BaseModel):
An individual task assignment within a batch.

_split_and_strip

def _split_and_strip(dep_str: str):
Utility to split a comma-separated string and strip whitespace from each resulting part.

validate_dependencies

def validate_dependencies(cls, v):

TaskAssignResult

class TaskAssignResult(BaseModel):
The result of task assignment for both single and batch assignments.

RecoveryStrategy

class RecoveryStrategy(str, Enum):
Strategies for handling failed tasks.

__str__

def __str__(self):

__repr__

def __repr__(self):

FailureContext

class FailureContext(BaseModel):
Context information about a task failure.

TaskAnalysisResult

class TaskAnalysisResult(BaseModel):
Unified result for task failure analysis and quality evaluation. This model combines both failure recovery decisions and quality evaluation results into a single structure. For failure analysis, only the recovery strategy and reasoning fields are populated. For quality evaluation, all fields including quality_score and issues are populated.

is_quality_evaluation

def is_quality_evaluation(self):
Returns: bool: True if this is a quality evaluation (has quality_score), False if this is a failure analysis.

quality_sufficient

def quality_sufficient(self):
Returns: bool: True if quality is sufficient (score >= 70 and no recovery strategy recommended), False otherwise. Always False for failure analysis results.

PipelineTaskBuilder

class PipelineTaskBuilder:
Helper class for building pipeline tasks with dependencies.

__init__

def __init__(self):
Initialize an empty pipeline task builder.

add

def add(
    self,
    content: str,
    task_id: Optional[str] = None,
    dependencies: Optional[List[str]] = None,
    additional_info: Optional[dict] = None,
    auto_depend: bool = True
):
Add a task to the pipeline with support for chaining. Parameters:
  • content (str): The content/description of the task.
  • task_id (str, optional): Unique identifier for the task. If None, a unique ID will be generated. (default: :obj:`None`)
  • dependencies (List[str], optional): List of task IDs that this task depends on. If None and auto_depend=True, will depend on the last added task. (default: :obj:`None`)
  • additional_info (dict, optional): Additional information for the task. (default: :obj:`None`)
  • auto_depend (bool, optional): If True and dependencies is None, automatically depend on the last added task. (default: :obj:`True`)
Returns: PipelineTaskBuilder: Self for method chaining.

add_parallel_tasks

def add_parallel_tasks(
    self,
    task_contents: List[str],
    dependencies: Optional[List[str]] = None,
    task_id_prefix: str = 'parallel',
    auto_depend: bool = True
):
Add multiple parallel tasks that can execute simultaneously. Parameters:
  • task_contents (List[str]): List of task content strings.
  • dependencies (List[str], optional): Common dependencies for all parallel tasks. If None and auto_depend=True, will depend on the last added task. (default: :obj:`None`)
  • task_id_prefix (str, optional): Prefix for generated task IDs. (default: :obj:`"parallel"`)
  • auto_depend (bool, optional): If True and dependencies is None, automatically depend on the last added task. (default: :obj:`True`)
Returns: PipelineTaskBuilder: Self for method chaining.

add_sync_task

def add_sync_task(
    self,
    content: str,
    wait_for: Optional[List[str]] = None,
    task_id: Optional[str] = None
):
Add a synchronization task that waits for multiple tasks. Parameters:
  • content (str): Content of the synchronization task.
  • wait_for (List[str], optional): List of task IDs to wait for. If None, will automatically wait for the last parallel tasks. (default: :obj:`None`)
  • task_id (str, optional): ID for the sync task. If None, a unique ID will be generated. (default: :obj:`None`)
Returns: PipelineTaskBuilder: Self for method chaining.

build

def build(self):
Returns: List[Task]: List of tasks with proper dependency relationships.

clear

def clear(self):
Clear all tasks from the builder.

fork

def fork(self, task_contents: List[str]):
Create parallel branches from the current task (alias for add_parallel_tasks). Parameters:
  • task_contents (List[str]): List of task content strings for parallel execution.
Returns: PipelineTaskBuilder: Self for method chaining.

join

def join(self, content: str, task_id: Optional[str] = None):
Join parallel branches with a synchronization task (alias for add_sync_task). Parameters:
  • content (str): Content of the join/sync task.
  • task_id (str, optional): ID for the sync task.
Returns: PipelineTaskBuilder: Self for method chaining.

_validate_dependencies

def _validate_dependencies(self):

get_task_info

def get_task_info(self):
Returns: dict: Dictionary containing task count and task details.

check_if_running

def check_if_running(
    running: bool,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    handle_exceptions: bool = False
):
Check if the workforce is (not) running, specified by the boolean value. Provides fault tolerance through automatic retries and exception handling. Parameters:
  • running (bool): Expected running state (True or False).
  • max_retries (int, optional): Maximum number of retry attempts if the operation fails. Set to 0 to disable retries. (default: :obj:`3`)
  • retry_delay (float, optional): Delay in seconds between retry attempts. (default: :obj:`1.0`)
  • handle_exceptions (bool, optional): If True, catch and log exceptions instead of propagating them. (default: :obj:`False`)
Raises:
  • RuntimeError: If the workforce is not in the expected running state and all retry attempts have been exhausted.
  • Exception: Any exception raised by the decorated function if handle_exceptions is False.