# Source code for pydantic_ai_toolsets.toolsets.self_ask.toolset
"""Self-ask toolset for pydantic-ai agents."""
from __future__ import annotations
import sys
import time
import uuid
from typing import Any
from pydantic_ai import Agent
from pydantic_ai.toolsets import FunctionToolset
from .storage import SelfAskStorage, SelfAskStorageProtocol
from .types import (
Answer,
AnswerQuestionItem,
AskMainQuestionItem,
AskSubQuestionItem,
ComposeFinalAnswerItem,
FinalAnswer,
Question,
QuestionStatus,
MAX_DEPTH,
)
# =============================================================================
# SYSTEM PROMPT - Contains "when and why" to use the toolset
# =============================================================================
# Markdown-formatted prompt text describing the six self-ask tools, when to use
# them, the decomposition workflow, and the depth constraint.
#
# This is an f-string: every `{MAX_DEPTH}` placeholder is filled in once, when the
# module is imported, with the MAX_DEPTH constant imported from `.types`.
#
# NOTE(review): several bullets also spell out "depth 3 (maximum)" and "sub = 1-3"
# literally, so the text is only self-consistent while MAX_DEPTH == 3 — confirm
# before changing MAX_DEPTH.
SELF_ASK_SYSTEM_PROMPT = f"""
## Self-Ask
You have access to tools for decomposing complex questions into simpler sub-questions:
- `read_self_ask_state`: Read current self-ask state
- `ask_main_question`: Initialize main question (depth 0)
- `ask_sub_question`: Generate sub-question from parent question
- `answer_question`: Answer a question/sub-question
- `compose_final_answer`: Compose final answer from sub-question answers
- `get_final_answer`: Retrieve the final composed answer
### When to Use Self-Ask
Use these tools in these scenarios:
1. Complex questions requiring multi-hop reasoning
2. Questions that need to be broken down into simpler parts
3. Problems where intermediate answers build toward a final answer
4. Questions requiring information gathering from multiple sources
5. Situations where explicit decomposition makes reasoning transparent
### Self-Ask Process
1. **Main Question**: Initialize the main question (depth 0)
2. **Decompose**: Generate sub-questions that help answer the main question
- Sub-questions are at depth 1
- Sub-sub-questions are at depth 2
- Sub-sub-sub-questions are at depth 3 (maximum)
- **CRITICAL**: Maximum depth is {MAX_DEPTH} (main = 0, sub = 1-3)
3. **Answer**: Answer each sub-question sequentially or in parallel
- Use answers from earlier sub-questions to answer later ones
- Mark if a sub-question needs further decomposition
4. **Compose**: Synthesize answers from sub-questions into final answer
5. **Retrieve**: Get the final composed answer
### Depth Constraint
**IMPORTANT**: Maximum depth of {MAX_DEPTH} levels:
- Main question: depth 0
- Sub-questions: depth 1
- Sub-sub-questions: depth 2
- Sub-sub-sub-questions: depth 3 (maximum)
When the depth limit is reached:
- Answer remaining questions at maximum depth
- Use those answers to compose the final answer
- Do not attempt to create questions beyond depth {MAX_DEPTH}
### Question Types
- **Sequential Dependency**: Later questions depend on earlier answers
- Example: "Where were the 2016 Olympics?" → "What was the population there?"
- **Parallel Questions**: Independent questions that can be answered in any order
- Example: "Japan's GDP growth?" and "Germany's GDP growth?" (both independent)
- **Recursive Decomposition**: Sub-questions spawn sub-sub-questions
- Example: "How did WWI affect art?" → "What art movements emerged?" → "What was Dadaism?"
### Key Principles
- **Explicit Decomposition**: Make reasoning transparent by showing sub-questions
- **Sequential Dependency**: Build answers from earlier sub-question answers
- **Compositional Reasoning**: Combine simple facts into complex insights
- **Depth Management**: Respect the depth limit and compose final answer when limit reached
- **Answer Tracking**: Track which answers contribute to the final composition
### Workflow
1. Call `read_self_ask_state` to see current state
2. Initialize main question using `ask_main_question` (if none exists)
3. Generate sub-questions using `ask_sub_question` (respecting depth limit)
4. Answer questions using `answer_question`
5. Continue decomposition and answering until all necessary information is gathered
6. Compose final answer using `compose_final_answer`
7. Use `get_final_answer` to retrieve the final result
**IMPORTANT**: Always call `read_self_ask_state` before asking questions, answering, or composing.
"""
# =============================================================================
# TOOL DESCRIPTIONS - Contains "how" to use each specific tool
# =============================================================================
# Each constant below is passed as `description=` to the matching
# `@toolset.tool(...)` registration inside `create_self_ask_toolset`.

# Description for the `read_self_ask_state` tool.
READ_SELF_ASK_STATE_DESCRIPTION = """
Read the current self-ask state.
**CRITICAL**: Call this BEFORE every ask_main_question, ask_sub_question, answer_question, or compose_final_answer call to:
- Review the current question tree structure
- See which questions exist and their depth levels
- Understand which questions have been answered
- Track depth levels to ensure you don't exceed the maximum depth
- Know which answers are available for composition
- Make informed decisions about next steps
Returns:
- All questions organized by depth with their status
- All answers with their corresponding question IDs
- Final answers if any have been composed
- Question tree structure showing parent-child relationships
- Summary statistics (total questions, max depth reached, answered questions)
- Warning if depth limit has been reached
"""
# Description for the `ask_main_question` tool.
ASK_MAIN_QUESTION_DESCRIPTION = """
Initialize the main question (depth 0).
Use this tool to start the self-ask process with the main question that needs to be answered.
This question will be at depth 0 and serves as the root of the question tree.
**CRITICAL**: Call read_self_ask_state first to see existing questions.
When asking the main question:
- This should be the original complex question you need to answer
- It will be at depth 0 (the root)
- Sub-questions will be generated from this main question
- Only one main question should exist per self-ask session
"""
# Description for the `ask_sub_question` tool. This one is an f-string: the
# `{MAX_DEPTH}` placeholders are filled in at import time.
# NOTE(review): the "depth 3 (maximum)" bullet is hard-coded and matches the
# interpolated value only while MAX_DEPTH == 3 — confirm before changing MAX_DEPTH.
ASK_SUB_QUESTION_DESCRIPTION = f"""
Generate a sub-question from a parent question.
Use this tool to decompose a complex question into simpler sub-questions.
Each sub-question helps answer its parent question.
**CRITICAL**: Call read_self_ask_state first to see existing questions and their depths.
**DEPTH CONSTRAINT**: Maximum depth is {MAX_DEPTH}:
- Main question: depth 0
- Sub-questions: depth 1
- Sub-sub-questions: depth 2
- Sub-sub-sub-questions: depth 3 (maximum)
When asking sub-questions:
- Check the parent question's depth first
- If parent depth >= {MAX_DEPTH}, you cannot create more sub-questions
- New sub-question depth = parent depth + 1
- Provide reasoning for why this sub-question is needed
- Sub-questions should be simpler and more focused than their parent
- Consider whether sub-questions can be answered in parallel or sequentially
"""
# Description for the `answer_question` tool.
ANSWER_QUESTION_DESCRIPTION = """
Answer a question or sub-question.
Use this tool to provide an answer to a question. The answer can then be used
to answer parent questions or compose the final answer.
**CRITICAL**: Call read_self_ask_state first to see which questions need answers.
When answering:
- Provide a complete answer that directly addresses the question
- Use answers from sub-questions if available
- Optionally provide a confidence score (0-100)
- Indicate if the answer requires further sub-questions (requires_followup)
- If requires_followup is true, you may need to ask sub-sub-questions (respecting depth limit)
"""
# Description for the `compose_final_answer` tool.
COMPOSE_FINAL_ANSWER_DESCRIPTION = """
Compose the final answer from sub-question answers.
Use this tool to synthesize answers from sub-questions into a complete answer
to the main question.
**CRITICAL**: Call read_self_ask_state first to see all available answers.
When composing:
- Reference the main question ID
- List all answer IDs that contributed to the final answer
- Synthesize the information from sub-question answers
- Create a coherent, complete answer to the main question
- The final answer should address the original main question comprehensively
"""
# Description for the `get_final_answer` tool.
GET_FINAL_ANSWER_DESCRIPTION = """
Retrieve the final composed answer.
Use this tool to get the final answer that was composed from sub-question answers.
This is the complete answer to the main question.
**CRITICAL**: Call read_self_ask_state first to see if a final answer exists.
Returns:
- Final answer content
- Which answers were used in composition
- Main question that was answered
- Completion status
"""
# [docs]
def create_self_ask_toolset(
    storage: SelfAskStorageProtocol | None = None,
    *,
    id: str | None = None,
    track_usage: bool = False,
) -> FunctionToolset[Any]:
    """Create a self-ask toolset for question decomposition.

    This toolset provides tools for AI agents to decompose complex questions into
    simpler sub-questions, answer them sequentially, and compose final answers.

    Args:
        storage: Optional storage backend. Defaults to in-memory SelfAskStorage.
            You can provide a custom storage implementing SelfAskStorageProtocol
            for persistence or integration with other systems.
        id: Optional unique ID for the toolset.
        track_usage: If True, the default in-memory storage is created with usage
            tracking enabled. Ignored when ``storage`` is supplied: in that case
            invocations are recorded only if the supplied storage exposes a
            non-None ``metrics`` attribute.

    Returns:
        FunctionToolset compatible with any pydantic-ai agent. It registers the
        tools ``read_self_ask_state``, ``ask_main_question``, ``ask_sub_question``,
        ``answer_question``, ``compose_final_answer`` and ``get_final_answer``.

    Example (standalone):
        ```python
        from pydantic_ai import Agent
        from pydantic_ai_toolsets import create_self_ask_toolset
        agent = Agent("openai:gpt-4.1", toolsets=[create_self_ask_toolset()])
        result = await agent.run("What was the population of the city where the 2016 Summer Olympics were held?")
        ```

    Example (with custom storage):
        ```python
        from pydantic_ai_toolsets import create_self_ask_toolset, SelfAskStorage
        storage = SelfAskStorage()
        toolset = create_self_ask_toolset(storage=storage)
        # After agent runs, access questions, answers, and final answers directly
        print(storage.questions)
        print(storage.answers)
        print(storage.final_answers)
        ```
    """
    _storage = storage if storage is not None else SelfAskStorage(track_usage=track_usage)
    toolset: FunctionToolset[Any] = FunctionToolset(id=id)
    # getattr with a default already covers storages that have no `metrics` attribute.
    _metrics = getattr(_storage, "metrics", None)

    def _record(tool_name: str, input_text: str, result: str, start_time: float) -> str:
        """Record one tool invocation (when metrics are enabled) and return ``result``.

        Every exit path of every tool goes through here so that error and
        early-return responses are counted the same way as successful ones.
        """
        if _metrics is not None:
            duration_ms = (time.perf_counter() - start_time) * 1000
            _metrics.record_invocation(tool_name, input_text, result, duration_ms)
        return result

    def _input_json(item: Any) -> str:
        """Serialize a tool input for metrics; skipped entirely when metrics are off."""
        return item.model_dump_json() if _metrics is not None else ""

    def _ref(identifier: str, found: Any) -> str:
        """Format ``[id]``, flagging IDs whose target is absent from storage."""
        return f"[{identifier}]" if found else f"[{identifier}] (missing)"

    def _question_not_found(label: str, question_id: str) -> str:
        """Build the shared 'not found' error listing the question IDs that do exist."""
        available = ", ".join(q.question_id for q in _storage.questions.values())
        return (
            f"Error: {label} '{question_id}' not found. "
            f"Available: [{available}]. Call read_self_ask_state."
        )

    def _get_status_summary() -> str:
        """Get one-line status summary."""
        questions = list(_storage.questions.values())
        if not questions:
            return "Status: ○ Empty"
        if not any(q.is_main for q in questions):
            return "Status: ● No main question"

        answered_count = sum(1 for q in questions if q.status == QuestionStatus.ANSWERED)
        total_questions = len(questions)
        max_depth = max((q.depth for q in questions), default=0)

        if len(_storage.final_answers) > 0:
            return f"Status: ✓ Complete | {total_questions} questions, depth {max_depth}, {answered_count} answered"

        depth_warning = f" (max depth {MAX_DEPTH} reached)" if max_depth >= MAX_DEPTH else ""
        return f"Status: ● Active | {total_questions} questions, depth {max_depth}, {answered_count} answered{depth_warning}"

    def _get_next_hint() -> str:
        """Get contextual hint for next action."""
        questions = list(_storage.questions.values())
        main_questions = [q for q in questions if q.is_main]
        if not main_questions:
            # Covers both "no questions at all" and "questions but no main one".
            return "Use ask_main_question to initialize the main question."

        if _storage.final_answers:
            return "Self-ask complete. Use get_final_answer to retrieve the final result."

        unanswered = [q for q in questions if q.status == QuestionStatus.PENDING]
        if unanswered:
            # Prefer a pending question that can still be decomposed further.
            can_ask_sub = [q for q in unanswered if q.depth < MAX_DEPTH]
            if can_ask_sub:
                return f"Use ask_sub_question or answer_question on [{can_ask_sub[0].question_id}] (depth {can_ask_sub[0].depth})."
            return f"Use answer_question on [{unanswered[0].question_id}] (depth {unanswered[0].depth}, max depth reached)."

        # Nothing pending: if anything has been answered, it is time to compose.
        if any(q.status == QuestionStatus.ANSWERED for q in questions):
            main_q = main_questions[0]
            return f"All questions answered. Use compose_final_answer for main question [{main_q.question_id}]."

        return "Continue asking sub-questions or answering questions."

    @toolset.tool(description=READ_SELF_ASK_STATE_DESCRIPTION)
    async def read_self_ask_state() -> str:
        """Read the current self-ask state."""
        start_time = time.perf_counter()

        if not _storage.questions:
            result = f"{_get_status_summary()}\n\nNo questions in self-ask state.\n\nNext: {_get_next_hint()}"
            return _record("read_self_ask_state", "", result, start_time)

        all_questions = list(_storage.questions.values())
        lines: list[str] = [_get_status_summary(), "", "Self-Ask State:", ""]

        # Display questions by depth
        questions_by_depth: dict[int, list[Question]] = {}
        for question in all_questions:
            questions_by_depth.setdefault(question.depth, []).append(question)

        lines.append("Questions by Depth:")
        for depth in sorted(questions_by_depth):
            depth_label = "Main" if depth == 0 else f"Depth {depth}"
            depth_warning = " ⚠️ MAX DEPTH" if depth >= MAX_DEPTH else ""
            lines.append(f" {depth_label} (depth {depth}):{depth_warning}")
            for question in questions_by_depth[depth]:
                status_icon = "✓" if question.status == QuestionStatus.ANSWERED else "○"
                main_str = " [MAIN]" if question.is_main else ""
                lines.append(f" {status_icon} [{question.question_id}]{main_str} - {question.question_text}")
                if question.parent_question_id:
                    parent = _storage.questions.get(question.parent_question_id)
                    lines.append(f" Parent: {_ref(question.parent_question_id, parent)}")
        lines.append("")

        # Display answers
        if _storage.answers:
            lines.append("Answers:")
            for answer in _storage.answers.values():
                question_ref = _ref(answer.question_id, _storage.questions.get(answer.question_id))
                confidence_str = (
                    f" (confidence: {answer.confidence_score:.1f})"
                    if answer.confidence_score is not None
                    else ""
                )
                followup_str = " [needs followup]" if answer.requires_followup else ""
                lines.append(f" Answer [{answer.answer_id}] for question {question_ref}{confidence_str}{followup_str}:")
                lines.append(f" {answer.answer_text}")
            lines.append("")

        # Display final answers
        if _storage.final_answers:
            lines.append("Final Answers:")
            for final_answer in _storage.final_answers.values():
                main_ref = _ref(
                    final_answer.main_question_id,
                    _storage.questions.get(final_answer.main_question_id),
                )
                complete_str = " ✓ COMPLETE" if final_answer.is_complete else ""
                lines.append(f" Final Answer [{final_answer.final_answer_id}] for {main_ref}{complete_str}:")
                lines.append(f" {final_answer.final_answer_text}")
                if final_answer.composed_from_answers:
                    answer_refs = ", ".join(f"[{aid}]" for aid in final_answer.composed_from_answers)
                    lines.append(f" Composed from: {answer_refs}")
            lines.append("")

        # Display question tree
        main_questions = [q for q in all_questions if q.is_main]
        if main_questions:
            # Index children once (storage order preserved) instead of rescanning
            # every question for every node of the tree.
            children_by_parent: dict[Any, list[Question]] = {}
            for question in all_questions:
                children_by_parent.setdefault(question.parent_question_id, []).append(question)

            def _print_tree(question_id: str, prefix: str = " ", depth: int = 0) -> None:
                # The depth cut-off also stops runaway recursion if a storage
                # backend ever hands back cyclic parent links.
                if depth > MAX_DEPTH:
                    return
                children = children_by_parent.get(question_id, [])
                last_index = len(children) - 1
                for i, child in enumerate(children):
                    is_last = i == last_index
                    connector = "└── " if is_last else "├── "
                    status_icon = "✓" if child.status == QuestionStatus.ANSWERED else "○"
                    lines.append(f"{prefix}{connector}{status_icon} [{child.question_id}] (depth {child.depth}) - {child.question_text}")
                    _print_tree(child.question_id, prefix + (" " if is_last else "│ "), depth + 1)

            lines.append("Question Tree:")
            for main_q in main_questions:
                lines.append(f" Main: [{main_q.question_id}] - {main_q.question_text}")
                _print_tree(main_q.question_id)
            lines.append("")

        # Summary statistics
        stats = _storage.get_statistics()
        lines.append("Summary:")
        lines.append(f" Total questions: {stats['total_questions']}")
        lines.append(f" Main questions: {stats['main_questions']}")
        lines.append(f" Answered questions: {stats['answered_questions']}")
        lines.append(f" Maximum depth reached: {stats['max_depth_reached']}")
        if stats["max_depth_reached"] >= MAX_DEPTH:
            lines.append(f" ⚠️ WARNING: Maximum depth {MAX_DEPTH} reached. Cannot create deeper sub-questions.")
        lines.append(f" Total answers: {stats['total_answers']}")
        lines.append(f" Total final answers: {stats['total_final_answers']}")
        if "avg_confidence_score" in stats:
            lines.append(f" Average confidence: {stats['avg_confidence_score']:.1f}")
        lines.append("")
        lines.append(f"Next: {_get_next_hint()}")

        return _record("read_self_ask_state", "", "\n".join(lines), start_time)

    @toolset.tool(description=ASK_MAIN_QUESTION_DESCRIPTION)
    async def ask_main_question(item: AskMainQuestionItem) -> str:
        """Initialize the main question (depth 0)."""
        start_time = time.perf_counter()
        input_text = _input_json(item)

        # Only one main question is allowed; point the caller at the existing one.
        existing_main = next((q for q in _storage.questions.values() if q.is_main), None)
        if existing_main is not None:
            result = (
                f"Main question already exists: [{existing_main.question_id}] - {existing_main.question_text}. "
                "Use ask_sub_question to create sub-questions."
            )
            return _record("ask_main_question", input_text, result, start_time)

        question_id = str(uuid.uuid4())
        # Assigning a single item to the storage attribute hands it to the storage
        # backend; presumably the setter upserts by ID — confirm in `.storage`.
        _storage.questions = Question(
            question_id=question_id,
            question_text=item.question_text,
            is_main=True,
            parent_question_id=None,
            depth=0,
            status=QuestionStatus.PENDING,
        )

        result = f"Created main question [{question_id}] at depth 0: {item.question_text}"
        return _record("ask_main_question", input_text, result, start_time)

    @toolset.tool(description=ASK_SUB_QUESTION_DESCRIPTION)
    async def ask_sub_question(item: AskSubQuestionItem) -> str:
        """Generate a sub-question from a parent question."""
        start_time = time.perf_counter()
        input_text = _input_json(item)

        if item.parent_question_id not in _storage.questions:
            result = _question_not_found("Parent question", item.parent_question_id)
            return _record("ask_sub_question", input_text, result, start_time)

        parent_question = _storage.questions[item.parent_question_id]

        # CRITICAL: Check depth limit
        if parent_question.depth >= MAX_DEPTH:
            result = (
                f"Error: Cannot create sub-question. Parent question [{item.parent_question_id}] "
                f"is at depth {parent_question.depth}, which is the maximum depth ({MAX_DEPTH}). "
                f"Answer the parent question and compose the final answer instead."
            )
            return _record("ask_sub_question", input_text, result, start_time)

        new_depth = parent_question.depth + 1
        question_id = str(uuid.uuid4())
        _storage.questions = Question(
            question_id=question_id,
            question_text=item.sub_question_text,
            is_main=False,
            parent_question_id=item.parent_question_id,
            depth=new_depth,
            status=QuestionStatus.PENDING,
        )

        depth_warning = f" (max depth {MAX_DEPTH} reached)" if new_depth >= MAX_DEPTH else ""
        result = (
            f"Created sub-question [{question_id}] at depth {new_depth} "
            f"from parent [{item.parent_question_id}]{depth_warning}: {item.sub_question_text}"
        )
        return _record("ask_sub_question", input_text, result, start_time)

    @toolset.tool(description=ANSWER_QUESTION_DESCRIPTION)
    async def answer_question(item: AnswerQuestionItem) -> str:
        """Answer a question or sub-question."""
        start_time = time.perf_counter()
        input_text = _input_json(item)

        if item.question_id not in _storage.questions:
            result = _question_not_found("Question", item.question_id)
            return _record("answer_question", input_text, result, start_time)

        question = _storage.questions[item.question_id]
        answer_id = str(uuid.uuid4())
        _storage.answers = Answer(
            answer_id=answer_id,
            question_id=item.question_id,
            answer_text=item.answer_text,
            confidence_score=item.confidence_score,
            requires_followup=item.requires_followup,
        )

        # Update question status and write the question back to storage.
        question.status = QuestionStatus.ANSWERED
        _storage.questions = question

        confidence_str = (
            f" (confidence: {item.confidence_score:.1f})"
            if item.confidence_score is not None
            else ""
        )
        followup_str = " [needs followup]" if item.requires_followup else ""
        result = (
            f"Answered question [{item.question_id}] "
            f"with answer [{answer_id}]{confidence_str}{followup_str}"
        )
        return _record("answer_question", input_text, result, start_time)

    @toolset.tool(description=COMPOSE_FINAL_ANSWER_DESCRIPTION)
    async def compose_final_answer(item: ComposeFinalAnswerItem) -> str:
        """Compose the final answer from sub-question answers."""
        start_time = time.perf_counter()
        input_text = _input_json(item)

        if item.main_question_id not in _storage.questions:
            result = _question_not_found("Main question", item.main_question_id)
            return _record("compose_final_answer", input_text, result, start_time)

        main_question = _storage.questions[item.main_question_id]
        if not main_question.is_main:
            result = (
                f"Error: Question [{item.main_question_id}] is not a main question. "
                "Use the main question ID to compose the final answer."
            )
            return _record("compose_final_answer", input_text, result, start_time)

        # Verify answer IDs exist
        missing_answers = [aid for aid in item.answer_ids_used if aid not in _storage.answers]
        if missing_answers:
            result = (
                f"Error: Some answer IDs not found: {missing_answers}. "
                "Call read_self_ask_state to see available answers."
            )
            return _record("compose_final_answer", input_text, result, start_time)

        final_answer_id = str(uuid.uuid4())
        _storage.final_answers = FinalAnswer(
            final_answer_id=final_answer_id,
            main_question_id=item.main_question_id,
            final_answer_text=item.final_answer_text,
            composed_from_answers=item.answer_ids_used,
            is_complete=True,
        )

        # Update main question status
        main_question.status = QuestionStatus.COMPOSED
        _storage.questions = main_question

        result = (
            f"Composed final answer [{final_answer_id}] for main question "
            f"[{item.main_question_id}] using {len(item.answer_ids_used)} answer(s)"
        )
        return _record("compose_final_answer", input_text, result, start_time)

    @toolset.tool(description=GET_FINAL_ANSWER_DESCRIPTION)
    async def get_final_answer() -> str:
        """Retrieve the final composed answer."""
        start_time = time.perf_counter()

        if not _storage.final_answers:
            result = "No final answer found. Use compose_final_answer to create one."
            return _record("get_final_answer", "", result, start_time)

        # Take the last entry so that a re-composed answer supersedes earlier ones
        # (assumes storage iterates in insertion order, as a plain dict does).
        final_answer = list(_storage.final_answers.values())[-1]
        main_question = _storage.questions.get(final_answer.main_question_id)
        main_text = (
            main_question.question_text
            if main_question
            else f"[{final_answer.main_question_id}] (missing)"
        )

        lines: list[str] = [
            f"Final Answer: [{final_answer.final_answer_id}]",
            f"Main Question: {main_text}",
        ]
        if final_answer.is_complete:
            lines.append("Status: ✓ COMPLETE")
        lines.extend(["", "Final Answer:", final_answer.final_answer_text, ""])

        # Show which answers were used
        if final_answer.composed_from_answers:
            lines.append("Composed from answers:")
            for answer_id in final_answer.composed_from_answers:
                answer = _storage.answers.get(answer_id)
                question = _storage.questions.get(answer.question_id) if answer else None
                if answer and question:
                    lines.append(f" [{answer_id}] - Answer to: {question.question_text}")
                    lines.append(f" {answer.answer_text}")
                else:
                    lines.append(f" [{answer_id}] - (missing)")
            lines.append("")

        return _record("get_final_answer", "", "\n".join(lines), start_time)

    return toolset
# [docs]
def get_self_ask_system_prompt() -> str:
    """Return the module-level self-ask system prompt.

    Returns:
        The ``SELF_ASK_SYSTEM_PROMPT`` string, unchanged, for use as a system
        prompt or instructions with a pydantic-ai agent.
    """
    prompt: str = SELF_ASK_SYSTEM_PROMPT
    return prompt
def create_self_ask_toolset_agent(model: str = "openrouter:x-ai/grok-4.1-fast") -> Agent:
    """Create a Pydantic-ai agent with the self-ask toolset.

    The agent is built with a fresh in-memory ``SelfAskStorage``, a short static
    system prompt listing the tools, and an instructions hook that supplies the
    full self-ask prompt from ``get_self_ask_system_prompt``.

    Args:
        model: The model to use for the agent.

    Returns:
        Pydantic-ai agent with the self-ask toolset.
    """
    storage = SelfAskStorage()
    toolset = create_self_ask_toolset(storage=storage)
    # The depth limit is interpolated from MAX_DEPTH so this prompt cannot drift
    # from the limit that ask_sub_question actually enforces.
    agent = Agent(
        model,
        system_prompt=f"""
You are a self-ask agent. You have access to tools for decomposing complex questions:
- `read_self_ask_state`: Read the current self-ask state
- `ask_main_question`: Initialize main question (depth 0)
- `ask_sub_question`: Generate sub-question from parent question
- `answer_question`: Answer a question/sub-question
- `compose_final_answer`: Compose final answer from sub-question answers
- `get_final_answer`: Retrieve the final composed answer
**IMPORTANT**: Use these tools to decompose complex questions into simpler sub-questions,
answer them sequentially, and compose final answers. Respect the maximum depth limit of {MAX_DEPTH}.
""",
        toolsets=[toolset],
    )

    @agent.instructions
    async def add_prompt() -> str:
        """Add the self-ask system prompt."""
        return get_self_ask_system_prompt()

    return agent