Source code for pydantic_ai_toolsets.toolsets.self_refine.storage

"""Storage abstraction for self-refinement."""

from __future__ import annotations

import sys
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

from .types import Feedback, RefinementOutput

if TYPE_CHECKING:
    from .._shared.metrics import UsageMetrics


[docs] @runtime_checkable class SelfRefineStorageProtocol(Protocol): """Protocol for self-refinement storage implementations. Any class that has `outputs` and `feedbacks` properties can be used as storage for the self-refinement toolset. Example: ```python class MyCustomStorage: def __init__(self): self._outputs: dict[str, RefinementOutput] = {} self._feedbacks: dict[str, Feedback] = {} @property def outputs(self) -> dict[str, RefinementOutput]: return self._outputs @outputs.setter def outputs(self, value: RefinementOutput) -> None: self._outputs[value.output_id] = value @property def feedbacks(self) -> dict[str, Feedback]: return self._feedbacks @feedbacks.setter def feedbacks(self, value: Feedback) -> None: self._feedbacks[value.feedback_id] = value ``` """ @property def outputs(self) -> dict[str, RefinementOutput]: """Get the current dictionary of outputs (output_id -> RefinementOutput).""" ... @outputs.setter def outputs(self, value: RefinementOutput) -> None: """Add or update an output in the dictionary.""" ... @property def feedbacks(self) -> dict[str, Feedback]: """Get the current dictionary of feedbacks (feedback_id -> Feedback).""" ... @feedbacks.setter def feedbacks(self, value: Feedback) -> None: """Add or update a feedback in the dictionary.""" ...
[docs] def summary(self) -> dict[str, Any]: """Get comprehensive JSON summary of storage state and metrics. Returns: Dictionary containing storage state, statistics, and usage metrics. """ ...
[docs] def add_linked_from(self, link_id: str) -> None: """Add an incoming link. Args: link_id: ID of the link """ ...
[docs] @dataclass class SelfRefineStorage: """Default in-memory self-refinement storage. Simple implementation that stores outputs and feedbacks in memory. Use this for standalone agents or testing. Example: ```python from pydantic_ai_toolsets import create_self_refine_toolset, SelfRefineStorage storage = SelfRefineStorage() toolset = create_self_refine_toolset(storage=storage) # After agent runs, access outputs and feedbacks directly print(storage.outputs) print(storage.feedbacks) # With metrics tracking storage = SelfRefineStorage(track_usage=True) toolset = create_self_refine_toolset(storage=storage) print(storage.metrics.total_tokens()) ``` """ _outputs: dict[str, RefinementOutput] = field(default_factory=dict) _feedbacks: dict[str, Feedback] = field(default_factory=dict) _metrics: UsageMetrics | None = field(default=None) _links: dict[str, list[str]] = field(default_factory=dict) # item_id -> list of link IDs _linked_from: list[str] = field(default_factory=list) # list of link IDs where this storage is target
[docs] def __init__(self, *, track_usage: bool = False) -> None: """Initialize storage with optional metrics tracking. Args: track_usage: If True, enables usage metrics collection. """ self._outputs = {} self._feedbacks = {} self._metrics = None self._links = {} self._linked_from = [] if track_usage: import os toolsets_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if toolsets_dir not in sys.path: sys.path.insert(0, toolsets_dir) from .._shared.metrics import UsageMetrics self._metrics = UsageMetrics()
@property def outputs(self) -> dict[str, RefinementOutput]: """Get the current dictionary of outputs.""" return self._outputs @outputs.setter def outputs(self, value: RefinementOutput) -> None: """Add or update an output in the dictionary.""" self._outputs[value.output_id] = value @property def feedbacks(self) -> dict[str, Feedback]: """Get the current dictionary of feedbacks.""" return self._feedbacks @feedbacks.setter def feedbacks(self, value: Feedback) -> None: """Add or update a feedback in the dictionary.""" self._feedbacks[value.feedback_id] = value @property def metrics(self) -> UsageMetrics | None: """Get usage metrics if tracking is enabled.""" return self._metrics
[docs] def get_statistics(self) -> dict[str, int | float]: """Get summary statistics about self-refinement operations. Returns: Dictionary with output and feedback counts. """ total_outputs = len(self._outputs) final_outputs = sum(1 for o in self._outputs.values() if o.is_final) max_iteration = max((o.iteration for o in self._outputs.values()), default=0) avg_quality = None quality_scores = [o.quality_score for o in self._outputs.values() if o.quality_score is not None] if quality_scores: avg_quality = sum(quality_scores) / len(quality_scores) stats: dict[str, int | float] = { "total_outputs": total_outputs, "final_outputs": final_outputs, "max_iteration": max_iteration, "total_feedbacks": len(self._feedbacks), } if avg_quality is not None: stats["avg_quality_score"] = avg_quality return stats
[docs] def summary(self) -> dict[str, Any]: """Get comprehensive JSON summary of storage state and metrics. Returns: Dictionary containing storage state, statistics, and usage metrics. """ summary_dict: dict[str, Any] = { "toolset": "self_refine", "statistics": self.get_statistics(), } # Add storage-specific data summary_dict["storage"] = { "outputs": { output_id: { "output_id": output.output_id, "content": output.content, "iteration": output.iteration, "is_final": output.is_final, "quality_score": output.quality_score, } for output_id, output in self._outputs.items() }, "feedbacks": { feedback_id: { "feedback_id": feedback.feedback_id, "output_id": feedback.output_id, "content": feedback.content, "aspects": feedback.aspects, } for feedback_id, feedback in self._feedbacks.items() }, } # Add metrics if available if self._metrics: summary_dict["usage_metrics"] = self._metrics.to_dict() return summary_dict
[docs] def clear(self) -> None: """Clear all outputs, feedbacks, and reset metrics.""" self._outputs.clear() self._feedbacks.clear() self._links.clear() self._linked_from.clear() if self._metrics: self._metrics.clear()
@property def links(self) -> dict[str, list[str]]: """Get outgoing links dictionary (item_id -> list of link IDs).""" return self._links @property def linked_from(self) -> list[str]: """Get incoming links list (link IDs where this storage is target).""" return self._linked_from
[docs] def add_linked_from(self, link_id: str) -> None: """Add an incoming link. Args: link_id: ID of the link """ if link_id not in self._linked_from: self._linked_from.append(link_id)
[docs] def get_state_summary(self) -> str: """Get a human-readable summary of the storage state. Returns: Formatted string summary of outputs and feedbacks. """ stats = self.get_statistics() lines: list[str] = [] lines.append(f"Self-Refine: {stats['total_outputs']} outputs, {stats['total_feedbacks']} feedbacks") if stats["final_outputs"] > 0: lines.append(f" - {stats['final_outputs']} final outputs") if stats["max_iteration"] > 0: lines.append(f" - Max iteration: {stats['max_iteration']}") if self._outputs: latest_output = list(self._outputs.values())[-1] lines.append(f" Latest output: {latest_output.content}") return "\n".join(lines)
[docs] def get_outputs_for_linking(self) -> list[dict[str, str]]: """Get list of linkable items with their IDs and descriptions. Returns: List of dictionaries with 'id' and 'description' keys for outputs and feedbacks. """ linkable_items: list[dict[str, str]] = [] # Add outputs for output_id, output in self._outputs.items(): description = f"Output {output_id} (iteration {output.iteration}): {output.content}" if output.is_final: description += " [FINAL]" linkable_items.append({"id": output_id, "description": description}) # Add feedbacks for feedback_id, feedback in self._feedbacks.items(): description = f"Feedback {feedback_id} for output {feedback.output_id}: {feedback.description} - {feedback.suggestion}" linkable_items.append({"id": feedback_id, "description": description}) return linkable_items