Data transformer
Convert data between JSON, CSV, and YAML with a typed result, then transform many payloads concurrently with asyncio.gather.
What it does
This example builds an agent that converts a single payload between data formats (JSON, CSV, YAML) and returns a typed TransformResult so your code reads result.records_processed instead of parsing prose. It then shows how to run the same agent over a whole batch of payloads concurrently using asyncio.gather over agent.arun, which replaces any one-off batch runner.
It exercises two concepts:
- Structured output: output=TransformResult makes the agent return a validated Pydantic instance.
- Async and concurrency: asyncio.gather fans out many arun calls at once.
The agent
The agent declares a Pydantic model as its output. AgentRoute derives a schema from it, instructs the model to fill it in, and parses the reply back into an instance on result.output.
import asyncio

from pydantic import BaseModel, Field

from agentroute import Agent


class TransformResult(BaseModel):
    """The outcome of a single format conversion."""

    result: str = Field(description="The converted data, as a string in the target format.")
    records_processed: int = Field(description="How many top-level records were converted.")
    warnings: list[str] = Field(
        default_factory=list,
        description="Any non-fatal issues, e.g. dropped fields or coerced types.",
    )


agent = Agent(
    name="data-transformer",
    model="claude-sonnet-4",
    instructions=(
        "You convert structured data between JSON, CSV, and YAML. "
        "Preserve every field and value exactly; never invent data. "
        "If a value cannot be represented cleanly in the target format, "
        "convert it as best you can and add a warning explaining what changed. "
        "Count each top-level record (object or row) in records_processed."
    ),
    output=TransformResult,
)


def transform(data: str, to_format: str) -> TransformResult:
    """Convert one payload and return the typed result."""
    result = agent.run(f"Convert this data to {to_format}:\n\n{data}")
    return result.output


if __name__ == "__main__":
    csv_data = "name,role,active\nAda,engineer,true\nGrace,admin,false"
    out = transform(csv_data, to_format="JSON")
    print(out.result)
    print(f"records: {out.records_processed}")
    for warning in out.warnings:
        print(f"warning: {warning}")

Because output=TransformResult, result.output is a TransformResult instance, not a string. The agent.run(...) call is synchronous; it wraps arun for you.
The model only sees the field names, types, and description text from your Pydantic model, so write descriptions as if they were instructions. Clear field docs are the cheapest way to improve structured output.
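To see what those field descriptions turn into, it can help to print the JSON Schema that Pydantic derives from the model and to try parsing a reply by hand. The sketch below assumes Pydantic v2 (model_json_schema and model_validate_json) and does not call AgentRoute at all; whether AgentRoute sends exactly this schema to the model is an assumption, not something this page confirms.

```python
import json

from pydantic import BaseModel, Field, ValidationError


class TransformResult(BaseModel):
    """The outcome of a single format conversion."""

    result: str = Field(description="The converted data, as a string in the target format.")
    records_processed: int = Field(description="How many top-level records were converted.")
    warnings: list[str] = Field(
        default_factory=list,
        description="Any non-fatal issues, e.g. dropped fields or coerced types.",
    )


# JSON Schema that Pydantic v2 derives from the model; each description travels with its field.
schema = TransformResult.model_json_schema()
print(json.dumps(schema["properties"]["records_processed"], indent=2))

# A well-formed reply parses into a typed instance; an omitted warnings list defaults to [].
ok = TransformResult.model_validate_json('{"result": "[]", "records_processed": 0}')

# A reply with the wrong type is rejected instead of being passed through.
try:
    TransformResult.model_validate_json('{"result": "[]", "records_processed": "many"}')
    rejected = False
except ValidationError:
    rejected = True
```

Printing the schema while you edit descriptions is a quick way to check that each field reads as an instruction on its own, without spending a model call.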
Transforming many payloads concurrently
Each transform is one independent agent run, which makes the batch a perfect fit for asyncio.gather. Use agent.arun (the async entrypoint) for every payload and await them together — the runs overlap on network I/O instead of executing one after another.
import asyncio

from data_transformer import TransformResult, agent


async def transform_one(data: str, to_format: str) -> TransformResult:
    result = await agent.arun(f"Convert this data to {to_format}:\n\n{data}")
    return result.output


async def transform_many(jobs: list[tuple[str, str]]) -> list[TransformResult]:
    """Run every (data, to_format) job concurrently."""
    return await asyncio.gather(
        *(transform_one(data, to_format) for data, to_format in jobs)
    )


async def main() -> None:
    jobs = [
        ('{"city": "Oslo", "pop": 700000}', "YAML"),
        ("a,b\n1,2\n3,4", "JSON"),
        ("- name: Ada\n- name: Grace", "CSV"),
    ]
    results = await transform_many(jobs)
    for (data, to_format), out in zip(jobs, results):
        print(f"-> {to_format} ({out.records_processed} records)")
        print(out.result)
        print()


if __name__ == "__main__":
    asyncio.run(main())

One Agent instance is safe to reuse across concurrent arun calls: runtime state (usage, messages, retry counter) lives on a fresh Context created per run, not on the agent. To inspect cost across the batch, read result.usage from each run before unwrapping .output.
There is no dedicated batch processor in the SDK — and you do not need one. asyncio.gather over arun is the supported pattern for fan-out. See Async and concurrency for backpressure tips (chunking large batches, or bounding parallelism with an asyncio.Semaphore).
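The Semaphore option mentioned above can be sketched with the standard library alone. In this minimal example, fake_transform is a hypothetical stand-in for transform_one (it sleeps instead of calling a model), and transform_bounded and its limit parameter are illustrative names, not part of the SDK. The peak counter exists only to show that the bound holds.

```python
import asyncio


async def fake_transform(data: str, to_format: str) -> str:
    """Hypothetical stand-in for transform_one; sleeps instead of calling a model."""
    await asyncio.sleep(0.01)
    return f"{to_format}:{data}"


async def transform_bounded(jobs, limit=2, worker=fake_transform):
    """Run every job, allowing at most `limit` of them to be in flight at once."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(data, to_format):
        nonlocal in_flight, peak
        async with semaphore:  # waits here while `limit` jobs are already running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await worker(data, to_format)
            finally:
                in_flight -= 1

    # gather returns results in the order the awaitables were passed in,
    # regardless of which job finishes first.
    results = await asyncio.gather(*(guarded(d, f) for d, f in jobs))
    return results, peak


jobs = [(f"payload-{i}", "JSON") for i in range(6)]
results, peak = asyncio.run(transform_bounded(jobs, limit=2))
print(results)
print(f"peak concurrency: {peak}")
```

To use this with the real agent, pass worker=transform_one. Because asyncio.gather keeps results in input order, pairing them back up with zip(jobs, results) still works when jobs finish out of order.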
Run it
pip install agentroute
export AGENTROUTE_API_KEY=sk-... # one OpenRouter key works for every model
python data_transformer.py # single conversion
python batch_transform.py # concurrent batch