Async and concurrency
Run agents asynchronously with await agent.arun, and fan out many prompts concurrently using asyncio.gather over one shared agent.
Every agent run is asynchronous under the hood. Agent.run is a thin synchronous wrapper around the async Agent.arun, so once you are inside an event loop you should call arun directly. This guide covers the two methods, how to run many prompts at once with asyncio.gather, and why a single Agent is safe to reuse across concurrent runs.
run vs arun
Agent exposes one execution path with two front doors:
agent.run(prompt, *, deps=None, **kwargs) is synchronous. It calls asyncio.run(self.arun(...)) for you.
agent.arun(prompt, *, deps=None, **kwargs) is the coroutine. await it from any async context.
Both return a Result. Use run in plain scripts and the standard REPL; use arun inside async code (web handlers, background workers, Jupyter notebooks, which already run an event loop, or anywhere you want concurrency).
import asyncio

from agentroute import Agent

agent = Agent(
    name="haiku-bot",
    model="claude-sonnet-4",
    instructions="You write short, vivid haiku.",
)

async def main() -> None:
    result = await agent.arun("Write a haiku about deploying on a Friday.")
    print(result)  # str(result) is the output text

asyncio.run(main())

Because agent.run calls asyncio.run internally, calling it from inside an already-running event loop raises RuntimeError: asyncio.run() cannot be called from a running event loop. In async code (FastAPI handlers, Jupyter cells, or any coroutine), always await agent.arun(...) instead.
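The failure mode described above can be reproduced without the library. The sketch below uses fake_arun and fake_run, hypothetical stand-ins that only mimic the wrapper shape this guide describes (they are not agentroute code), to show a synchronous wrapper built on asyncio.run failing inside a running loop while awaiting the coroutine directly works.

```python
import asyncio


async def fake_arun(prompt: str) -> str:
    """Hypothetical stand-in for agent.arun; no model is called."""
    await asyncio.sleep(0)
    return f"echo: {prompt}"


def fake_run(prompt: str) -> str:
    """Sync wrapper in the style described above: asyncio.run around the coroutine."""
    coro = fake_arun(prompt)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def handler():
    """Plays the role of a web handler or notebook cell: a loop is already running."""
    try:
        fake_run("hi")  # wrong place for the sync wrapper
        error_message = ""
    except RuntimeError as exc:
        error_message = str(exc)
    output = await fake_arun("hi")  # right: await the coroutine directly
    return error_message, output


error_message, output = asyncio.run(handler())
print(error_message)
print(output)
```

Called from ordinary synchronous code, with no loop running, the same fake_run wrapper returns normally.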
Running many prompts concurrently
There is no separate batch API. The idiomatic way to process many prompts is plain Python: build a list of arun coroutines and await them together with asyncio.gather. Each call to a model provider is I/O-bound, so the runs overlap and, provided you stay within the provider's rate limits, the whole batch finishes in roughly the time of the slowest single run.
import asyncio

from agentroute import Agent

agent = Agent(
    name="summarizer",
    model="claude-sonnet-4",
    instructions="Summarize the input in one sentence.",
)

prompts = [
    "The mitochondria is the powerhouse of the cell...",
    "Quarterly revenue rose 12% on strong cloud demand...",
    "The committee voted to delay the bridge project...",
]

async def main() -> None:
    results = await asyncio.gather(*(agent.arun(p) for p in prompts))
    for prompt, result in zip(prompts, results):
        print(f"{prompt[:30]}... -> {result}")

asyncio.run(main())

asyncio.gather preserves order: results[i] corresponds to prompts[i], regardless of which run finished first. Each entry is a full Result, so you also get per-run usage and messages; see Results.
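To make the overlap and ordering claims concrete, here is a small timing sketch. It assumes nothing about agentroute: fake_arun is a hypothetical stand-in that sleeps for a simulated latency instead of calling a model, and the delays are made-up values.

```python
import asyncio
import time

# Simulated per-prompt latencies in seconds (illustrative values only).
DELAYS = {"slow": 0.15, "fast": 0.05, "medium": 0.10}


async def fake_arun(prompt: str) -> str:
    """Hypothetical stand-in for agent.arun: sleeps instead of calling a model."""
    await asyncio.sleep(DELAYS[prompt])
    return f"done:{prompt}"


async def main():
    prompts = list(DELAYS)  # ["slow", "fast", "medium"]
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_arun(p) for p in prompts))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)  # input order, even though "fast" finished first
print(f"elapsed {elapsed:.2f}s vs {sum(DELAYS.values()):.2f}s if run one by one")
```

The elapsed time should land near the longest delay and well under the sum of all three.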
Collecting structured output across a batch
Concurrency composes with structured output. Pass output=YourModel once on the agent and read result.output from every entry in the batch.
import asyncio

from pydantic import BaseModel
from agentroute import Agent

class Sentiment(BaseModel):
    label: str  # "positive" | "negative" | "neutral"
    confidence: float

agent = Agent(
    name="classifier",
    model="claude-sonnet-4",
    instructions="Classify the sentiment of the review.",
    output=Sentiment,
)

reviews = [
    "Best purchase I've made all year.",
    "It broke after two days. Avoid.",
    "It's fine. Does the job, nothing special.",
]

async def main() -> None:
    results = await asyncio.gather(*(agent.arun(r) for r in reviews))
    for review, result in zip(reviews, results):
        s = result.output  # a Sentiment instance
        print(f"{s.label:8} ({s.confidence:.2f}) {review}")

asyncio.run(main())

Handling partial failures
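By default asyncio.gather propagates the first exception to the caller as soon as it occurs. The other runs are not cancelled and keep going in the background, but you lose access to their results. To keep the successful runs and inspect failures individually, pass return_exceptions=True and branch on the result type.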
# agent and prompts are the ones defined in the earlier examples.
async def main() -> None:
    results = await asyncio.gather(
        *(agent.arun(p) for p in prompts),
        return_exceptions=True,
    )
    for prompt, result in zip(prompts, results):
        # BaseException also catches asyncio.CancelledError from a cancelled run.
        if isinstance(result, BaseException):
            print(f"FAILED: {prompt[:30]}... -> {result!r}")
        else:
            print(f"OK: {prompt[:30]}... -> {result}")

Budget and turn limits surface as exceptions here too: ErrorBudget and ErrorMaxTurns. See Errors and retries for the full hierarchy.
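The two gather modes are easy to compare side by side with the standard library alone. In this sketch fake_arun is a hypothetical stand-in for agent.arun, with a simulated delay and an optional simulated failure. It shows that in the default mode the first exception reaches the caller immediately while the sibling run keeps going, and that return_exceptions=True hands failures back as ordinary values in input order.

```python
import asyncio

finished = []  # prompts whose run completed successfully


async def fake_arun(prompt: str, delay: float, fail: bool = False) -> str:
    """Hypothetical stand-in for agent.arun with a simulated failure."""
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"failed: {prompt}")
    finished.append(prompt)
    return prompt.upper()


async def main():
    # Default mode: the first exception reaches the caller right away.
    try:
        await asyncio.gather(fake_arun("a", 0.05), fake_arun("b", 0.01, fail=True))
        first_error = None
    except ValueError as exc:
        first_error = str(exc)
    done_at_raise = list(finished)  # "a" is still sleeping at this point
    await asyncio.sleep(0.1)        # "a" was not cancelled, so it finishes
    done_later = list(finished)

    # return_exceptions=True: failures come back as values, in input order.
    results = await asyncio.gather(
        fake_arun("c", 0.01),
        fake_arun("d", 0.01, fail=True),
        return_exceptions=True,
    )
    return first_error, done_at_raise, done_later, results


first_error, done_at_raise, done_later, results = asyncio.run(main())
print(first_error, done_at_raise, done_later, results)
```

If you need siblings to be cancelled on the first failure, asyncio.TaskGroup (Python 3.11+) provides that behavior; plain gather does not.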
Reuse one agent across concurrent runs
An Agent is configuration, not session state. Its fields — model, instructions, tools, output schema, memory, history — are shared and read-only during a run. Each call to arun constructs its own fresh Context (with its own Usage, message list, retry counter, and step) for that run alone.
That means you should build the agent once and reuse it for the whole batch, rather than creating a new Agent per prompt.
import asyncio

from agentroute import Agent

# Build once: this is shared, immutable config.
agent = Agent(
    name="translator",
    model="claude-sonnet-4",
    instructions="Translate the input to French.",
)

async def translate_all(texts: list[str]) -> list[str]:
    # Each arun gets its own Context; the agent config is shared safely.
    results = await asyncio.gather(*(agent.arun(t) for t in texts))
    return [str(r) for r in results]

asyncio.run(translate_all(["Hello", "Goodbye", "Thank you"]))

Anything that changes during a run (accumulated Usage, the running message list, the retry count) is stored on the run's Context, never on the Agent. That is what makes a single agent instance safe to drive from many concurrent arun calls.
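The split between shared configuration and per-run state can be illustrated with a toy model. ToyAgent, ToyContext, and toy_arun below are hypothetical names invented for this sketch; they are not the agentroute classes and say nothing about how the library is implemented. The point is only the pattern: the config object is frozen, and every call allocates its own mutable context.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyAgent:
    """Shared, immutable configuration (hypothetical; not the agentroute class)."""
    name: str
    instructions: str


@dataclass
class ToyContext:
    """Mutable state owned by exactly one run."""
    messages: list = field(default_factory=list)
    steps: int = 0


async def toy_arun(agent: ToyAgent, prompt: str, turns: int) -> ToyContext:
    ctx = ToyContext()  # fresh per call, never stored on the agent
    for _ in range(turns):
        await asyncio.sleep(0)  # yield so concurrent runs interleave
        ctx.steps += 1
        ctx.messages.append(f"{agent.name}:{prompt}:{ctx.steps}")
    return ctx


async def main():
    agent = ToyAgent(name="translator", instructions="Translate the input to French.")
    return await asyncio.gather(
        toy_arun(agent, "Hello", 2),
        toy_arun(agent, "Goodbye", 3),
    )


hello_ctx, goodbye_ctx = asyncio.run(main())
print(hello_ctx.steps, goodbye_ctx.steps)
print(goodbye_ctx.messages)
```

Even though the two runs interleave at every await, each context only ever sees its own steps and messages.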
Passing per-run dependencies
When runs need different inputs beyond the prompt (a user id, a DB handle, a tenant), pass them through deps. They land on ctx.deps inside your tools, isolated per run.
import asyncio
from dataclasses import dataclass

from agentroute import Agent, Context

@dataclass
class Deps:
    user_id: str

agent = Agent(name="assistant", model="claude-sonnet-4")

@agent.tool
def whoami(ctx: Context) -> str:
    """Return the current user's id."""
    return ctx.deps.user_id

async def main() -> None:
    results = await asyncio.gather(
        agent.arun("Who am I?", deps=Deps(user_id="alice")),
        agent.arun("Who am I?", deps=Deps(user_id="bob")),
    )
    for r in results:
        print(r)

asyncio.run(main())

Reusing the agent is safe. Reusing things your tools touch may not be: a database connection, an HTTP client, or a writable cache shared across concurrent runs must itself be concurrency-safe. Prefer per-run resources via deps, or use an async-safe client. Note also that mutating an Agent field (for example registering a new tool) while runs are in flight is not supported; finish your configuration before you start the batch.
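The risk with shared tool resources is a read-modify-write that spans an await. The sketch below uses SharedCounter, a hypothetical stand-in for something like a writable cache, to show updates being lost when concurrent runs interleave, and one possible fix with asyncio.Lock. It is an illustration of the general hazard, not a statement about any particular client library.

```python
import asyncio


class SharedCounter:
    """Hypothetical shared resource that several tool calls update."""

    def __init__(self) -> None:
        self.value = 0

    async def unsafe_increment(self) -> None:
        current = self.value
        await asyncio.sleep(0)    # another run can interleave here
        self.value = current + 1  # may overwrite a concurrent update

    async def safe_increment(self, lock: asyncio.Lock) -> None:
        async with lock:          # only one run in the critical section
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1


async def main():
    unsafe, safe = SharedCounter(), SharedCounter()
    lock = asyncio.Lock()  # created inside the running loop
    await asyncio.gather(*(unsafe.unsafe_increment() for _ in range(10)))
    await asyncio.gather(*(safe.safe_increment(lock) for _ in range(10)))
    return unsafe.value, safe.value


unsafe_total, safe_total = asyncio.run(main())
print(unsafe_total, safe_total)
```

The unguarded counter ends up below 10 because several runs read the same starting value, while the locked one counts every increment.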
Bounding concurrency
Firing thousands of arun coroutines at once can hit provider rate limits. Cap the number in flight with an asyncio.Semaphore.
import asyncio

from agentroute import Agent

agent = Agent(name="worker", model="claude-sonnet-4")

async def run_one(sem: asyncio.Semaphore, prompt: str) -> str:
    async with sem:
        result = await agent.arun(prompt)
    return str(result)

async def main(prompts: list[str]) -> list[str]:
    # Create the semaphore inside the running loop; on Python < 3.10 a
    # module-level one can bind to a different loop than asyncio.run uses.
    sem = asyncio.Semaphore(8)  # at most 8 concurrent runs
    return await asyncio.gather(*(run_one(sem, p) for p in prompts))

When transient failures cause repeated retries, set Agent(max_cost=...) to cap spend per run, and combine it with the retry patterns in Errors and retries.
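To check that a semaphore really bounds the number of runs in flight, you can count them. This sketch replaces the model call with asyncio.sleep (a hypothetical stand-in for agent.arun) and records the peak concurrency; the limit of 8 and the batch of 50 are arbitrary illustrative values.

```python
import asyncio


async def main(limit: int, total: int):
    sem = asyncio.Semaphore(limit)  # created inside the running loop
    in_flight = 0
    peak = 0

    async def run_one(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # hypothetical stand-in for agent.arun
            in_flight -= 1
        return i

    results = await asyncio.gather(*(run_one(i) for i in range(total)))
    return results, peak


results, peak = asyncio.run(main(limit=8, total=50))
print(len(results), peak)
```

All 50 results come back in input order, and the recorded peak never exceeds the semaphore's limit.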
Next steps
Full signatures for run, arun, deps, and every constructor field.
What arun returns: output, usage, messages, and interrupted.
Type your batch results with a pydantic model and read result.output.
Handle ErrorBudget, ErrorMaxTurns, and Retry across concurrent runs.