Async and concurrency

Run agents asynchronously with await agent.arun, and fan out many prompts concurrently using asyncio.gather over one shared agent.


Every agent run is asynchronous under the hood. Agent.run is a thin synchronous wrapper around the async Agent.arun, so once you are inside an event loop you should call arun directly. This guide covers the two methods, how to run many prompts at once with asyncio.gather, and why a single Agent is safe to reuse across concurrent runs.

run vs arun

Agent exposes one execution path with two front doors:

  • agent.run(prompt, *, deps=None, **kwargs) is synchronous. It calls asyncio.run(self.arun(...)) for you.
  • agent.arun(prompt, *, deps=None, **kwargs) is the async method that does the work; await it from any async context.

Both return a Result. Use run in scripts, notebooks, and the REPL; use arun inside async code (web handlers, background workers, or anywhere you want concurrency).
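Here is the split in miniature. The sketch below uses a hypothetical ToyAgent class, not agentroute's source, to show the shape. The async method does the work, and the sync method only hands that coroutine to asyncio.run.

```python
import asyncio


class ToyAgent:
    """Hypothetical stand-in for Agent; it only mirrors the run/arun split."""

    async def arun(self, prompt: str) -> str:
        # The real work is async: pretend to wait on a model provider.
        await asyncio.sleep(0.01)
        return f"reply to: {prompt}"

    def run(self, prompt: str) -> str:
        # Thin sync wrapper: start a fresh event loop and drive arun to completion.
        return asyncio.run(self.arun(prompt))


agent = ToyAgent()

# Sync front door, for scripts and the REPL (no loop is running here).
sync_reply = agent.run("hello")


async def main() -> str:
    # Async front door, for code that already runs inside an event loop.
    return await agent.arun("hello")


async_reply = asyncio.run(main())
print(sync_reply == async_reply)  # True
```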

basic_arun.py
import asyncio
 
from agentroute import Agent
 
agent = Agent(
    name="haiku-bot",
    model="claude-sonnet-4",
    instructions="You write short, vivid haiku.",
)
 
 
async def main() -> None:
    result = await agent.arun("Write a haiku about deploying on a Friday.")
    print(result)  # str(result) is the output text
 
 
asyncio.run(main())
Do not call run() inside a running loop

Because agent.run calls asyncio.run internally, calling it from inside an already-running event loop raises RuntimeError: asyncio.run() cannot be called from a running event loop. In async code (async def FastAPI handlers, Jupyter cells, or any other coroutine), always await agent.arun(...) instead.
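You can reproduce the failure without agentroute. In this sketch, fake_run and fake_arun are hypothetical stand-ins for agent.run and agent.arun, and the handler coroutine plays the role of an async def endpoint.

```python
import asyncio


async def fake_arun(prompt: str) -> str:
    """Hypothetical stand-in for agent.arun."""
    await asyncio.sleep(0)
    return f"done: {prompt}"


def fake_run(prompt: str) -> str:
    """Hypothetical stand-in for agent.run: wraps fake_arun in asyncio.run."""
    return asyncio.run(fake_arun(prompt))


async def handler() -> tuple[str, str]:
    # Wrong: the sync wrapper tries to start a second loop inside this one.
    error = ""
    try:
        fake_run("wrong")
    except RuntimeError as exc:
        error = str(exc)
    # Right: await the coroutine on the loop that is already running.
    reply = await fake_arun("right")
    return error, reply


error, reply = asyncio.run(handler())
print(error)  # asyncio.run() cannot be called from a running event loop
print(reply)  # done: right
```

Python will usually also emit a RuntimeWarning that a coroutine was never awaited, because the coroutine object handed to asyncio.run is discarded when the call fails.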

Running many prompts concurrently

There is no separate batch API. The idiomatic way to process many prompts is plain Python: build a list of arun coroutines and await them together with asyncio.gather. Each run spends most of its time waiting on the model provider (it is I/O-bound), so the runs overlap, and the whole batch finishes in roughly the time of the slowest single run, as long as the provider does not throttle you (see Bounding concurrency below).

batch_gather.py
import asyncio
 
from agentroute import Agent
 
agent = Agent(
    name="summarizer",
    model="claude-sonnet-4",
    instructions="Summarize the input in one sentence.",
)
 
prompts = [
    "The mitochondria is the powerhouse of the cell...",
    "Quarterly revenue rose 12% on strong cloud demand...",
    "The committee voted to delay the bridge project...",
]
 
 
async def main() -> None:
    results = await asyncio.gather(*(agent.arun(p) for p in prompts))
    for prompt, result in zip(prompts, results):
        print(f"{prompt[:30]}... -> {result}")
 
 
asyncio.run(main())

asyncio.gather preserves order: results[i] corresponds to prompts[i], regardless of which run finished first. Each entry is a full Result, so you also get per-run usage and messages — see Results.
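You can check both properties, input order and overlapping waits, with plain asyncio. This sketch replaces agent.arun with a hypothetical fake_arun that sleeps for a fixed latency. The runs finish in the order fast, medium, slow, but the results list follows the submission order.

```python
import asyncio
import time


async def fake_arun(prompt: str, seconds: float) -> str:
    """Hypothetical stand-in for agent.arun with a fixed simulated latency."""
    await asyncio.sleep(seconds)  # I/O wait, like a model provider call
    return prompt.upper()


async def main() -> tuple[list[str], float]:
    prompts = ["slow", "fast", "medium"]
    latencies = [0.3, 0.1, 0.2]  # "fast" finishes first, "slow" last
    start = time.perf_counter()
    results = await asyncio.gather(
        *(fake_arun(p, s) for p, s in zip(prompts, latencies))
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['SLOW', 'FAST', 'MEDIUM']: input order, not finish order
print(f"{elapsed:.1f}s")  # close to 0.3s (the slowest run), not 0.6s (the sum)
```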

Collecting structured output across a batch

Concurrency composes with structured output. Pass output=YourModel once on the agent and read result.output from every entry in the batch.

batch_structured.py
import asyncio
 
from pydantic import BaseModel
 
from agentroute import Agent
 
 
class Sentiment(BaseModel):
    label: str  # "positive" | "negative" | "neutral"
    confidence: float
 
 
agent = Agent(
    name="classifier",
    model="claude-sonnet-4",
    instructions="Classify the sentiment of the review.",
    output=Sentiment,
)
 
reviews = [
    "Best purchase I've made all year.",
    "It broke after two days. Avoid.",
    "It's fine. Does the job, nothing special.",
]
 
 
async def main() -> None:
    results = await asyncio.gather(*(agent.arun(r) for r in reviews))
    for review, result in zip(reviews, results):
        s = result.output  # a Sentiment instance
        print(f"{s.label:8} ({s.confidence:.2f})  {review}")
 
 
asyncio.run(main())
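Once each entry carries a typed output, summarizing the batch is ordinary Python. To keep the sketch runnable without agentroute or pydantic, a dataclass stands in for the Sentiment model, and a hypothetical keyword-matching fake_classify stands in for (await agent.arun(review)).output. Only the aggregation at the end is the point.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass


@dataclass
class Sentiment:
    """Dataclass stand-in for the pydantic Sentiment model above."""
    label: str
    confidence: float


async def fake_classify(review: str) -> Sentiment:
    """Hypothetical stand-in for (await agent.arun(review)).output."""
    await asyncio.sleep(0)
    if "best" in review.lower():
        return Sentiment("positive", 0.9)
    if "avoid" in review.lower():
        return Sentiment("negative", 0.8)
    return Sentiment("neutral", 0.6)


reviews = [
    "Best purchase I've made all year.",
    "It broke after two days. Avoid.",
    "It's fine. Does the job, nothing special.",
]


async def main() -> list[Sentiment]:
    return await asyncio.gather(*(fake_classify(r) for r in reviews))


outputs = asyncio.run(main())
counts = Counter(s.label for s in outputs)
# Keep only confident predictions, e.g. for automatic routing.
confident = [r for r, s in zip(reviews, outputs) if s.confidence >= 0.75]
print(dict(counts))  # {'positive': 1, 'negative': 1, 'neutral': 1}
print(len(confident))  # 2
```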

Handling partial failures

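By default asyncio.gather propagates the first exception to the caller as soon as it is raised. The other runs are not cancelled and keep going, but you lose access to their results. To keep the successful runs and inspect failures individually, pass return_exceptions=True: each failed run's exception is returned in its slot of the results list, so you can branch on the result type.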

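batch_resilient.py
import asyncio
 
from agentroute import Agent
 
agent = Agent(name="summarizer", model="claude-sonnet-4")
prompts = ["Quarterly revenue rose 12%...", "The committee voted to delay..."]
 
 
async def main() -> None:
    results = await asyncio.gather(
        *(agent.arun(p) for p in prompts),
        return_exceptions=True,
    )
    for prompt, result in zip(prompts, results):
        # Failed runs come back as exception objects; CancelledError is a BaseException.
        if isinstance(result, BaseException):
            print(f"FAILED: {prompt[:30]}... -> {result!r}")
        else:
            print(f"OK:     {prompt[:30]}... -> {result}")
 
 
asyncio.run(main())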

Budget and turn limits surface as exceptions here too — ErrorBudget and ErrorMaxTurns. See Errors and retries for the full hierarchy.
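The sketch below demonstrates both gather modes with a hypothetical fake_arun that fails on one input. Without return_exceptions, the caller sees the first error while the slower sibling keeps running to completion. With it, failures come back in their slots next to the successes.

```python
import asyncio

completed: list[str] = []


async def fake_arun(prompt: str, seconds: float) -> str:
    """Hypothetical stand-in for agent.arun that fails on one input."""
    await asyncio.sleep(seconds)
    if prompt == "bad":
        raise ValueError("provider error")
    completed.append(prompt)
    return prompt.upper()


async def default_behavior() -> str:
    caught = ""
    try:
        await asyncio.gather(fake_arun("bad", 0.01), fake_arun("slow", 0.1))
    except ValueError as exc:
        caught = str(exc)
    # gather re-raised the first error but did not cancel the "slow" run.
    await asyncio.sleep(0.2)
    return caught


async def resilient() -> tuple[list[str], list[BaseException]]:
    results = await asyncio.gather(
        fake_arun("ok", 0.01), fake_arun("bad", 0.01), return_exceptions=True
    )
    oks = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return oks, errors


caught = asyncio.run(default_behavior())
print(caught, completed)  # provider error ['slow']
oks, errors = asyncio.run(resilient())
print(oks, errors)  # ['OK'] [ValueError('provider error')]
```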

Reuse one agent across concurrent runs

An Agent is configuration, not session state. Its fields — model, instructions, tools, output schema, memory, history — are shared and read-only during a run. Each call to arun constructs its own fresh Context (with its own Usage, message list, retry counter, and step) for that run alone.

That means you should build the agent once and reuse it for the whole batch, rather than creating a new Agent per prompt.

shared_agent.py
import asyncio
 
from agentroute import Agent
 
# Build once and reuse: treat this shared config as read-only.
agent = Agent(
    name="translator",
    model="claude-sonnet-4",
    instructions="Translate the input to French.",
)
 
 
async def translate_all(texts: list[str]) -> list[str]:
    # Each arun gets its own Context; the agent config is shared safely.
    results = await asyncio.gather(*(agent.arun(t) for t in texts))
    return [str(r) for r in results]
 
 
print(asyncio.run(translate_all(["Hello", "Goodbye", "Thank you"])))
Per-run state lives on Context

Anything that changes during a run — accumulated Usage, the running message list, the retry count — is stored on the run's Context, never on the Agent. That is what makes a single agent instance safe to drive from many concurrent arun calls.
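The isolation rule can be sketched with two hypothetical classes, ToyAgent and ToyContext. They mirror the idea, not agentroute's implementation. The agent is a frozen dataclass holding only configuration, and each arun call builds a fresh context, so two interleaved runs never see each other's deps, messages, or counters. To keep things short, the toy returns its context directly, whereas the real arun returns a Result.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical per-run state: a stand-in for agentroute's Context."""
    deps: object
    messages: list[str] = field(default_factory=list)
    tokens: int = 0


@dataclass(frozen=True)
class ToyAgent:
    """Hypothetical stand-in for Agent: configuration only, never mutated."""
    instructions: str

    async def arun(self, prompt: str, deps: object = None) -> ToyContext:
        ctx = ToyContext(deps=deps)  # fresh state for this run alone
        for word in prompt.split():
            ctx.messages.append(word)
            ctx.tokens += 1
            await asyncio.sleep(0)  # yield so concurrent runs interleave
        return ctx


agent = ToyAgent(instructions="Be brief.")


async def main() -> list[ToyContext]:
    return await asyncio.gather(
        agent.arun("one two three", deps="alice"),
        agent.arun("four five", deps="bob"),
    )


alice, bob = asyncio.run(main())
print(alice.deps, alice.tokens, alice.messages)  # alice 3 ['one', 'two', 'three']
print(bob.deps, bob.tokens, bob.messages)  # bob 2 ['four', 'five']
```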

Passing per-run dependencies

When runs need different inputs beyond the prompt (a user id, a DB handle, a tenant), pass them through deps. They land on ctx.deps inside your tools, isolated per run.

per_run_deps.py
import asyncio
from dataclasses import dataclass
 
from agentroute import Agent, Context
 
 
@dataclass
class Deps:
    user_id: str
 
 
agent = Agent(name="assistant", model="claude-sonnet-4")
 
 
@agent.tool
def whoami(ctx: Context) -> str:
    """Return the current user's id."""
    return ctx.deps.user_id
 
 
async def main() -> None:
    results = await asyncio.gather(
        agent.arun("Who am I?", deps=Deps(user_id="alice")),
        agent.arun("Who am I?", deps=Deps(user_id="bob")),
    )
    for r in results:
        print(r)
 
 
asyncio.run(main())
Mind shared external resources

Reusing the agent is safe. Reusing things your tools touch may not be — a database connection, an HTTP client, or a writable cache shared across concurrent runs must itself be concurrency-safe. Prefer per-run resources via deps, or use an async-safe client. Note also that mutating an Agent field (for example registering a new tool) while runs are in flight is not supported; finish your configuration before you start the batch.
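Here is what "not concurrency-safe" looks like in practice. In this sketch a hypothetical SharedCounter stands in for a writable cache that your tools update. Any await between reading and writing lets other runs interleave, so updates are lost. Guarding the read-modify-write with asyncio.Lock fixes it.

```python
import asyncio


class SharedCounter:
    """Hypothetical writable resource shared by all concurrent runs."""

    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def unsafe_increment(self) -> None:
        current = self.value
        await asyncio.sleep(0)  # any await (e.g. network I/O) lets other runs interleave
        self.value = current + 1  # may overwrite another run's update

    async def safe_increment(self) -> None:
        async with self.lock:  # only one run at a time inside this block
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1


async def run_tools(n: int, safe: bool) -> int:
    counter = SharedCounter()
    step = counter.safe_increment if safe else counter.unsafe_increment
    await asyncio.gather(*(step() for _ in range(n)))
    return counter.value


print(asyncio.run(run_tools(10, safe=False)))  # 1: nine updates were lost
print(asyncio.run(run_tools(10, safe=True)))  # 10
```

asyncio.Lock only coordinates coroutines on the same event loop. If the resource is also touched from other threads or processes, it needs a thread-safe or external lock instead.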

Bounding concurrency

Firing thousands of arun coroutines at once can hit provider rate limits. Cap the number in flight with an asyncio.Semaphore.

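bounded.py
import asyncio
 
from agentroute import Agent
 
agent = Agent(name="worker", model="claude-sonnet-4")
 
 
async def run_one(sem: asyncio.Semaphore, prompt: str) -> str:
    async with sem:  # waits here while 8 runs are already in flight
        result = await agent.arun(prompt)
    return str(result)
 
 
async def main(prompts: list[str]) -> list[str]:
    # Create the semaphore inside the running loop: on Python 3.9 and
    # earlier, a module-level Semaphore can bind to a different event loop.
    sem = asyncio.Semaphore(8)  # at most 8 concurrent runs
    return await asyncio.gather(*(run_one(sem, p) for p in prompts))
 
 
results = asyncio.run(main([f"Prompt {i}" for i in range(100)]))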
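To confirm the cap holds without calling a provider, this sketch swaps agent.arun for a hypothetical fake_arun that tracks how many runs are in flight. The semaphore is created inside main, and gather still returns results in input order.

```python
import asyncio

in_flight = 0
peak = 0


async def fake_arun(prompt: str) -> str:
    """Hypothetical stand-in for agent.arun that records concurrency."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulated provider latency
    in_flight -= 1
    return prompt.upper()


async def run_one(sem: asyncio.Semaphore, prompt: str) -> str:
    async with sem:  # at most `limit` coroutines get past this line at once
        return await fake_arun(prompt)


async def main(prompts: list[str], limit: int) -> list[str]:
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(run_one(sem, p) for p in prompts))


results = asyncio.run(main([f"p{i}" for i in range(20)], limit=3))
print(peak)  # 3: never more than `limit` runs in flight
print(results[:3])  # ['P0', 'P1', 'P2']
```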

Retrying after transient failures adds spend, so consider setting Agent(max_cost=...) to cap the cost of each run, and combine it with the retry patterns in Errors and retries.

Next steps