# Source code for pydantic_ai_toolsets.toolsets.meta_orchestrator.toolset

"""Meta-orchestrator toolset for pydantic-ai agents."""

from __future__ import annotations

import sys
import time
import uuid
from typing import Any

from pydantic_ai import Agent
from pydantic_ai.toolsets import FunctionToolset

from .storage import MetaOrchestratorStorage, MetaOrchestratorStorageProtocol
from .types import (
    CrossToolsetLink,
    GetWorkflowStatusItem,
    LinkToolsetOutputsItem,
    LinkType,
    StartWorkflowItem,
    SuggestTransitionItem,
    ToolsetTransition,
    WorkflowState,
)
from .workflow_templates import ALL_TEMPLATES

# =============================================================================
# SYSTEM PROMPT - Contains "when and why" to use the toolset
# =============================================================================
# Returned verbatim by ``get_meta_orchestrator_system_prompt()`` so callers can
# include it in an agent's system prompt. The tool names listed here must match
# the function names registered with ``@toolset.tool`` in
# ``create_meta_orchestrator_toolset``.
# NOTE(review): the template names mentioned below (research_assistant,
# creative_problem_solver) are assumed to match names in ALL_TEMPLATES — confirm
# against workflow_templates when templates are renamed.

META_ORCHESTRATOR_SYSTEM_PROMPT = """
## Meta-Orchestrator

You have access to tools for orchestrating multi-toolset workflows:
- `read_unified_state`: View state across all active toolsets
- `suggest_toolset_transition`: Get recommendations for transitioning between toolsets
- `start_workflow`: Initialize a workflow template
- `link_toolset_outputs`: Create links between outputs from different toolsets
- `get_workflow_status`: Check the current status of an active workflow

### When to Use Meta-Orchestrator

Use these tools in these scenarios:
1. Coordinating multiple toolsets in a complex workflow
2. Managing transitions between different reasoning stages
3. Creating explicit links between toolset outputs
4. Tracking workflow progress across multiple stages
5. Understanding the overall state of a multi-toolset system

### Workflow Management

1. **Start Workflow**: Use `start_workflow` to initialize a predefined workflow template
2. **Monitor State**: Use `read_unified_state` to see the current state across all toolsets
3. **Transition Guidance**: Use `suggest_toolset_transition` when unsure which toolset to use next
4. **Create Links**: Use `link_toolset_outputs` to explicitly connect outputs between toolsets
5. **Check Status**: Use `get_workflow_status` to see workflow progress

### Key Principles

- **Workflow Templates**: Use predefined templates for common patterns (research_assistant, creative_problem_solver, etc.)
- **Explicit Transitions**: Create clear transitions between toolsets to guide the agent
- **Cross-Toolset Links**: Link related outputs to maintain context across toolsets
- **State Awareness**: Regularly check unified state to understand the full picture
"""

# =============================================================================
# TOOL DESCRIPTIONS - Contains "how" to use each specific tool
# =============================================================================
# Each constant below is passed as ``description=`` to the matching
# ``@toolset.tool`` decorator inside ``create_meta_orchestrator_toolset``, so
# this text is what the model sees. Keep every "Returns ..." list in sync with
# what the corresponding tool actually renders; promising fields the tool never
# prints misleads the model.

# Description for the ``read_unified_state`` tool.
READ_UNIFIED_STATE_DESCRIPTION = """Read the unified state across all active toolsets.

Returns a comprehensive view of:
- All registered toolsets and their states
- The active workflow and its progress
- Cross-toolset links
- A one-line status summary and a hint for the next action

Use this to understand the overall state of the multi-toolset system.
"""

# Description for the ``suggest_toolset_transition`` tool. The parameters are
# the fields of ``SuggestTransitionItem`` read by that tool.
SUGGEST_TRANSITION_DESCRIPTION = """Suggest when to transition from one toolset to another.

Parameters:
- current_toolset_id: Optional ID of current toolset (will infer if not provided)
- current_state_summary: Optional summary of current state (recorded with the suggested transition)

Returns a recommendation for the next toolset to use, including:
- Recommended next toolset
- Reason for the recommendation
- Confidence score

Use this when you're unsure which toolset to use next in a workflow.
"""

# Description for the ``start_workflow`` tool.
# NOTE(review): the tool body only reads ``template_name``; ``initial_context``
# is currently not forwarded anywhere — confirm whether it should be stored.
# NOTE(review): the template list below is hard-coded and may drift from
# ALL_TEMPLATES; the tool itself reports the registry's names on a miss.
START_WORKFLOW_DESCRIPTION = """Start a new workflow using a predefined template.

Parameters:
- template_name: Name of the workflow template (e.g., 'research_assistant', 'creative_problem_solver')
- initial_context: Optional initial context to pass to the workflow

Returns confirmation with workflow ID and initial stage information.

Available templates:
- research_assistant: Search → Self-Ask → Self-Refine → Todo
- creative_problem_solver: Multi-Persona Analysis → Graph of Thoughts → Reflection
- strategic_decision_maker: Multi-Persona Debate → MCTS → Reflection
- code_architect: Self-Ask → Tree of Thoughts → Reflection → Todo
"""

# Description for the ``link_toolset_outputs`` tool. The link-type strings are
# presumably the values of the ``LinkType`` enum — verify against types.py.
LINK_TOOLSET_OUTPUTS_DESCRIPTION = """Create a link between outputs from different toolsets.

Parameters:
- source_toolset_id: ID of the source toolset
- source_item_id: ID of the item in the source toolset
- target_toolset_id: ID of the target toolset
- target_item_id: ID of the item in the target toolset
- link_type: Type of link ('refines', 'explores', 'synthesizes', or 'references')

Returns confirmation with link ID.

Link types:
- refines: Target output refines/improves source output
- explores: Target output explores/expands on source output
- synthesizes: Target output synthesizes multiple source outputs
- references: Target output references source output

Use this to create explicit relationships between outputs from different toolsets.
"""

# Description for the ``get_workflow_status`` tool.
GET_WORKFLOW_STATUS_DESCRIPTION = """Get the current status of an active workflow.

Parameters:
- workflow_id: Optional workflow ID (returns active workflow if not provided)

Returns workflow status including:
- Current stage (with stage name, toolset and transition condition when known)
- Completed stages
- Active toolsets
- Number of transitions and cross-toolset links
- Elapsed time since the workflow started

Use this to check progress and understand where you are in a workflow.
"""


def create_meta_orchestrator_toolset(
    storage: MetaOrchestratorStorageProtocol | None = None,
    *,
    id: str | None = None,
    track_usage: bool = False,
) -> FunctionToolset[Any]:
    """Create a meta-orchestrator toolset for workflow management.

    This toolset provides tools for AI agents to orchestrate multi-toolset
    workflows, manage transitions between toolsets, and create cross-toolset
    links.

    Args:
        storage: Optional storage backend. Defaults to in-memory
            MetaOrchestratorStorage. You can provide a custom storage
            implementing MetaOrchestratorStorageProtocol for persistence or
            integration with other systems.
        id: Optional unique ID for the toolset.
        track_usage: If True, enables usage metrics collection in storage.
            Only used when ``storage`` is not given; a caller-supplied
            storage is used as-is.

    Returns:
        FunctionToolset compatible with any pydantic-ai agent. As a side
        effect, every template in ``ALL_TEMPLATES`` is registered in the
        storage's workflow registry.

    Example (standalone):
        ```python
        from pydantic_ai import Agent
        from pydantic_ai_toolsets import create_meta_orchestrator_toolset

        agent = Agent("openai:gpt-4.1", toolsets=[create_meta_orchestrator_toolset()])
        result = await agent.run("Start a research assistant workflow")
        ```

    Example (with storage access):
        ```python
        from pydantic_ai_toolsets import create_meta_orchestrator_toolset, MetaOrchestratorStorage

        storage = MetaOrchestratorStorage()
        toolset = create_meta_orchestrator_toolset(storage=storage)

        # After agent runs, access workflow state directly
        workflow = storage.get_active_workflow()
        print(workflow.current_stage)
        print(storage.links)
        ```
    """
    if storage is not None:
        _storage = storage
    else:
        _storage = MetaOrchestratorStorage(track_usage=track_usage)

    toolset: FunctionToolset[Any] = FunctionToolset(id=id)

    # Metrics are optional: only storages exposing a ``metrics`` attribute
    # record tool invocations.
    _metrics = getattr(_storage, "metrics", None)

    # Make every built-in workflow template available to ``start_workflow``.
    for template in ALL_TEMPLATES:
        _storage.workflow_registry.register(template)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _enum_value(value: Any) -> Any:
        """Return ``value.value`` for Enum members, otherwise ``value`` unchanged."""
        return getattr(value, "value", value)

    def _status_of(item: Any) -> str:
        """Return an item's ``status`` as a plain string.

        Works for plain-string statuses and Enum statuses alike (``str()`` of a
        non-StrEnum member yields ``"Cls.MEMBER"``, not its value). Items with
        no ``status`` attribute yield ``""``.
        """
        return str(_enum_value(getattr(item, "status", "")))

    def _format_link(link: CrossToolsetLink) -> str:
        """Render a link as ``src_toolset:src_item → dst_toolset:dst_item (type)``."""
        return (
            f"{link.source_toolset_id}:{link.source_item_id} → "
            f"{link.target_toolset_id}:{link.target_item_id} ({_enum_value(link.link_type)})"
        )

    def _total_stages(workflow: WorkflowState) -> int:
        """Return the stage count of ``workflow``.

        Prefers the template's stage list; falls back to the number of active
        toolsets when the template is unknown or defines no stages. Used by
        every "Stage x/y" line so all tools agree on the denominator.
        """
        template = _storage.workflow_registry.get(workflow.template_name)
        if template and template.stages:
            return len(template.stages)
        return len(workflow.active_toolsets)

    def _record_metrics(tool_name: str, payload: Any, result: str, start_time: float) -> None:
        """Record one tool invocation in the storage metrics, if metrics are enabled.

        Args:
            tool_name: Name of the tool being recorded.
            payload: Tool input model (serialized with ``model_dump_json`` when
                available), or None for tools without input.
            result: Text returned by the tool.
            start_time: ``time.perf_counter()`` value taken when the tool started.
        """
        if _metrics is None:
            return
        duration_ms = (time.perf_counter() - start_time) * 1000
        if payload is None:
            input_text = ""
        elif hasattr(payload, "model_dump_json"):
            input_text = payload.model_dump_json()
        else:
            input_text = str(payload)
        _metrics.record_invocation(tool_name, input_text, result, duration_ms)

    def _get_status_summary() -> str:
        """Get one-line status summary."""
        active_workflow = _storage.get_active_workflow()
        if active_workflow:
            return (
                f"Status: ● Active | Workflow: {active_workflow.template_name} | "
                f"Stage: {active_workflow.current_stage + 1}/{_total_stages(active_workflow)}"
            )
        return f"Status: ○ No active workflow | Toolsets: {len(_storage.registered_toolsets)}"

    def _get_next_hint() -> str:
        """Get contextual hint for next action."""
        active_workflow = _storage.get_active_workflow()
        if not active_workflow:
            return "Use start_workflow to begin a workflow template."
        if active_workflow.current_stage < len(active_workflow.active_toolsets) - 1:
            next_toolset = active_workflow.active_toolsets[active_workflow.current_stage + 1]
            return f"Consider transitioning to {next_toolset} for the next stage."
        return "Workflow is in final stage. Complete current tasks and provide final output."

    def _get_toolset_state_summary(toolset_id: str, toolset_info: dict[str, Any]) -> str:
        """Get state summary for a single toolset from its storage.

        The storage is inspected by duck typing: each known attribute
        (``thoughts``, ``questions``, ``outputs``, ``todos``, ``nodes``, ...)
        contributes one summary line when present and non-empty.

        Args:
            toolset_id: ID of the toolset
            toolset_info: Toolset info dictionary (may contain a "storage" entry)

        Returns:
            Formatted state summary string
        """
        storage = toolset_info.get("storage")
        if storage is None:
            return f" {toolset_id}: No storage available"

        lines: list[str] = [f" {toolset_id}:"]

        # Chain of Thought
        thoughts = getattr(storage, "thoughts", None)
        if thoughts:
            revisions = sum(1 for t in thoughts if t.is_revision)
            final = sum(1 for t in thoughts if not t.next_thought_needed)
            lines.append(f" Thoughts: {len(thoughts)} total, {revisions} revisions, {final} final")

        # Self-Ask
        questions = getattr(storage, "questions", None)
        if questions and isinstance(questions, dict):
            answered = sum(1 for q in questions.values() if _status_of(q) == "answered")
            final_answers = len(getattr(storage, "final_answers", None) or ())
            lines.append(
                f" Questions: {len(questions)} total, {answered} answered, {final_answers} final answers"
            )

        # Self-Refine / Reflection (outputs may be keyed or a plain list)
        outputs = getattr(storage, "outputs", None)
        if outputs and isinstance(outputs, (dict, list)):
            output_items = outputs.values() if isinstance(outputs, dict) else outputs
            final_outputs = sum(1 for o in output_items if getattr(o, "is_final", False))
            lines.append(f" Outputs: {len(outputs)} total, {final_outputs} final")

        # Todo
        todos = getattr(storage, "todos", None)
        if todos:
            statuses = [_status_of(t) for t in todos]
            lines.append(
                f" Todos: {len(todos)} total ({statuses.count('pending')} pending, "
                f"{statuses.count('in_progress')} in progress, {statuses.count('completed')} completed)"
            )

        # Tree of Thought
        nodes = getattr(storage, "nodes", None)
        if nodes and isinstance(nodes, dict):
            solution_nodes = sum(1 for n in nodes.values() if getattr(n, "is_solution", False))
            lines.append(f" Nodes: {len(nodes)} total, {solution_nodes} solutions")

        # Graph of Thought
        edges = getattr(storage, "edges", None)
        if edges and isinstance(edges, dict):
            lines.append(f" Edges: {len(edges)} total")

        # MCTS (``nodes`` may be missing or None; count those as zero)
        if hasattr(storage, "_iteration_count"):
            lines.append(f" MCTS: {storage._iteration_count} iterations, {len(nodes or ())} nodes")

        # Beam Search
        candidates = getattr(storage, "candidates", None)
        if candidates and isinstance(candidates, dict):
            lines.append(f" Candidates: {len(candidates)} total")

        # Multi-Persona Analysis
        personas = getattr(storage, "personas", None)
        if personas and isinstance(personas, dict):
            total_responses = len(getattr(storage, "responses", None) or ())
            lines.append(f" Personas: {len(personas)} total, {total_responses} responses")

        # Multi-Persona Debate
        positions = getattr(storage, "positions", None)
        if positions and isinstance(positions, dict):
            total_critiques = len(getattr(storage, "critiques", None) or ())
            lines.append(f" Positions: {len(positions)} total, {total_critiques} critiques")

        # Search
        search_results = getattr(storage, "search_results", None)
        if search_results and isinstance(search_results, dict):
            total_extracted = len(getattr(storage, "extracted_contents", None) or ())
            lines.append(f" Search Results: {len(search_results)} total, {total_extracted} extracted")

        # Numeric statistics, if the storage exposes them
        get_statistics = getattr(storage, "get_statistics", None)
        if get_statistics is not None:
            try:
                stats = get_statistics()
                stats_str = ", ".join(
                    f"{k}: {v}" for k, v in (stats or {}).items() if isinstance(v, (int, float))
                )
            except Exception:
                # Deliberately best-effort: a failing statistics hook of some
                # other toolset must not break the unified view.
                stats_str = ""
            if stats_str:
                lines.append(f" Stats: {stats_str}")

        if len(lines) == 1:  # Only the header line was produced
            lines.append(" No active state")

        return "\n".join(lines)

    def _suggest_transition(suggestion: SuggestTransitionItem) -> str:
        """Compute the transition recommendation for ``suggestion``.

        When a next toolset exists, the suggested transition is also recorded
        via ``_storage.track_transition``.
        """
        active_workflow = _storage.get_active_workflow()
        if not active_workflow:
            return "No active workflow. Use start_workflow to begin a workflow template."

        toolsets = active_workflow.active_toolsets
        stage = active_workflow.current_stage

        # If no current toolset is provided, infer it from the workflow's stage.
        current_toolset_id = suggestion.current_toolset_id
        if not current_toolset_id and stage < len(toolsets):
            current_toolset_id = toolsets[stage]
        if not current_toolset_id:
            return "Cannot determine current toolset. Please provide current_toolset_id."

        # Prefer the workflow's own stage pointer so templates that reuse a
        # toolset resolve to the right position rather than its first use.
        if 0 <= stage < len(toolsets) and toolsets[stage] == current_toolset_id:
            current_index = stage
        elif current_toolset_id in toolsets:
            current_index = toolsets.index(current_toolset_id)
        else:
            return (
                f"Toolset '{current_toolset_id}' is not part of the active workflow "
                f"'{active_workflow.template_name}'. Workflow toolsets: {', '.join(toolsets)}"
            )

        if current_index >= len(toolsets) - 1:
            return f"Current toolset '{current_toolset_id}' is the final stage. Workflow is complete."

        next_toolset_id = toolsets[current_index + 1]
        template = _storage.workflow_registry.get(active_workflow.template_name)
        reason = "Next stage in workflow"
        if template and current_index < len(template.stages) - 1:
            next_stage = template.stages[current_index + 1]
            reason = (
                f"Transition to {next_stage.name} stage: "
                f"{next_stage.description or next_stage.transition_condition}"
            )

        transition = ToolsetTransition(
            from_toolset_id=current_toolset_id,
            to_toolset_id=next_toolset_id,
            reason=reason,
            confidence=0.9,
            conditions_met=[suggestion.current_state_summary] if suggestion.current_state_summary else None,
        )
        _storage.track_transition(transition)
        return (
            f"Recommended transition: {current_toolset_id}{next_toolset_id}\n"
            f"Reason: {reason}\nConfidence: {transition.confidence}"
        )

    # ------------------------------------------------------------------
    # Tools
    # ------------------------------------------------------------------

    @toolset.tool(description=READ_UNIFIED_STATE_DESCRIPTION)
    async def read_unified_state() -> str:
        """Read the unified state across all active toolsets."""
        start_time = time.perf_counter()
        active_workflow = _storage.get_active_workflow()

        lines: list[str] = [
            "Unified State:",
            "==============",
            "",
            f"Active Toolsets: {len(_storage.registered_toolsets)}",
        ]

        # Show state for each registered toolset
        if _storage.registered_toolsets:
            lines.append("")
            for toolset_id, info in _storage.registered_toolsets.items():
                lines.append(_get_toolset_state_summary(toolset_id, info))
            lines.append("")

        # Show cross-toolset links
        if _storage.links:
            lines.append("Cross-Toolset Links:")
            lines.extend(f" {_format_link(link)}" for link in _storage.links)
            lines.append("")

        # Show workflow progress
        if active_workflow:
            template = _storage.workflow_registry.get(active_workflow.template_name)
            current_stage_name = ""
            if template and active_workflow.current_stage < len(template.stages):
                current_stage_name = f" ({template.stages[active_workflow.current_stage].name})"

            lines.append("Workflow Progress:")
            lines.append(f" Workflow: {active_workflow.template_name}")
            lines.append(
                f" Current Stage: {active_workflow.current_stage + 1}/"
                f"{_total_stages(active_workflow)}{current_stage_name}"
            )
            lines.append(f" Completed Stages: {len(active_workflow.completed_stages)}")
            if active_workflow.completed_stages:
                # map(str, ...) keeps this safe whether stages are stored as names or indices.
                lines.append(f" {', '.join(map(str, active_workflow.completed_stages))}")
            lines.append(f" Active Toolsets: {', '.join(active_workflow.active_toolsets)}")
        else:
            lines.append("Workflow Progress: No active workflow")
        lines.append("")

        lines.append(_get_status_summary())
        lines.append("")
        lines.append(f"Next: {_get_next_hint()}")

        result = "\n".join(lines)
        _record_metrics("read_unified_state", None, result, start_time)
        return result

    @toolset.tool(description=SUGGEST_TRANSITION_DESCRIPTION)
    async def suggest_toolset_transition(suggestion: SuggestTransitionItem) -> str:
        """Suggest when to transition from one toolset to another."""
        start_time = time.perf_counter()
        result = _suggest_transition(suggestion)
        _record_metrics("suggest_toolset_transition", suggestion, result, start_time)
        return result

    @toolset.tool(description=START_WORKFLOW_DESCRIPTION)
    async def start_workflow(workflow: StartWorkflowItem) -> str:
        """Start a new workflow using a predefined template."""
        start_time = time.perf_counter()
        template = _storage.workflow_registry.get(workflow.template_name)

        if not template:
            result = (
                f"Workflow template '{workflow.template_name}' not found. "
                f"Available templates: {', '.join(_storage.workflow_registry.list_all())}"
            )
        else:
            workflow_id = str(uuid.uuid4())
            now = time.time()  # single timestamp so started_at == updated_at initially
            new_workflow = WorkflowState(
                workflow_id=workflow_id,
                template_name=workflow.template_name,
                current_stage=0,
                active_toolsets=template.toolsets.copy(),
                started_at=now,
                updated_at=now,
            )
            _storage.start_workflow(new_workflow)

            # Register toolsets if not already registered (without a storage,
            # so they show up as "No storage available" in the unified view).
            for toolset_id in template.toolsets:
                if toolset_id not in _storage.registered_toolsets:
                    _storage.register_toolset(toolset_id, {"type": "unknown", "label": toolset_id})

            stage_info = ""
            if template.stages:
                first_stage = template.stages[0]
                stage_info = f"\nCurrent Stage: {first_stage.name} ({first_stage.toolset_id})"

            result = (
                f"Started workflow '{workflow.template_name}' (ID: {workflow_id}){stage_info}\n"
                f"Toolsets: {', '.join(template.toolsets)}"
            )

        _record_metrics("start_workflow", workflow, result, start_time)
        return result

    @toolset.tool(description=LINK_TOOLSET_OUTPUTS_DESCRIPTION)
    async def link_toolset_outputs(link_item: LinkToolsetOutputsItem) -> str:
        """Create a link between outputs from different toolsets."""
        start_time = time.perf_counter()
        link_id = str(uuid.uuid4())
        link = CrossToolsetLink(
            link_id=link_id,
            source_toolset_id=link_item.source_toolset_id,
            source_item_id=link_item.source_item_id,
            target_toolset_id=link_item.target_toolset_id,
            target_item_id=link_item.target_item_id,
            link_type=link_item.link_type,
            created_at=time.time(),
        )
        _storage.create_link(link)

        # Update the individual toolset storages when they are registered and
        # expose the optional link hooks (hasattr(None, ...) is False).
        source_storage = (_storage.registered_toolsets.get(link_item.source_toolset_id) or {}).get("storage")
        if hasattr(source_storage, "add_link"):
            source_storage.add_link(link_item.source_item_id, link_id)

        target_storage = (_storage.registered_toolsets.get(link_item.target_toolset_id) or {}).get("storage")
        if hasattr(target_storage, "add_linked_from"):
            target_storage.add_linked_from(link_id)

        # Update active workflow if exists
        active_workflow = _storage.get_active_workflow()
        if active_workflow:
            active_workflow.links.append(link)
            _storage.update_workflow(active_workflow.workflow_id, {"links": active_workflow.links})

        result = f"Created link {link_id}: {_format_link(link)}"
        _record_metrics("link_toolset_outputs", link_item, result, start_time)
        return result

    @toolset.tool(description=GET_WORKFLOW_STATUS_DESCRIPTION)
    async def get_workflow_status(status_item: GetWorkflowStatusItem) -> str:
        """Get the current status of an active workflow."""
        start_time = time.perf_counter()
        workflow_id = status_item.workflow_id
        if workflow_id:
            workflow = _storage.active_workflows.get(workflow_id)
        else:
            workflow = _storage.get_active_workflow()

        if not workflow:
            result = (
                f"No active workflow found with ID '{workflow_id}'."
                if workflow_id
                else "No active workflow found."
            )
        else:
            template = _storage.workflow_registry.get(workflow.template_name)
            lines: list[str] = [
                f"Workflow Status: {workflow.template_name}",
                f"Workflow ID: {workflow.workflow_id}",
                f"Current Stage: {workflow.current_stage + 1}/{_total_stages(workflow)}",
            ]
            if template and workflow.current_stage < len(template.stages):
                current_stage = template.stages[workflow.current_stage]
                lines.append(f"Current Stage Name: {current_stage.name}")
                lines.append(f"Current Toolset: {current_stage.toolset_id}")
                lines.append(f"Transition Condition: {current_stage.transition_condition}")

            lines.append(f"Completed Stages: {len(workflow.completed_stages)}")
            if workflow.completed_stages:
                lines.append(f" {', '.join(map(str, workflow.completed_stages))}")
            lines.append(f"Active Toolsets: {', '.join(workflow.active_toolsets)}")
            lines.append(f"Total Transitions: {len(workflow.transitions)}")
            lines.append(f"Total Links: {len(workflow.links)}")

            if workflow.started_at:
                lines.append(f"Elapsed Time: {time.time() - workflow.started_at:.1f}s")

            result = "\n".join(lines)

        _record_metrics("get_workflow_status", status_item, result, start_time)
        return result

    return toolset
def get_meta_orchestrator_system_prompt() -> str:
    """Return the meta-orchestrator system prompt.

    The text describes when and why an agent should use the meta-orchestrator
    tools. It is the module-level ``META_ORCHESTRATOR_SYSTEM_PROMPT`` constant,
    returned unchanged.

    Returns:
        The system prompt string.
    """
    prompt: str = META_ORCHESTRATOR_SYSTEM_PROMPT
    return prompt