Source code for pydantic_ai_toolsets.toolsets.meta_orchestrator.helpers
"""Helper functions for combining toolsets and managing workflows."""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.toolsets import AbstractToolset, CombinedToolset
from .._shared.aliasing import create_aliased_toolset, get_prefix_for_toolset
from .._shared.system_prompts import combine_system_prompts
from .types import WorkflowTemplate
[docs]
def create_combined_toolset(
toolsets: list[AbstractToolset[Any]],
storages: dict[str, Any] | None = None,
prefix_map: dict[str, str] | None = None,
orchestrator: AbstractToolset[Any] | None = None,
workflow_template: WorkflowTemplate | None = None,
auto_prefix: bool = True,
) -> tuple[CombinedToolset[Any], str]:
"""Combine multiple toolsets with automatic collision resolution.
Uses official pydantic-ai API:
- AbstractToolset.prefixed() to create aliased toolsets
- CombinedToolset to combine toolsets
Strategy:
1. If auto_prefix=True, apply prefixes to all toolsets based on prefix_map
(prevents collisions proactively)
2. If auto_prefix=False, rely on CombinedToolset to detect collisions
(raises UserError if collisions exist - user must handle)
3. Use CombinedToolset to combine all toolsets
4. Add orchestrator tools if provided
5. Combine system prompts from all toolsets
Args:
toolsets: List of toolsets to combine
storages: Optional dictionary mapping toolset IDs to storage instances
prefix_map: Optional dictionary mapping toolset IDs to prefixes.
If not provided, prefixes are inferred from toolset IDs.
orchestrator: Optional meta-orchestrator toolset to add
workflow_template: Optional workflow template for context
auto_prefix: If True, automatically prefix all toolsets to prevent collisions
Returns:
Tuple of (CombinedToolset, combined_system_prompt)
Example:
```python
from pydantic_ai_toolsets import create_cot_toolset, create_tot_toolset
from pydantic_ai_toolsets.toolsets.meta_orchestrator.helpers import create_combined_toolset
cot_toolset = create_cot_toolset()
tot_toolset = create_tot_toolset()
prefix_map = {
"cot": "cot_",
"tot": "tot_",
}
combined_toolset, combined_prompt = create_combined_toolset(
toolsets=[cot_toolset, tot_toolset],
prefix_map=prefix_map,
)
```
"""
# 1. Apply prefixes if auto_prefix is enabled
if auto_prefix:
aliased_toolsets = []
for toolset in toolsets:
# Get toolset ID (from toolset.id or infer from prefix_map keys)
toolset_id = toolset.id if hasattr(toolset, "id") and toolset.id else None
# Get prefix from prefix_map or infer from toolset
if prefix_map and toolset_id and toolset_id in prefix_map:
prefix = prefix_map[toolset_id]
elif toolset_id:
prefix = get_prefix_for_toolset(toolset_id)
else:
# Try to infer from prefix_map keys (fallback)
prefix = None
if prefix_map:
# Find matching prefix by checking toolset type
for key, value in prefix_map.items():
if key.lower() in str(type(toolset)).lower():
prefix = value
break
if prefix:
# Use official prefixed() method!
aliased_toolsets.append(create_aliased_toolset(toolset, prefix))
else:
# No prefix available, use original
aliased_toolsets.append(toolset)
else:
# No auto-prefixing, use toolsets as-is
# CombinedToolset will raise UserError on collisions
aliased_toolsets = toolsets
# 2. Add orchestrator tools if provided
all_toolsets = aliased_toolsets.copy()
if orchestrator:
all_toolsets.append(orchestrator)
# 3. Use official CombinedToolset to combine all toolsets
# This will raise UserError if there are still collisions (when auto_prefix=False)
combined_toolset = CombinedToolset(all_toolsets)
# 4. Combine system prompts from all toolsets
# Build toolset_id -> toolset mapping for prompt combination
toolset_id_map: dict[str, AbstractToolset[Any]] = {}
for i, toolset in enumerate(toolsets):
toolset_id = toolset.id if hasattr(toolset, "id") and toolset.id else f"toolset_{i}"
toolset_id_map[toolset_id] = toolset
combined_prompt = combine_system_prompts(
toolsets=toolsets, # Use original toolsets (before prefixing) to get original prompts
storages=storages,
prefix_map=prefix_map,
workflow_template=workflow_template,
)
return combined_toolset, combined_prompt
[docs]
def register_toolsets_with_orchestrator(
orchestrator_storage: Any,
toolsets: list[AbstractToolset[Any]],
storages: dict[str, Any] | None = None,
) -> None:
"""Register toolsets with the meta-orchestrator storage.
Args:
orchestrator_storage: MetaOrchestratorStorage instance
toolsets: List of toolsets to register
storages: Optional dictionary mapping toolset IDs to storage instances
"""
for toolset in toolsets:
toolset_id = toolset.id if hasattr(toolset, "id") and toolset.id else "unknown"
toolset_info: dict[str, Any] = {
"type": type(toolset).__name__,
"label": toolset_id,
}
if storages and toolset_id in storages:
toolset_info["storage"] = storages[toolset_id]
orchestrator_storage.register_toolset(toolset_id, toolset_info)
[docs]
def create_workflow_agent(
model: str,
workflow_template: WorkflowTemplate,
toolsets: list[AbstractToolset[Any]],
storages: dict[str, Any] | None = None,
prefix_map: dict[str, str] | None = None,
orchestrator_storage: Any | None = None,
auto_prefix: bool = True,
additional_system_prompt: str | None = None,
output_type: type[BaseModel] | list[type[BaseModel]] | None = None,
) -> Agent[Any]:
"""Create an agent configured with a workflow template and combined toolsets.
This is a convenience function that:
1. Creates a meta-orchestrator toolset (if orchestrator_storage is provided)
2. Registers all toolsets with the orchestrator
3. Combines all toolsets with automatic prefixing
4. Creates an agent with the combined toolset and workflow-aware system prompt
Args:
model: Model string for the agent (e.g., "openai:gpt-4")
workflow_template: Workflow template to use
toolsets: List of toolsets to combine
storages: Optional dictionary mapping toolset IDs to storage instances
prefix_map: Optional dictionary mapping toolset IDs to prefixes.
If not provided, prefixes are inferred from toolset IDs.
orchestrator_storage: Optional MetaOrchestratorStorage instance.
If provided, creates and registers orchestrator toolset.
auto_prefix: If True, automatically prefix all toolsets to prevent collisions
additional_system_prompt: Optional additional system prompt that will be appended
to the combined prompt. Use this to add custom instructions
or context without overriding the workflow-specific prompts.
output_type: Optional Pydantic BaseModel class or list of BaseModel classes
to use as the structured output type for the agent. If provided,
the agent will return structured outputs matching this schema.
Returns:
Configured Agent instance with combined toolsets and workflow template
Example:
```python
from pydantic_ai_toolsets import (
RESEARCH_ASSISTANT,
create_search_toolset,
create_self_ask_toolset,
create_self_refine_toolset,
create_todo_toolset,
SearchStorage,
SelfAskStorage,
SelfRefineStorage,
TodoStorage,
MetaOrchestratorStorage,
)
from pydantic_ai_toolsets.toolsets.meta_orchestrator.helpers import create_workflow_agent
# Create storages
storages = {
"search": SearchStorage(),
"self_ask": SelfAskStorage(),
"self_refine": SelfRefineStorage(),
"todo": TodoStorage(),
}
# Create toolsets
toolsets = [
create_search_toolset(storages["search"], id="search"),
create_self_ask_toolset(storages["self_ask"], id="self_ask"),
create_self_refine_toolset(storages["self_refine"], id="self_refine"),
create_todo_toolset(storages["todo"], id="todo"),
]
# Create orchestrator storage
orchestrator_storage = MetaOrchestratorStorage()
# Create agent with workflow template and additional instructions
agent = create_workflow_agent(
model="openai:gpt-4",
workflow_template=RESEARCH_ASSISTANT,
toolsets=toolsets,
storages=storages,
orchestrator_storage=orchestrator_storage,
additional_system_prompt="Always cite sources and provide URLs when available.",
)
# Use the agent
result = await agent.run("Research quantum computing breakthroughs")
# Example with output type:
from pydantic import BaseModel
class ResearchResult(BaseModel):
summary: str
sources: list[str]
key_findings: list[str]
agent_with_output = create_workflow_agent(
model="openai:gpt-4",
workflow_template=RESEARCH_ASSISTANT,
toolsets=toolsets,
storages=storages,
orchestrator_storage=orchestrator_storage,
output_type=ResearchResult,
)
result = await agent_with_output.run("Research quantum computing breakthroughs")
print(result.output.summary) # Access structured output
```
"""
# Import here to avoid circular imports
from .toolset import create_meta_orchestrator_toolset
# 1. Create orchestrator toolset if storage is provided
orchestrator_toolset: AbstractToolset[Any] | None = None
if orchestrator_storage:
orchestrator_toolset = create_meta_orchestrator_toolset(orchestrator_storage, id="orchestrator")
# Register toolsets with orchestrator
register_toolsets_with_orchestrator(
orchestrator_storage=orchestrator_storage,
toolsets=toolsets,
storages=storages,
)
# 2. Create combined toolset
combined_toolset, combined_prompt = create_combined_toolset(
toolsets=toolsets,
storages=storages,
prefix_map=prefix_map,
orchestrator=orchestrator_toolset,
workflow_template=workflow_template,
auto_prefix=auto_prefix,
)
# 3. Combine system prompts: append additional prompt if provided
if additional_system_prompt:
system_prompt = f"{combined_prompt}\n\n{additional_system_prompt}"
else:
system_prompt = combined_prompt
# 4. Create agent with combined toolset and prompt
agent_kwargs: dict[str, Any] = {
"system_prompt": system_prompt,
"toolsets": [combined_toolset],
}
if output_type is not None:
agent_kwargs["output_type"] = output_type
agent = Agent(model, **agent_kwargs)
return agent