Source code for pydantic_ai_toolsets.toolsets.search.toolset

"""Search toolset for pydantic-ai agents."""

from __future__ import annotations

import os
import sys
import time
import uuid
from datetime import datetime
from typing import Any

from dotenv import load_dotenv
from firecrawl import Firecrawl
from pydantic_ai import Agent
from pydantic_ai.toolsets import FunctionToolset
import trafilatura

from .storage import SearchStorage, SearchStorageProtocol
from .types import (
    ExtractWebContentItem,
    ExtractedContent,
    OutputFormat,
    SearchImagesItem,
    SearchNewsItem,
    SearchResult,
    SearchSource,
    SearchWebItem,
    TimeFilter,
)

# Load variables from a .env file when this module is imported, so the search
# tools below can read FIRECRAWL_API_KEY through os.getenv.
load_dotenv()

# =============================================================================
# SYSTEM PROMPT - Contains "when and why" to use the toolset
# =============================================================================
# Returned verbatim by get_search_system_prompt() and passed to the agent built
# in create_search_toolset_agent(). The text is sent to the model, so edits here
# change agent behavior.
# NOTE(review): this prompt states "default: 5" results, while
# SEARCH_WEB_DESCRIPTION states "default: 10" — confirm the real default on
# SearchWebItem and align the two texts.

SEARCH_SYSTEM_PROMPT = """
## Web Search, News Search, and Image Search

You have access to tools for searching the web, news, and images, and extracting content:
- `search_web`: Search the web for information using Firecrawl
- `search_news`: Search for news articles using Firecrawl (supports time filtering)
- `search_images`: Search for images using Firecrawl (supports resolution filtering)
- `extract_web_content`: Extract main content from webpages using Trafilatura (works with web and news results only)

### When to Use Each Search Type

**Web Search (`search_web`):**
1. Finding current information on the web
2. Researching topics that require up-to-date data
3. Gathering information from multiple sources
4. Verifying facts or finding authoritative sources

**News Search (`search_news`):**
1. Finding recent news articles and developments
2. Searching for time-specific news (use time_filter parameter)
3. Getting news from specific date ranges
4. When you need news-focused results rather than general web results

**Image Search (`search_images`):**
1. Finding images related to a topic
2. Searching for high-resolution images (use resolution parameters)
3. Finding images of specific sizes
4. When visual content is needed

### Workflow

1. **Search**: Choose the appropriate search tool based on your needs
   - `search_web`: General web search
   - `search_news`: News articles (use time_filter for recent news: PAST_HOUR, PAST_DAY, PAST_WEEK, PAST_MONTH, PAST_YEAR, or CUSTOM with dates)
   - `search_images`: Images (use exact_width/exact_height for exact size, or min_width/min_height for minimum size)
   - Provide specific, keyword-rich queries for better results
   - Specify number of results needed (default: 5, max: 50)
   - Results include titles, URLs, and descriptions (images also include image dimensions)

2. **Extract**: Use `extract_web_content` to get content from specific URLs
   - **Only works with web and news search results** (not image results)
   - Choose URLs from search results that are most relevant
   - Choose output format: 'txt' for plain text or 'markdown' for markdown
   - Previously extracted content is stored and can be accessed without re-extraction

### Key Principles

- **Specific Queries**: Use specific, keyword-rich queries for better results
- **Time Filtering**: Use `search_news` with time_filter when you need recent news (e.g., "news from past week")
- **Resolution Filtering**: Use `search_images` with exact_width/exact_height for exact sizes (e.g., "1920x1080 images") or min_width/min_height for minimum sizes (e.g., "high-resolution images at least 2560x1440")
- **Relevant URLs**: Extract content from URLs that are most relevant to your task
- **Format Choice**: Use markdown format if you need structured content, txt for simple text
- **Efficiency**: Previously extracted content is stored and can be accessed without re-extraction
- **Content Extraction**: Only web and news results support content extraction; image results cannot be extracted
"""

# =============================================================================
# TOOL DESCRIPTIONS - Contains "how" to use each specific tool
# =============================================================================
# Each constant below is passed as `description=` to the matching
# `@toolset.tool(...)` registration inside create_search_toolset(), so the text
# is what the model sees for that tool.

# Description for the `search_web` tool.
# NOTE(review): states "default: 10" while SEARCH_SYSTEM_PROMPT and the other
# tool descriptions state "default: 5" — confirm against SearchWebItem.
SEARCH_WEB_DESCRIPTION = """Search the web for information using Firecrawl.

Parameters:
- query: Specific, keyword-rich search query
- limit: Maximum results (default: 10, max: 50)

Returns list of search results with titles, URLs, and descriptions.
Results are stored for future reference.
"""

# Description for the `extract_web_content` tool.
EXTRACT_WEB_CONTENT_DESCRIPTION = """Extract main content from a webpage using Trafilatura.

Parameters:
- url: Valid HTTP or HTTPS URL from web or news search results (image results not supported)
- output_format: 'txt' for plain text, 'markdown' for markdown (default: 'txt')

Returns extracted content as a string.
Content is stored to avoid re-extraction.
Note: Only works with URLs from web or news search results, not image search results.
"""

# Description for the `search_news` tool.
SEARCH_NEWS_DESCRIPTION = """Search for news articles using Firecrawl. Supports time-based filtering via time_filter parameter (past hour/day/week/month/year or custom date range).

Parameters:
- query: Specific, keyword-rich search query for news articles
- limit: Maximum results (default: 5, max: 50)
- time_filter: Optional time filter (PAST_HOUR, PAST_DAY, PAST_WEEK, PAST_MONTH, PAST_YEAR, or CUSTOM)
- custom_date_min: Minimum date for custom range (format: MM/DD/YYYY, required if time_filter is CUSTOM)
- custom_date_max: Maximum date for custom range (format: MM/DD/YYYY, required if time_filter is CUSTOM)

Returns list of news search results with titles, URLs, descriptions, and dates.
Results are stored for future reference.
"""

# Description for the `search_images` tool.
SEARCH_IMAGES_DESCRIPTION = """Search for images using Firecrawl. Supports resolution filtering via exact_width/exact_height (for exact size) or min_width/min_height (for minimum size).

Parameters:
- query: Specific, keyword-rich search query for images
- limit: Maximum results (default: 5, max: 50)
- exact_width: Optional exact image width in pixels (use with exact_height for exact size matching)
- exact_height: Optional exact image height in pixels (use with exact_width for exact size matching)
- min_width: Optional minimum image width in pixels (use with min_height for minimum size filtering)
- min_height: Optional minimum image height in pixels (use with min_width for minimum size filtering)

Returns list of image search results with titles, URLs, image URLs, and dimensions.
Results are stored for future reference.
Note: Content extraction is not supported for image results.
"""

# Legacy constant for backward compatibility: an alias of the web-search
# description, kept so existing imports of the old name keep working.
SEARCH_TOOL_DESCRIPTION = SEARCH_WEB_DESCRIPTION


def _read_field(item: Any, *names: str, default: Any = None) -> Any:
    """Return the first non-None value found under ``names`` on ``item``.

    ``item`` may be a dict (looked up by key) or any other object (looked up
    by attribute). A key/attribute that is absent or ``None`` is skipped, so a
    null ``title`` falls through to the next alias or to ``default`` instead
    of being passed on as ``None``.
    """
    for name in names:
        value = item.get(name) if isinstance(item, dict) else getattr(item, name, None)
        if value is not None:
            return value
    return default


def _is_result_item(item: Any) -> bool:
    """Tell whether ``item`` looks like a single search hit.

    Dicts are always accepted; other objects must expose a ``url`` attribute,
    which keeps stray strings or wrapper objects out of the stored results.
    """
    return isinstance(item, dict) or hasattr(item, "url")


def _result_items(results: Any, source_key: str, *, web_fallbacks: bool = False) -> list[Any]:
    """Pull the list of hits for one source out of a Firecrawl search response.

    Args:
        results: Whatever ``Firecrawl.search`` returned.
        source_key: ``"web"``, ``"news"`` or ``"images"``.
        web_fallbacks: Enables the extra legacy shapes that only the web
            search accepts (a top-level ``"results"`` list, the values of an
            arbitrary dict, or a single bare result).

    Returns:
        The hits that look like result items; empty when nothing matches.
    """
    if isinstance(results, list):
        candidates = results
    elif isinstance(results, dict):
        if "data" in results:
            data = results["data"]
            if isinstance(data, dict):
                # `or []` guards against an explicit null for this source.
                candidates = data.get(source_key) or []
            else:
                candidates = data if isinstance(data, list) else []
        elif web_fallbacks and "results" in results:
            legacy = results["results"]
            candidates = legacy if isinstance(legacy, list) else []
        elif web_fallbacks:
            candidates = list(results.values())
        else:
            candidates = []
    else:
        # NOTE(review): recent Firecrawl SDK releases appear to return a typed
        # object exposing `web` / `news` / `images` lists rather than a dict —
        # confirm against the installed SDK version.
        typed = getattr(results, source_key, None)
        if isinstance(typed, list):
            candidates = typed
        elif web_fallbacks and results:
            candidates = [results]
        else:
            candidates = []
    return [item for item in candidates if _is_result_item(item)]


def create_search_toolset(
    storage: SearchStorageProtocol | None = None,
    *,
    id: str | None = None,
    track_usage: bool = False,
) -> FunctionToolset[Any]:
    """Create a search toolset for web search and content extraction.

    The toolset registers four tools — ``search_web``, ``search_news``,
    ``search_images`` and ``extract_web_content`` — that store their results
    in ``storage``. Every tool returns a human-readable string; failures are
    reported as ``"Error ..."`` strings rather than raised.

    Args:
        storage: Optional storage backend. Defaults to in-memory SearchStorage.
        id: Optional unique ID for the toolset.
        track_usage: If True, enables usage metrics collection. Only used when
            ``storage`` is not supplied and a default SearchStorage is created.

    Returns:
        FunctionToolset compatible with any pydantic-ai agent.

    Example:
        ```python
        from pydantic_ai import Agent
        from pydantic_ai_toolsets import create_search_toolset, SearchStorage

        # With storage and metrics
        storage = SearchStorage(track_usage=True)
        agent = Agent("openai:gpt-4.1", toolsets=[create_search_toolset(storage)])
        print(storage.metrics.total_tokens())
        ```
    """
    _storage = storage if storage is not None else SearchStorage(track_usage=track_usage)
    toolset: FunctionToolset[Any] = FunctionToolset(id=id)
    # Storage backends without a `metrics` attribute simply disable metrics.
    _metrics = getattr(_storage, "metrics", None)

    missing_key_error = "Error: FIRECRAWL_API_KEY not found in environment variables"

    def _input_text(item: Any) -> str:
        """Serialize the tool input, but only when metrics will record it."""
        return item.model_dump_json() if _metrics is not None else ""

    def _finish(tool_name: str, input_text: str, result: str, start_time: float) -> str:
        """Record one invocation in the metrics (if enabled) and return ``result``."""
        if _metrics is not None:
            duration_ms = (time.perf_counter() - start_time) * 1000
            _metrics.record_invocation(tool_name, input_text, result, duration_ms)
        return result

    @toolset.tool(description=SEARCH_WEB_DESCRIPTION)
    async def search_web(search: SearchWebItem) -> str:
        """Search the web for information using Firecrawl."""
        start_time = time.perf_counter()
        input_text = _input_text(search)
        try:
            api_key = os.getenv("FIRECRAWL_API_KEY")
            if not api_key:
                return _finish("search_web", input_text, missing_key_error, start_time)

            firecrawl = Firecrawl(api_key=api_key)
            # NOTE(review): `tbs="qdr:d"` restricts every web search to the
            # past day, which the tool description does not mention — confirm
            # this is intended. The call is also not awaited, so it runs
            # synchronously inside this coroutine.
            results = firecrawl.search(
                query=search.query,
                limit=search.limit,
                sources=["web"],
                tbs="qdr:d",  # Search results from the past day
            )

            timestamp = datetime.now().isoformat()
            stored_ids: list[str] = []
            for item in _result_items(results, "web", web_fallbacks=True):
                result_id = str(uuid.uuid4())
                # Assigning a single record is how this storage ingests it
                # (presumably a property setter keyed by result_id — confirm
                # in the storage module).
                _storage.search_results = SearchResult(
                    result_id=result_id,
                    query=search.query,
                    title=_read_field(item, "title", "name", default="Untitled"),
                    url=_read_field(item, "url", "link", default=""),
                    description=_read_field(item, "description", "snippet"),
                    timestamp=timestamp,
                    source_type=SearchSource.WEB,
                )
                stored_ids.append(result_id)

            if stored_ids:
                lines = [
                    f"Found {len(stored_ids)} search result(s) for query: '{search.query}'",
                    "",
                ]
                for result_id in stored_ids:
                    hit = _storage.search_results[result_id]
                    lines.append(f"[{result_id}] {hit.title}")
                    lines.append(f" URL: {hit.url}")
                    if hit.description:
                        lines.append(f" Description: {hit.description}")
                    lines.append("")
                message = "\n".join(lines)
            else:
                # Nothing parseable: hand the raw response back to the model.
                message = f"Search completed for query: '{search.query}'. Results: {results}"

            return _finish("search_web", input_text, message, start_time)
        except Exception as e:
            # Tool boundary: report the failure to the model instead of raising.
            return _finish("search_web", input_text, f"Error searching the web: {str(e)}", start_time)

    @toolset.tool(description=SEARCH_NEWS_DESCRIPTION)
    async def search_news(search: SearchNewsItem) -> str:
        """Search for news articles using Firecrawl."""
        start_time = time.perf_counter()
        input_text = _input_text(search)
        try:
            api_key = os.getenv("FIRECRAWL_API_KEY")
            if not api_key:
                return _finish("search_news", input_text, missing_key_error, start_time)

            firecrawl = Firecrawl(api_key=api_key)

            # Build search parameters
            search_params: dict[str, Any] = {
                "query": search.query,
                "limit": search.limit,
                "sources": ["news"],
            }

            # Handle time filter
            if search.time_filter:
                if search.time_filter == TimeFilter.CUSTOM:
                    if not (search.custom_date_min and search.custom_date_max):
                        return _finish(
                            "search_news",
                            input_text,
                            "Error: custom_date_min and custom_date_max are required when time_filter is CUSTOM",
                            start_time,
                        )
                    search_params["tbs"] = (
                        f"cdr:1,cd_min:{search.custom_date_min},cd_max:{search.custom_date_max}"
                    )
                else:
                    search_params["tbs"] = search.time_filter.value

            results = firecrawl.search(**search_params)

            timestamp = datetime.now().isoformat()
            stored_ids: list[str] = []
            for item in _result_items(results, "news"):
                result_id = str(uuid.uuid4())
                _storage.search_results = SearchResult(
                    result_id=result_id,
                    query=search.query,
                    title=_read_field(item, "title", "name", default="Untitled"),
                    url=_read_field(item, "url", "link", default=""),
                    description=_read_field(item, "description", "snippet"),
                    timestamp=timestamp,
                    source_type=SearchSource.NEWS,
                    date=_read_field(item, "date"),
                )
                stored_ids.append(result_id)

            if stored_ids:
                lines = [
                    f"Found {len(stored_ids)} news result(s) for query: '{search.query}'",
                    "",
                ]
                for result_id in stored_ids:
                    hit = _storage.search_results[result_id]
                    lines.append(f"[{result_id}] {hit.title}")
                    lines.append(f" URL: {hit.url}")
                    if hit.date:
                        lines.append(f" Date: {hit.date}")
                    if hit.description:
                        lines.append(f" Description: {hit.description}")
                    lines.append("")
                message = "\n".join(lines)
            else:
                message = f"Search completed for query: '{search.query}'. Results: {results}"

            return _finish("search_news", input_text, message, start_time)
        except Exception as e:
            return _finish("search_news", input_text, f"Error searching news: {str(e)}", start_time)

    @toolset.tool(description=SEARCH_IMAGES_DESCRIPTION)
    async def search_images(search: SearchImagesItem) -> str:
        """Search for images using Firecrawl."""
        start_time = time.perf_counter()
        input_text = _input_text(search)
        try:
            api_key = os.getenv("FIRECRAWL_API_KEY")
            if not api_key:
                return _finish("search_images", input_text, missing_key_error, start_time)

            # Build query with resolution operators if provided; an exact size
            # takes precedence over a minimum size.
            query = search.query
            if search.exact_width and search.exact_height:
                query = f"{query} imagesize:{search.exact_width}x{search.exact_height}"
            elif search.min_width and search.min_height:
                query = f"{query} larger:{search.min_width}x{search.min_height}"

            firecrawl = Firecrawl(api_key=api_key)
            results = firecrawl.search(
                query=query,
                limit=search.limit,
                sources=["images"],
            )

            timestamp = datetime.now().isoformat()
            stored_ids: list[str] = []
            for item in _result_items(results, "images"):
                result_id = str(uuid.uuid4())
                _storage.search_results = SearchResult(
                    result_id=result_id,
                    # Store the caller's query, not the operator-augmented one.
                    query=search.query,
                    title=_read_field(item, "title", "name", default="Untitled"),
                    url=_read_field(item, "url", "link", default=""),
                    description=_read_field(item, "description"),
                    timestamp=timestamp,
                    source_type=SearchSource.IMAGES,
                    # camelCase keys come from dict responses; snake_case
                    # covers attribute-style items.
                    image_url=_read_field(item, "imageUrl", "image_url"),
                    image_width=_read_field(item, "imageWidth", "image_width"),
                    image_height=_read_field(item, "imageHeight", "image_height"),
                )
                stored_ids.append(result_id)

            if stored_ids:
                lines = [
                    f"Found {len(stored_ids)} image result(s) for query: '{search.query}'",
                    "",
                ]
                for result_id in stored_ids:
                    hit = _storage.search_results[result_id]
                    lines.append(f"[{result_id}] {hit.title}")
                    if hit.image_url:
                        lines.append(f" Image URL: {hit.image_url}")
                    lines.append(f" Page URL: {hit.url}")
                    if hit.image_width and hit.image_height:
                        lines.append(f" Dimensions: {hit.image_width}x{hit.image_height}")
                    if hit.description:
                        lines.append(f" Description: {hit.description}")
                    lines.append("")
                message = "\n".join(lines)
            else:
                message = f"Search completed for query: '{search.query}'. Results: {results}"

            return _finish("search_images", input_text, message, start_time)
        except Exception as e:
            return _finish("search_images", input_text, f"Error searching images: {str(e)}", start_time)

    @toolset.tool(description=EXTRACT_WEB_CONTENT_DESCRIPTION)
    async def extract_web_content(extract: ExtractWebContentItem) -> str:
        """Extract main content from a webpage using Trafilatura."""
        start_time = time.perf_counter()
        input_text = _input_text(extract)
        try:
            # Refuse URLs that were stored as image search results.
            from_image_search = any(
                stored.url == extract.url and stored.source_type == SearchSource.IMAGES
                for stored in _storage.search_results.values()
            )
            if from_image_search:
                return _finish(
                    "extract_web_content",
                    input_text,
                    "Error: Content extraction is not supported for image search results. "
                    "Use web or news search results instead.",
                    start_time,
                )

            # Reuse an earlier extraction of the same URL in the same format.
            cached = next(
                (
                    content
                    for content in _storage.extracted_contents.values()
                    if content.url == extract.url and content.output_format == extract.output_format
                ),
                None,
            )
            if cached is not None:
                return _finish(
                    "extract_web_content",
                    input_text,
                    f"Using previously extracted content [{cached.content_id}]:\n\n{cached.content}",
                    start_time,
                )

            downloaded = trafilatura.fetch_url(extract.url)
            if downloaded is None:
                return _finish(
                    "extract_web_content",
                    input_text,
                    f"Error: Could not fetch content from URL: {extract.url}",
                    start_time,
                )

            trafilatura_format = "markdown" if extract.output_format == OutputFormat.MARKDOWN else "txt"
            extracted = trafilatura.extract(downloaded, output_format=trafilatura_format)

            if extracted is None or not extracted.strip():
                # No main content: fall back to page metadata when available.
                metadata = trafilatura.extract_metadata(downloaded)
                if metadata:
                    message = (
                        f"Could not extract main content, but found metadata:\n"
                        f"Title: {metadata.title}\n"
                        f"Author: {metadata.author}\n"
                        f"Date: {metadata.date}"
                    )
                else:
                    message = f"Error: Could not extract content from URL: {extract.url}"
                return _finish("extract_web_content", input_text, message, start_time)

            content_id = str(uuid.uuid4())
            _storage.extracted_contents = ExtractedContent(
                content_id=content_id,
                url=extract.url,
                content=extracted,
                output_format=extract.output_format,
                timestamp=datetime.now().isoformat(),
            )

            message = (
                f"Extracted content [{content_id}] from {extract.url} "
                f"(format: {extract.output_format.value}):\n\n{extracted}"
            )
            return _finish("extract_web_content", input_text, message, start_time)
        except Exception as e:
            return _finish(
                "extract_web_content",
                input_text,
                f"Error extracting content from {extract.url}: {str(e)}",
                start_time,
            )

    return toolset
def get_search_system_prompt() -> str:
    """Return the system prompt that explains when to use the search tools.

    Returns:
        The module-level ``SEARCH_SYSTEM_PROMPT`` text, usable as the system
        prompt or instructions of a pydantic-ai agent.
    """
    prompt: str = SEARCH_SYSTEM_PROMPT
    return prompt
def create_search_toolset_agent(model: str = "openrouter:x-ai/grok-4.1-fast") -> Agent:
    """Create a pydantic-ai agent wired up with the search toolset.

    The agent gets a fresh in-memory ``SearchStorage`` and the search prompt
    returned by ``get_search_system_prompt()``.

    Args:
        model: The model to use for the agent.

    Returns:
        Pydantic-ai agent with the search toolset.
    """
    storage = SearchStorage()
    toolset = create_search_toolset(storage=storage)

    # The prompt is registered once, through the instructions hook below.
    # Passing it as `system_prompt=` as well would send the same text to the
    # model twice on every run.
    agent = Agent(model, toolsets=[toolset])

    @agent.instructions
    async def add_prompt() -> str:
        """Add the search system prompt."""
        return get_search_system_prompt()

    return agent