Skip to main content
    __  __     ____           ___                    __  _
   / / / /__  / / /___       /   | ____ ____  ____  / /_(_)________ _
  / /_/ / _ \/ / / __ \     / /| |/ __ `/ _ \/ __ \/ __/ / ___/ __ `/
 / __  /  __/ / / /_/ /    / ___ / /_/ /  __/ / / / /_/ / /__/ /_/ /
/_/ /_/\___/_/_/\____( )  /_/  |_\__, /\___/_/ /_/\__/_/\___/\__,_/
                     |/         /____/
Like this? Get Agentica to make it.

How do you use Agentica?

Prerequisites:
  • Install agentica
  • Add your AGENTICA_API_KEY
There are two main ways to use Agentica. They are:
  • creating a magic function
  • spawning an agent with the spawn function
See the references for more details.

What can you use Agentica for?

Below are a few examples that we believe highlight some of the best features of Agentica!

Grab and go

Install any prerequisites, copy and off you go.
  • Python
  • TypeScript
Prerequisites:
  • Run pip install slack-sdk or uv add slack-sdk
  • Add your SLACK_BOT_TOKEN
Read these instructions to generate a SLACK_BOT_TOKEN!
slack_bot.py
import os
import asyncio
from agentica import magic
from slack_sdk import WebClient

# Read the bot token from the environment; `os.environ.get` yields None when
# SLACK_BOT_TOKEN is unset.
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN")

# We know we will want to list users and send a message
slack_conn = WebClient(token=SLACK_BOT_TOKEN)
# Bound client method that is handed to the magic function as its tool.
send_direct_message = slack_conn.chat_postMessage

# The docstring below is the prompt given to the model and the body is
# intentionally left empty (`...`); `send_direct_message` is the object passed
# to the decorator for the model to use.
@magic(send_direct_message,  model="openai:gpt-4.1" )
async def send_morning_message(user_name: str) -> None:
    """
    Uses the Slack API to send a direct message to a user. Light and cheerful!
    """
    ...

if __name__ == "__main__":
    # `asyncio` is already imported at the top of this script, so it is not
    # re-imported here. The magic function is a coroutine and needs an event loop.
    asyncio.run(send_morning_message('@John'))
    print("Morning message sent!")
Prerequisites:
  • Run pip install matplotlib pandas ipynb jupyter or uv add matplotlib pandas ipynb jupyter
  • Download the CSV and save as ./movie_metadata.csv
  • Run jupyter notebook data_science.ipynb
data_science.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
    "from agentica import spawn\n",
    "import pandas as pd\n",
    "import matplotlib.pyplot as plt\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
    "agent = await spawn()\n",
    "result = await agent.call(\n",
    "    dict[str, int],\n",
    "    \"Show the number of movies for each major genre. The results can be in any order.\",\n",
    "    movie_metadata_dataset=pd.read_csv(\"./movie_metadata.csv\").to_dict(),\n",
    ")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
    "plt.figure(figsize=(12, 8))\n",
    "plt.bar(list(result.keys()), list(result.values()))\n",
    "plt.xticks(rotation=45, ha='right')\n",
    "plt.tight_layout()\n",
    "plt.show()\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
    "result = await agent.call(\n",
    "    dict[str, int],\n",
    "    \"Update the result to only contain the genres that have more than 1000 movies.\",\n",
    ")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
    "plt.figure(figsize=(12, 8))\n",
    "plt.bar(list(result.keys()), list(result.values()))\n",
    "plt.xticks(rotation=45, ha='right')\n",
    "plt.tight_layout()\n",
    "plt.show()\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
    "name": "ipython",
    "version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Prerequisites:
  • If on macOS, install system dependencies with brew install pkg-config cairo meson ninja
  • Run pip install exa-py validators markdown xhtml2pdf
  • Create an EXA account, create an EXA_SERVICE_API_KEY and run export EXA_SERVICE_API_KEY="<your-key-here>"
deep_research.py
import asyncio
import itertools
import json
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import httpx
import markdown
import validators
from exa_py import AsyncExa
from agentica.magic.agent import Agent
from agentica.std.caption import CaptionLogger
from xhtml2pdf import pisa

# Fail fast at import time: the subagents read this key to create their Exa API keys.
if not os.getenv("EXA_SERVICE_API_KEY"):
    raise ValueError("EXA_SERVICE_API_KEY is not set")


# System prompt for the citation agent; DeepResearchSession uses it as the
# default `ca_system_prompt`. The function names it mentions are the ones
# CitationAgent places in its agent's scope.
CITATION_SP = """
You are a citation agent.

# Task
You must:
1. Review the research report provided in the REPL as `research_report` line by line.
2. Identify which lines of the research report use information that could be from web search results.
3. List the web search results that were used in creating the research report.
4. For each of these lines, use the `load_web_search_result` function to load the web search result that was used.
5. Add a markdown citation with the URL of the web search result to the claim in the research report by modifying the `research_report` variable.
6. Once this is done, make sure the `research_report` is valid markdown - if not, change the markdown to make it valid.
7. Use the `save_report` function to save the research report to memory as a markdown file at the end.
8. Return saying you have finished.

# Rules
- Your citations MUST be consistent throughout the `research_report`.
- You MUST use the `list_web_search_results` function to list the web search results that were used in creating the research report
- You MUST use the `load_web_search_result` function to load the web search results.
- You MUST use the `research_report` variable provided in the REPL to modify the research report by adding citations.
- You MUST make sure the `research_report` is valid markdown.
- You MUST use the `save_report` function to save the research report to memory at the end.
"""

# System prompt for the lead researcher agent. `{directory}` is a `str.format`
# placeholder that DeepResearchSession fills in with its working directory, so
# any other literal braces added here would need to be doubled.
# The task steps are numbered 1-6 (the last step was previously a second "5.").
LEAD_RESEARCHER_SP = """
You are a lead researcher.

# Task
You must:
1. Create a plan to research the user query.
2. Determine how many specialised subagents (with access to the web) are necessary, each with a different specific research task.
3. In separate REPL sessions, call ALL subagents in parallel using asyncio.gather and asyncio.run to perform their research task.
4. Summarise the results of the subagents in a final research report as markdown. Use sections, sub-sections, list and formatting to make the report easy to read and understand. The formatting should be consistent and easy to follow.
5. Check the final research report, as this will be shown to the user.
6. Return the final research report by assigning using `return` at the very end in a separate REPL session.

# Rules
- Use the directory {directory}
- Do NOT need to check if the directory {directory} exists, it is local to the user therefore you.
- The planning process, subagents and final report MUST be done in SEPARATE REPL sessions.
- Do NOT construct the final report until you have run the subagents.
- Do NOT return the final report in the REPL until planning, assigning subagents and returning the final report is complete.
- Do NOT add citations to the final research report yourself, this will be done afterwards.
- Do NOT repeat yourself in the final research report.
- You MUST raise an AgentError if you cannot complete the task with what you have available.
- You MUST check the final research report before returning it to the user.

## Planning
- You MUST write the plan yourself.
- You MUST write the plan before assigning subagents to tasks.
- You MUST break down the task into small individual tasks.

## Subagents
- You MUST assign each small individual task to a subagent.
- You MUST instruct subagents to use the web_search and save_used_web_search functions if the task requires it.
- Do NOT ask subagents to cite the web, instead instruct them to use the save_used_web_search function.
- Subagents MUST be assigned independent tasks.
- If after subagents have returned their findings more research is needed, you can assign more subagents to tasks.

## Final Report
- Do NOT write the final report yourself without running subagents to do so.
- Do NOT add citations to the final research report yourself, this will be done afterwards.
- Do NOT repeat yourself in the final research report.
- You MUST load the plan from memory before returning the final research report to check that you have followed the plan.
- You MUST check the final research report before returning it to the user.
- You MUST return the final research report using `return` at the very end in a separate REPL session.
"""
# System prompt passed as `premise` to the Agent inside every SubAgent. The
# web_search / save_used_web_search / SearchResult names it mentions are the
# ones SubAgent places in that agent's scope.
SUBAGENT_SP = """
You are a helpful assistant.

# Task
You must:
1. Construct a list of things to search for using the web_search function.
2. Execute ALL web_search calls in parallel using asyncio.gather and asyncio.run.
3. For each search result, `print()` the content of each search result by accessing the SearchResult.content attribute
4. Identify which lines of content you are going to use in your report.
5. Use the save_used_web_search function to save the SearchResult to memory and include the lines of the content that you have used.
6. Condense the search results into a single report with what you have found.
7. Return the report using `return` at the very end in a separate REPL session.

# Rules
- You MUST use `print()` to print the content of each search result by accessing the SearchResult.content attribute.
- You MUST use the web_search function if instructed to do so OR if the task requires finding information.
- Do NOT assume that the web_search function will return the information you need, you must go through the content of each search result line by line by accessing the SearchResult.content attribute
- Do NOT assume which lines of content you are going to use in your report, you must go through the content of each search result line by line by accessing the SearchResult.content attribute
- If you cannot find any information, do NOT provide information yourself, instead raise an error for the lead researcher in the REPL.
- You MUST save the SearchResult of any research that you have used to memory and include the lines of the content that you have used.
- Return the report using `return` at the very end in a separate REPL session.
"""


def save_plan(plan: str, directory: str) -> None:
    """Save a research plan.

    Writes `plan` to `<directory>/plan.md`, creating `directory` (and any
    missing parents) first. The file is always written as UTF-8 so that plans
    containing non-ASCII text round-trip regardless of the platform's default
    encoding.
    """
    path = f"{directory}/plan.md"
    os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        _ = f.write(plan)


def load_plan(directory: str) -> str:
    """Load a research plan.

    Reads `<directory>/plan.md` as UTF-8 (matching `save_plan`) and returns
    its text.

    Raises:
        FileNotFoundError: if no plan has been saved in `directory` yet.
    """
    path = f"{directory}/plan.md"
    # Open directly instead of checking for existence first, so there is no
    # window between the check and the read.
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        raise FileNotFoundError(f"Plan file {path} not made yet.") from None


def md_to_pdf(input_path: str, output_path: str):
    """Render the markdown file at `input_path` to a PDF at `output_path`.

    The markdown is read as UTF-8, converted to HTML with `markdown.markdown`
    and handed to `pisa.CreatePDF`, whose result is not inspected. Returns
    `output_path`.
    """
    with open(input_path, "r", encoding="utf-8") as source:
        markdown_source = source.read()

    rendered_html = markdown.markdown(markdown_source)

    with open(output_path, "wb") as target:
        pisa.CreatePDF(rendered_html, dest=target)

    return output_path


@dataclass
class SearchResult:
    """
    Represents a single search result from the search engine.
    """

    title: str
    url: str
    content: str
    score: float | None = None
    lines_used: list[tuple[int, int]] | None = None

    def __post_init__(self):
        if not validators.url(self.url):
            raise ValueError("This is not a valid URL")

    def __repr__(self):
        return f"SearchResult(title={self.title}, score={self.score}, url={self.url}, ...)"

    def print_content_with_lines_numbers(self):
        """Print the content of the search result with the lines numbers that have been used."""
        for line_number, line in enumerate(self.content.split("\n"), start=1):
            print(f"{line_number}: {line}")

    def save(self, path: str, lines_used: list[tuple[int, int]]):
        """Save a web search result specifying which lines of the content have been used."""
        self.lines_used = lines_used
        if dir := os.path.dirname(path):
            os.makedirs(dir, exist_ok=True)
        with open(path, "w") as f:
            _ = f.write(json.dumps(self.__dict__))


_id_gen = itertools.count(0)  # Backup ID gen when listeners are disabled.


class SubAgent:
    # A research worker with web access. It wraps an `Agent` whose scope holds
    # `web_search`, `SearchResult` and `save_used_web_search`, and it saves the
    # search results it used under `<directory>/subagent_<id>/`.
    id: int
    n: int
    directory: str
    _brain: Agent
    _exa_client: AsyncExa | None

    # Seen by the lead researcher
    def __init__(self, directory: str):
        """Create a subagent that has access to the web."""
        self.n = 0  # count of saved search results; numbers the result files
        self.directory = directory
        self._brain = Agent(
            model="openai:gpt-4.1",
            premise=SUBAGENT_SP,
            scope={
                "web_search": self._web_search,
                "SearchResult": SearchResult,
                "save_used_web_search": self._save_used_web_search,
            },
        )
        self._exa_client = None  # created lazily by _ensure_exa_client
        # Prefer the listener's local id; fall back to the module-level counter
        # when there is no listener or it has no id. (Local is not called `id`
        # so the builtin is not shadowed.)
        agent_id = None
        if (listener := self._brain._listener) is not None:
            agent_id = listener.logger.local_id
        if agent_id is None:
            agent_id = next(_id_gen)
        self.id = agent_id

    async def _ensure_exa_client(self) -> None:
        """Create this subagent's Exa client on first use; later calls are no-ops.

        Requests a new API key from the Exa admin endpoint using
        EXA_SERVICE_API_KEY and builds an `AsyncExa` client with it.

        Raises:
            ValueError: if EXA_SERVICE_API_KEY is unset, or no key could be
                extracted from the response.
            Whatever `response.raise_for_status()` raises on an HTTP error.
        """
        if self._exa_client is not None:
            return

        # Create a new API key using the admin endpoint
        admin_api_key = os.getenv("EXA_SERVICE_API_KEY")
        if not admin_api_key:
            raise ValueError("EXA_SERVICE_API_KEY environment variable is not set")

        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://admin-api.exa.ai/team-management/api-keys",
                headers={
                    "x-api-key": admin_api_key,
                    "Content-Type": "application/json",
                },
                json={
                    "name": f"SubAgent_{self.id}_Key_{datetime.now().strftime('%Y%m%d%H%M%S')}",
                },
            )
            response.raise_for_status()
            data = response.json()

        # Extract the API key from the response.
        # NOTE(review): the value is read from `apiKey.id`; confirm against the
        # Exa admin API that this field carries the usable key.
        # `or {}` also covers a response where "apiKey" is present but null.
        new_api_key = (data.get("apiKey") or {}).get("id")
        if not new_api_key:
            raise ValueError(f"Could not extract API key from response: {data}")
        print(f"Created new EXA API key for subagent {self.id}: ...{new_api_key[-4:]}")

        # Initialize the Exa client with the newly created API key
        self._exa_client = AsyncExa(api_key=new_api_key)

    # Seen by the lead researcher
    async def __call__(self, task: str) -> str:
        """
        Run a subagent for a given task. The subagent will return its research, having saved the search results that it has used.
        """
        print(f"Running web-search subagent ({self.id})")
        await self._ensure_exa_client()
        with CaptionLogger():
            result = await self._brain.call(str, task)
        return result

    def _get_path(self) -> str:
        # Each call advances the counter, so every saved result gets a new file name.
        self.n += 1
        return f"{self.directory}/subagent_{self.id}/result_{self.n}.json"

    def _save_used_web_search(
        self, search_result: SearchResult, lines_used: list[tuple[int, int]]
    ) -> None:
        """Save a SearchResult object as JSON."""
        search_result.save(self._get_path(), lines_used)

    async def _web_search(self, query: str) -> list[SearchResult]:
        """Searches the web given a query, returning a list of SearchResults."""
        print(f"Searching the web for {query}")
        await self._ensure_exa_client()
        response = await self._exa_client.search_and_contents(
            query=query,
            num_results=2,
            text=True,  # Get text content in markdown format
        )
        results = []
        for result in response.results:
            # Fall back to the placeholder when `text` is missing OR present
            # but empty/None; a plain getattr default only covers the former,
            # and a None content would break SearchResult's line handling.
            content = getattr(result, 'text', None) or '<content missing/>'
            search_result = SearchResult(
                title=result.title or "<title missing/>",
                url=result.url,
                content=content,
                score=getattr(result, 'score', None),
            )
            results.append(search_result)
        return results


class CitationAgent:
    # Adds citations to a finished report. It wraps an `Agent` whose scope
    # holds the three methods below plus `SearchResult`; all files it reads or
    # writes live under `directory`.
    directory: str
    _brain: Agent

    def __init__(self, directory: str, system_prompt: str):
        self.directory = directory
        self._brain = Agent(
            model="openai:gpt-4.1",
            premise=system_prompt,
            scope={
                "list_web_search_results": self.list_web_search_results,
                "load_web_search_result": self.load_web_search_result,
                "save_report": self.save_report,
                "SearchResult": SearchResult,
            },
        )

    async def __call__(self, md_report: str) -> str:
        """Run the citation agent with `md_report` bound as `research_report`."""
        print("Running citation agent")
        # Only a short prefix of the report goes into the task text; the full
        # report is passed as the `research_report` variable.
        return await self._brain.call(
            str,
            f"The `research_report = {md_report[:10]}...[truncated]` has been provided to you in the REPL.",
            research_report=md_report,
        )

    def load_web_search_result(self, path: str) -> SearchResult:
        """Load a search result queried by a subagent.

        `path` must point inside this agent's directory.

        Raises:
            ValueError: if `path` resolves to a location outside the directory.
            FileNotFoundError: if the file does not exist.
        """
        # Compare resolved paths rather than string prefixes, so that `..`
        # segments or a sibling directory sharing the same name prefix cannot
        # escape the session directory.
        root = Path(self.directory).resolve()
        if not Path(path).resolve().is_relative_to(root):
            raise ValueError(f"The file path must start with {self.directory}")
        if not os.path.exists(path):
            raise FileNotFoundError(f"Web search file {path} does not exist.")
        with open(path, "r", encoding="utf-8") as f:
            return SearchResult(**json.load(f))

    def list_web_search_results(self) -> list[str]:
        """List all saved web search results queried by subagents."""
        directory_path = Path(self.directory)
        files: list[str] = []
        # Find all subagent_* directories
        for subagent_dir in directory_path.glob("subagent_*"):
            if not subagent_dir.is_dir():
                continue
            # Find result_<integer>.json files in each subagent directory
            for file in subagent_dir.iterdir():
                if (
                    file.is_file()
                    and file.suffix == '.json'
                    and re.fullmatch(r'result_\d+', file.stem)
                ):
                    files.append(str(file))
        return files

    def save_report(self, md_report: str) -> None:
        """Save a research report as markdown.

        Writes `<directory>/report.md` and then attempts to render
        `<directory>/report.pdf`; a failed PDF conversion is printed, not raised.
        """
        path = f"{self.directory}/report.md"
        if parent := os.path.dirname(path):
            os.makedirs(parent, exist_ok=True)
        # Written as UTF-8 because md_to_pdf reads the file back as UTF-8; the
        # platform default encoding could otherwise corrupt non-ASCII text.
        with open(path, "w", encoding="utf-8") as f:
            _ = f.write(md_report)
        # PDF rendering is best-effort: the markdown file is already saved, so
        # a conversion failure is reported without failing the save.
        try:
            _ = md_to_pdf(path, f"{self.directory}/report.pdf")
        except Exception as e:
            print(f"Error converting markdown to PDF: {e}")


class DeepResearchSession:
    # Ties the pipeline together: a lead researcher `Agent` (given save_plan,
    # load_plan and SubAgent in its scope) writes the report, then a
    # CitationAgent processes it; everything is stored under `directory`.
    directory: str
    lr_system_prompt: str
    ca_system_prompt: str

    lead_researcher: Agent
    citation_agent: CitationAgent

    def __init__(
        self,
        directory: str,
        lr_system_prompt: str = LEAD_RESEARCHER_SP,
        ca_system_prompt: str = CITATION_SP,
    ):
        self.directory = directory
        if not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)

        # The lead researcher prompt carries a `{directory}` placeholder.
        self.lr_system_prompt = lr_system_prompt.format(directory=directory)
        self.ca_system_prompt = ca_system_prompt

        researcher_scope = {
            "save_plan": save_plan,
            "load_plan": load_plan,
            "SubAgent": SubAgent,
        }
        self.lead_researcher = Agent(
            premise=self.lr_system_prompt,
            model="openai:gpt-4.1",
            scope=researcher_scope,
        )
        self.citation_agent = CitationAgent(
            directory=directory,
            system_prompt=ca_system_prompt,
        )

    async def __call__(self, query: str) -> str:
        """Research `query`, run the citation agent on the result, and return a message pointing at the generated PDF report."""
        with CaptionLogger():
            draft_report = await self.lead_researcher(str, query)
            _ = await self.citation_agent(draft_report)

        report_exists = os.path.exists(f"{self.directory}/report.md")
        if not report_exists:
            raise RuntimeError("Report file does not exist")

        # Render the PDF from the saved markdown before pointing the user at it.
        _ = md_to_pdf(f"{self.directory}/report.md", f"{self.directory}/report.pdf")
        return (
            f"Check out the research report at {self.directory}/report.pdf. Ask me any questions!"
        )

if __name__ == "__main__":
    # Put this script's own directory on the import path.
    script_dir = os.path.dirname(__file__)
    sys.path.append(script_dir)

    dr_session = DeepResearchSession("deep_research_test")
    query = (
        "What are all of the companies in the US working on AI agents in 2025? make a list of at least 10. "
        + "For each, include the name, website and product, description of what they do, type of agents they build, and their vertical/industry."
    )
    result = asyncio.run(dr_session(query))
    print(result)
View the generated report

Walk-throughs

  • Python
  • TypeScript
Prerequisites:
  • Run pip install slack-sdk or uv add slack-sdk
  • Add your SLACK_BOT_TOKEN
Python objects are tools. They are there to be manipulated and used. Agentica lets agents do just that, including using functions, classes and objects from any Python SDK.
As a simple example, let’s say you want to use the Slack client from the Slack SDK to send someone a message with some custom business logic inside it. Let’s start by creating a client.
import os

from agentica import magic
from slack_sdk import WebClient

SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN")

slack_conn = WebClient(token=SLACK_BOT_TOKEN)
Read these instructions to generate a SLACK_BOT_TOKEN!
Then isolate the relevant Slack methods.
send_direct_message = slack_conn.chat_postMessage
And simply pass them to your magic function using the @magic decorator. Note that the prompt to the model is specified in the docstring and the method definition is empty.
# The docstring below is the prompt given to the model and the body is
# intentionally left empty (`...`); `send_direct_message` is the object passed
# to the decorator for the model to use.
@magic(send_direct_message,  model="openai:gpt-4.1" )
async def send_morning_message(user_name: str) -> None:
    """
    Uses the Slack API to send a direct message to a user. Light and cheerful!
    """
    ...

import asyncio
asyncio.run(send_morning_message("@John"))
print("Morning message sent!")
For more information on what objects you can pass in via the @magic decorator, see the references. If you prefer more agentic syntax, try the following:
from agentica import spawn

async def main():
    # Spawn an agent whose only tool is `send_direct_message`, then ask it to
    # message "@John".
    premise = """
        Use the Slack API to send the user a direct message. Light and cheerful!
        """
    tools = {"send_direct_message": send_direct_message}
    morning_messenger = await spawn(premise, scope=tools)
    # The first argument is the expected return type; `None` means no value is wanted.
    _ = await morning_messenger(None, "@John")
    print("Morning message sent!")

asyncio.run(main())
Prerequisites:
  • Run pip install matplotlib pandas ipynb jupyter or uv add matplotlib pandas ipynb jupyter
  • Download the CSV and save as ./movie_metadata.csv
Let’s take an example from the DSEval benchmark and use an agent in Agentica to answer questions on a dataset in a Jupyter notebook. Let’s start by importing a few things.
from agentica import spawn
import pandas as pd
import matplotlib.pyplot as plt
Now let’s get an agent to help us answer a question on the dataset and stream in its thinking.
agent = await spawn()
result = await agent.call(
    dict[str, int],
    "Show the number of movies for each major genre. The results can be in any order.",
    movie_metadata_dataset=pd.read_csv("./movie_metadata.csv").to_dict(),
)
To determine the number of movies for each major genre, we can follow these steps:

1. Access the `'genres'` field in the `movie_metadata_dataset` dictionary, which should contain the genres of the movies.
2. Initialize a dictionary to keep track of the count of movies in each genre.
3. Iterate over the genres for each movie, and for movies with multiple genres (assuming they are separated by '|'), split the string and count each genre separately.
4. Update the count of each genre in our dictionary.
5. Return the dictionary with the genre counts as the result.

Let's get started by inspecting the `movie_metadata_dataset` to understand its structure and find how genres are stored.No code was executed. Use ```python code blocks to execute code.
```python
# Inspect the structure of movie_metadata_dataset to locate the genres information
movie_metadata_dataset.keys()
```dict_keys(['color', 'director_name', 'num_critic_for_reviews', 'duration', 'director_facebook_likes', 'actor_3_facebook_likes', 'actor_2_name', 'actor_1_facebook_likes', 'gross', 'genres', 'actor_1_name', 'movie_title', 'num_voted_users', 'cast_total_facebook_likes', 'actor_3_name', 'facenumber_in_poster', 'plot_keywords', 'movie_imdb_link', 'num_user_for_reviews', 'language', 'country', 'content_rating', 'budget', 'title_year', 'actor_2_facebook_likes', 'imdb_score', 'aspect_ratio', 'movie_facebook_likes'])
The `movie_metadata_dataset` contains a field `'genres'`, which indicates that we can use this to count the number of movies for each genre. Let's examine a few entries from the `'genres'` field to understand its format. This will help us properly split the genres if they are present as a delimited string.No code was executed. Use ```python code blocks to execute code.
```python
# Look at the first few entries in the 'genres' field
list(movie_metadata_dataset['genres'].values())[:5]
```['Action|Adventure|Fantasy|Sci-Fi', 'Action|Adventure|Fantasy', 'Action|Adventure|Thriller', 'Action|Thriller', 'Documentary']
The genres are stored as strings, with each genre for a movie separated by a '|'. We will split these strings and count each genre separately.

Let's proceed to compute the number of movies for each genre.No code was executed. Use ```python code blocks to execute code.
```python
from collections import defaultdict

# Create a defaultdict to store the count of each genre
genre_count = defaultdict(int)

# Iterate over each movie's genres
for genres in movie_metadata_dataset['genres'].values():
    # Split the genres string by '|'
    for genre in genres.split('|'):
        # Increment the count for each genre
        genre_count[genre] += 1

# Convert defaultdict to a regular dictionary for the result
result = dict(genre_count)
```
Let’s print the result.
result
{'Action': 1153, 'Adventure': 923, 'Fantasy': 610, 'Sci-Fi': 616, 'Thriller': 1411, 'Documentary': 121, 'Romance': 1107, 'Animation': 242, 'Comedy': 1872, 'Family': 546, 'Musical': 132, 'Mystery': 500, 'Western': 97, 'Drama': 2594, 'History': 207, 'Sport': 182, 'Crime': 889, 'Horror': 565, 'War': 213, 'Biography': 293, 'Music': 214, 'Game-Show': 1, 'Reality-TV': 2, 'News': 3, 'Short': 5, 'Film-Noir': 6}
Now let’s make a plot with the returned data, since it has passed us back the appropriate object!
plt.figure(figsize=(12, 8))
plt.bar(list(result.keys()), list(result.values()))
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()
Movies by genre
But what if we want only the genres with over 1000 movies? Our agent still has access to our result in its execution environment and can manipulate that variable by reference!
result = await agent.call(
dict[str, int],
"Update the result to only contain the genres that have more than 1000 movies.",
)
To update the result to contain only the genres with more than 1000 movies, we'll filter the dictionary accordingly. Let's do that now.No code was executed. Use ```python code blocks to execute code.
```python
# Filter the genre_count dictionary to include only genres with more than 1000 movies
result = {genre: count for genre, count in genre_count.items() if count > 1000}
```
Now we can remake the plot!
plt.figure(figsize=(12, 8))
plt.bar(list(result.keys()), list(result.values()))
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()
Top 5 movies by genre
For more information on what objects you can pass to spawn, see the references.
Prerequisites:
  • If on macOS, install system dependencies with brew install pkg-config cairo meson ninja
  • Run pip install exa-py validators markdown xhtml2pdf or uv add exa-py validators markdown xhtml2pdf
  • Create an EXA account, create an EXA_SERVICE_API_KEY and run export EXA_SERVICE_API_KEY="<your-key-here>"
Let’s replicate Anthropic’s deep research multi-agent system. The high-level architecture and the iterative process are outlined in the images below. Let’s start building. We want a few utility functions that the lead researcher agent can use.
First, let’s import a few things.
import asyncio
import itertools
import json
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import httpx
import markdown
import validators
from exa_py import AsyncExa
from agentica.magic.agent import Agent
from agentica.std.caption import CaptionLogger
from datetime import datetime
from xhtml2pdf import pisa
We need to be able to save and read a plan as a Markdown (.md) file in a local directory. Likewise, we need to be able to make a PDF out of the final Markdown report.
def save_plan(plan: str, directory: str) -> None:
    """Save a research plan.

    Writes `plan` to `<directory>/plan.md`, creating `directory` (and any
    missing parents) first. The file is always written as UTF-8 so that plans
    containing non-ASCII text round-trip regardless of the platform's default
    encoding.
    """
    path = f"{directory}/plan.md"
    os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        _ = f.write(plan)


def load_plan(directory: str) -> str:
    """Load a research plan.

    Reads `<directory>/plan.md` as UTF-8 (matching `save_plan`) and returns
    its text.

    Raises:
        FileNotFoundError: if no plan has been saved in `directory` yet.
    """
    path = f"{directory}/plan.md"
    # Open directly instead of checking for existence first, so there is no
    # window between the check and the read.
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        raise FileNotFoundError(f"Plan file {path} not made yet.") from None


def md_to_pdf(input_path: str, output_path: str):
    """Render the markdown file at `input_path` to a PDF at `output_path`.

    The markdown is read as UTF-8, converted to HTML with `markdown.markdown`
    and handed to `pisa.CreatePDF`, whose result is not inspected. Returns
    `output_path`.
    """
    with open(input_path, "r", encoding="utf-8") as source:
        markdown_source = source.read()

    rendered_html = markdown.markdown(markdown_source)

    with open(output_path, "wb") as target:
        pisa.CreatePDF(rendered_html, dest=target)

    return output_path
We will need to include web search. Since we know the final report will be in markdown, let’s use the Exa AI Python SDK.
Let’s add some bonus features:
  • all web search URLs should be valid URLs
  • the citation agent should be able to easily access past web searches and know what specific content of the web search has been used in any findings
The lead researcher should be able to create and run as many subagents as it deems necessary to work on independent tasks (with web search).
Let’s add some bonus features:
  • the lead researcher should have the option to reuse a subagent with persistent context e.g. asking a subagent to redo a task that it got wrong
  • subagents should save the web search results that they use specifying what they have used for the citation agent to review
Let’s create a custom class as follows:
_id_gen = itertools.count(0)  # Backup ID gen when listeners are disabled.


class SubAgent:
    # A web-search worker that the lead researcher can create and call.
    #
    # Documentation is kept in `#` comments rather than a class docstring
    # because this class is placed in the lead researcher's scope and the
    # docstrings below are presumably part of what the models see; they are
    # intentionally left word-for-word.
    #
    # Attributes:
    #   id          identifier used in log lines and in the results directory name
    #   n           number of result files handed out so far by `_get_path`
    #   directory   root directory under which used search results are saved
    #   _brain      the underlying agent that performs the task
    #   _exa_client Exa client, created lazily on first use
    id: int
    n: int
    directory: str
    _brain: Agent
    _exa_client: AsyncExa | None

    # Seen by the lead researcher
    def __init__(self, directory: str):
        """Create a subagent that has access to the web."""
        self.n = 0
        self.directory = directory
        self._brain = Agent(
            model="openai:gpt-4.1",
            premise=SUBAGENT_SP,
            scope={
                "web_search": self._web_search,
                "SearchResult": SearchResult,
                "save_used_web_search": self._save_used_web_search,
            },
        )
        self._exa_client = None

        # Prefer the id assigned by the agent's listener; fall back to the
        # module-level counter when there is no listener or it has no id.
        agent_id = None
        if (listener := self._brain._listener) is not None:
            agent_id = listener.logger.local_id
        if agent_id is None:
            agent_id = next(_id_gen)
        self.id = agent_id

    async def _ensure_exa_client(self) -> None:
        # Lazily create `self._exa_client`; a no-op once it exists.
        #
        # A fresh Exa API key is requested from the admin endpoint using
        # EXA_SERVICE_API_KEY, and the client is built with that key.
        # Raises ValueError if the environment variable is missing or the
        # response carries no key; HTTP errors propagate from httpx.
        if self._exa_client is not None:
            return

        # Create a new API key using the admin endpoint
        admin_api_key = os.getenv("EXA_SERVICE_API_KEY")
        if not admin_api_key:
            raise ValueError("EXA_SERVICE_API_KEY environment variable is not set")

        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://admin-api.exa.ai/team-management/api-keys",
                headers={
                    "x-api-key": admin_api_key,
                    "Content-Type": "application/json",
                },
                json={
                    "name": f"SubAgent_{self.id}_Key_{datetime.now().strftime('%Y%m%d%H%M%S')}",
                },
            )
            response.raise_for_status()
            data = response.json()

            # Extract the API key from the response.
            # NOTE(review): the value under apiKey.id is used as the key
            # itself -- confirm against the Exa admin API response schema.
            new_api_key = data.get("apiKey", {}).get("id")
            if not new_api_key:
                raise ValueError(f"Could not extract API key from response: {data}")
            print(f"Created new EXA API key for subagent {self.id}: ...{new_api_key[-4:]}")

        # Initialize the Exa client with the newly created API key
        self._exa_client = AsyncExa(api_key=new_api_key)

    # Seen by the lead researcher
    async def __call__(self, task: str) -> str:
        """
        Run a subagent for a given task. The subagent will return its research, having saved the search results that it has used.
        """
        print(f"Running web-search subagent ({self.id})")
        await self._ensure_exa_client()
        with CaptionLogger():
            result = await self._brain.call(str, task)
        return result

    def _get_path(self) -> str:
        # Hand out the next result file path for this subagent. Each call
        # advances the counter, so successive paths never collide.
        self.n += 1
        return f"{self.directory}/subagent_{self.id}/result_{self.n}.json"

    def _save_used_web_search(
        self, search_result: SearchResult, lines_used: list[tuple[int, int]]
    ) -> None:
        """Save a SearchResult object as JSON."""
        path = self._get_path()
        # Make sure the per-subagent directory exists before writing; this is
        # harmless if SearchResult.save creates it as well.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        search_result.save(path, lines_used)

    async def _web_search(self, query: str) -> list[SearchResult]:
        """Searches the web given a query, returning a list of SearchResults."""
        print(f"Searching the web for {query}")
        await self._ensure_exa_client()
        response = await self._exa_client.search_and_contents(
            query=query,
            num_results=2,
            text=True,  # Get text content in markdown format
        )
        results = []
        for result in response.results:
            # Fall back to the placeholder both when the attribute is absent
            # and when it is present but empty/None, mirroring how a missing
            # title is handled below.
            content = getattr(result, "text", None) or "<content missing/>"
            results.append(
                SearchResult(
                    title=result.title or "<title missing/>",
                    url=result.url,
                    content=content,
                    score=getattr(result, "score", None),
                )
            )
        return results
The citation agent should be able to list and look back through web searches made by subagents as well as save the final report as an .md file.
class CitationAgent:
    # Agent that adds citations to a research report. It can list and load the
    # web search results saved by subagents under `directory`, and save the
    # final report there.
    #
    # The method docstrings below belong to functions placed in the agent's
    # scope, so they are presumably visible to the model and are kept
    # word-for-word; extra detail is given in `#` comments.
    directory: str
    _brain: Agent

    def __init__(self, directory: str, system_prompt: str):
        self.directory = directory
        self._brain = Agent(
            model="openai:gpt-4.1",
            premise=system_prompt,
            scope={
                "list_web_search_results": self.list_web_search_results,
                "load_web_search_result": self.load_web_search_result,
                "save_report": self.save_report,
                "SearchResult": SearchResult,
            },
        )

    async def __call__(self, md_report: str) -> str:
        # Run the agent on `md_report`. The full report is passed as the
        # `research_report` keyword argument; the prompt itself only carries
        # its first 10 characters.
        print("Running citation agent")
        return await self._brain.call(
            str,
            f"The `research_report = {md_report[:10]}...[truncated]` has been provided to you in the REPL.",
            research_report=md_report,
        )

    def load_web_search_result(self, path: str) -> SearchResult:
        """Load a search result queried by a subagent."""
        # Compare resolved paths rather than string prefixes: a prefix check
        # lets `<directory>/../...` and sibling directories such as
        # `<directory>_other` through, and wrongly rejects listed files when
        # the directory was written as `./name`.
        root = Path(self.directory).resolve()
        if not Path(path).resolve().is_relative_to(root):
            raise ValueError(f"The file path must start with {self.directory}")
        if not os.path.exists(path):
            raise FileNotFoundError(f"Web search file {path} does not exist.")
        with open(path, "r") as f:
            return SearchResult(**json.load(f))

    def list_web_search_results(self) -> list[str]:
        """List all saved web search results queried by subagents."""
        result_stem = re.compile(r'^result_\d+$')
        files: list[str] = []
        # Find all subagent_* directories
        for subagent_dir in Path(self.directory).glob("subagent_*"):
            if not subagent_dir.is_dir():
                continue
            # Keep only result_<integer>.json files in each subagent directory
            for file in subagent_dir.iterdir():
                if (
                    file.is_file()
                    and file.suffix == '.json'
                    and result_stem.match(file.stem)
                ):
                    files.append(str(file))
        # Directory iteration order is arbitrary; sort so that repeated calls
        # return the same listing.
        files.sort()
        return files

    def save_report(self, md_report: str) -> None:
        """Save a research report as markdown."""
        path = f"{self.directory}/report.md"
        if report_dir := os.path.dirname(path):
            os.makedirs(report_dir, exist_ok=True)
        # Write UTF-8 explicitly: md_to_pdf reads this file back as UTF-8, so
        # relying on the locale's default encoding could corrupt the report.
        with open(path, "w", encoding="utf-8") as f:
            _ = f.write(md_report)
        # PDF conversion is best-effort here: the markdown report has already
        # been saved, so a conversion failure is reported but not raised.
        try:
            _ = md_to_pdf(path, f"{self.directory}/report.pdf")
        except Exception as e:
            print(f"Error converting markdown to PDF: {e}")
Finally, let’s put it all together, making sure that
  • the local directory argument is user-defined for security,
  • the citation agent is always called after the research report is generated by the lead researcher, and
  • the user has the opportunity to ask follow-up questions after receiving the research report.
class DeepResearchSession:
    """Run a lead researcher followed by a citation agent in one directory.

    The lead researcher is given ``save_plan``, ``load_plan`` and ``SubAgent``
    in its scope. The citation agent is built on the same ``directory`` and
    is always invoked on the lead researcher's output.

    Attributes:
        directory: Working directory for the plan, search results and report.
        lr_system_prompt: Lead-researcher prompt after ``directory`` has been
            substituted into it.
        ca_system_prompt: Prompt passed to the citation agent.
        lead_researcher: The lead researcher agent.
        citation_agent: The citation agent.
    """

    directory: str
    lr_system_prompt: str
    ca_system_prompt: str

    lead_researcher: Agent
    citation_agent: CitationAgent

    def __init__(
        self,
        directory: str,
        lr_system_prompt: str = LEAD_RESEARCHER_SP,
        ca_system_prompt: str = CITATION_SP,
    ):
        """Create the working directory and both agents.

        Args:
            directory: Directory used for all files of this session; created
                if it does not exist.
            lr_system_prompt: Lead-researcher prompt template; it is formatted
                with ``directory=directory``.
            ca_system_prompt: Prompt for the citation agent, used as given.
        """
        self.directory = directory
        # exist_ok=True already covers the "directory exists" case.
        os.makedirs(self.directory, exist_ok=True)
        self.lr_system_prompt = lr_system_prompt.format(directory=directory)
        self.ca_system_prompt = ca_system_prompt
        self.lead_researcher = Agent(
            premise=self.lr_system_prompt,
            model="openai:gpt-4.1",
            scope={
                "save_plan": save_plan,
                "load_plan": load_plan,
                "SubAgent": SubAgent,
            },
        )
        self.citation_agent = CitationAgent(
            directory=self.directory,
            system_prompt=self.ca_system_prompt,
        )

    async def __call__(self, query: str) -> str:
        """Answer ``query`` and produce ``report.md`` / ``report.pdf``.

        The lead researcher handles the query, then the citation agent is
        run on its result every time. Afterwards ``report.md`` must exist in
        the session directory; it is converted to ``report.pdf``.

        Args:
            query: The research question or follow-up question.

        Returns:
            A short message pointing at the generated PDF.

        Raises:
            RuntimeError: If ``report.md`` was not written.
        """
        with CaptionLogger():
            # Use Agent.call, matching how every other agent is invoked here.
            result = await self.lead_researcher.call(str, query)
            _ = await self.citation_agent(result)
        report_md = f"{self.directory}/report.md"
        if not os.path.exists(report_md):
            raise RuntimeError("Report file does not exist")
        # Unlike the best-effort conversion in CitationAgent.save_report, a
        # failure here propagates to the caller.
        _ = md_to_pdf(report_md, f"{self.directory}/report.pdf")
        return (
            f"Check out the research report at {self.directory}/report.pdf. Ask me any questions!"
        )
Now we can run the session!
if __name__ == "__main__":
    # Put this script's own directory on the import path.
    sys.path.append(os.path.dirname(__file__))

    research_question = (
        "What are all of the companies in the US working on AI agents in 2025? make a list of at least 10. "
        "For each, include the name, website and product, description of what they do, type of agents they build, and their vertical/industry."
    )

    # All files of the run (plan, search results, report) go under this directory.
    session = DeepResearchSession("deep_research_test")
    print(asyncio.run(session(research_question)))
View the generated report