Graphs
Don't use a nail gun unless you need a nail gun
If Pydantic AI agents are a hammer, and multi-agent workflows are a sledgehammer, then graphs are a nail gun:
- sure, nail guns look cooler than hammers
- but nail guns take a lot more setup than hammers
- and nail guns don't make you a better builder, they make you a builder with a nail gun
- Lastly, (and at the risk of torturing this metaphor), if you're a fan of medieval tools like mallets and untyped Python, you probably won't like nail guns or our approach to graphs. (But then again, if you're not a fan of type hints in Python, you've probably already bounced off Pydantic AI to use one of the toy agent frameworks — good luck, and feel free to borrow my sledgehammer when you realize you need it)
In short, graphs are a powerful tool, but they're not the right tool for every job. Please consider other multi-agent approaches before proceeding.
If you're not confident a graph-based approach is a good idea, it might be unnecessary.
Graphs and finite state machines (FSMs) are a powerful abstraction to model, execute, control and visualize complex workflows.
Alongside Pydantic AI, we've developed pydantic-graph — an async graph and state machine library for Python where nodes and edges are defined using type hints.
Although this library is developed as part of Pydantic AI, it has no dependency on pydantic-ai and can be treated as a pure graph-based state machine library. You may find it useful whether or not you're using Pydantic AI, or even building with GenAI at all.
pydantic-graph is designed for advanced users and makes heavy use of Python generics and type hints. It is not designed to be as beginner-friendly as Pydantic AI.
Installation
pydantic-graph is a required dependency of pydantic-ai and an optional dependency of pydantic-ai-slim; see the installation instructions for more information. You can also install it directly:
pip install pydantic-graph
uv add pydantic-graph
Graph Types
pydantic-graph is made up of a few key components:
GraphRunContext
GraphRunContext — The context for the graph run, similar to Pydantic AI's RunContext. This holds the state of the graph and dependencies and is passed to nodes when they're run.
GraphRunContext is generic in the state type of the graph it's used in, StateT.
End
End — return value to indicate the graph run should end.
End is generic in the graph return type of the graph it's used in, RunEndT.
Nodes
Subclasses of BaseNode define nodes for execution in the graph.
Nodes, which are generally dataclasses, generally consist of:
- fields containing any parameters required/optional when calling the node
- the business logic to execute the node, in the run method
- return annotations of the run method, which are read by pydantic-graph to determine the outgoing edges of the node
Nodes are generic in:
- state, which must have the same type as the state of graphs they're included in. StateT has a default of None, so if you're not using state you can omit this generic parameter; see stateful graphs for more information
- deps, which must have the same type as the deps of the graph they're included in. DepsT has a default of None, so if you're not using deps you can omit this generic parameter; see dependency injection for more information
- graph return type: this only applies if the node returns End. RunEndT has a default of Never, so this generic parameter can be omitted if the node doesn't return End, but must be included if it does.
Here's an example of a start or intermediate node in a graph — it can't end the run as it doesn't return End:
from dataclasses import dataclass

from pydantic_graph import BaseNode, GraphRunContext


@dataclass
class MyNode(BaseNode[MyState]):  # (1)!
    foo: int  # (2)!

    async def run(
        self,
        ctx: GraphRunContext[MyState],  # (3)!
    ) -> AnotherNode:  # (4)!
        ...
        return AnotherNode()
1. State in this example is MyState (not shown), hence BaseNode is parameterized with MyState. This node can't end the run, so the RunEndT generic parameter is omitted and defaults to Never.
2. MyNode is a dataclass and has a single field foo, an int.
3. The run method takes a GraphRunContext parameter, again parameterized with state MyState.
4. The return type of the run method is AnotherNode (not shown); this is used to determine the outgoing edges of the node.
We could extend MyNode to optionally end the run if foo is divisible by 5:
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, GraphRunContext


@dataclass
class MyNode(BaseNode[MyState, None, int]):  # (1)!
    foo: int

    async def run(
        self,
        ctx: GraphRunContext[MyState],
    ) -> AnotherNode | End[int]:  # (2)!
        if self.foo % 5 == 0:
            return End(self.foo)
        else:
            return AnotherNode()
1. We parameterize the node with the return type (int in this case) as well as state. Because generic parameters are positional-only, we have to include None as the second parameter representing deps.
2. The return type of the run method is now a union of AnotherNode and End[int]; this allows the node to end the run if foo is divisible by 5.
Graph
Graph — the executable graph produced by a GraphBuilder. The builder is the entry point for assembling a graph from step functions, BaseNode classes, and the edges connecting them.
GraphBuilder is generic in:
- state the state type of the graph,
StateT - deps the deps type of the graph,
DepsT - input the type of the initial input passed to the graph,
InputT - output the type of the final output produced by the graph,
OutputT
Here's an example of a simple graph built from two BaseNode subclasses:
from __future__ import annotations

from dataclasses import dataclass

from pydantic_graph import BaseNode, End, GraphBuilder, GraphRunContext, StepContext


@dataclass
class DivisibleBy5(BaseNode[None, None, int]):  # (1)!
    foo: int

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> Increment | End[int]:
        if self.foo % 5 == 0:
            return End(self.foo)
        else:
            return Increment(self.foo)


@dataclass
class Increment(BaseNode):  # (2)!
    foo: int

    async def run(self, ctx: GraphRunContext) -> DivisibleBy5:
        return DivisibleBy5(self.foo + 1)


g = GraphBuilder(input_type=int, output_type=int)  # (3)!


@g.step
async def start(ctx: StepContext[None, None, int]) -> DivisibleBy5:  # (4)!
    return DivisibleBy5(ctx.inputs)


g.add(
    g.node(DivisibleBy5),  # (5)!
    g.node(Increment),
    g.edge_from(g.start_node).to(start),  # (6)!
)

fives_graph = g.build()  # (7)!


async def main():
    result = await fives_graph.run(inputs=4)  # (8)!
    print(result)
    #> 5
1. The DivisibleBy5 node is parameterized with None for the state param and None for the deps param as this graph doesn't use state or deps, and int as it can end the run.
2. The Increment node doesn't return End, so the RunEndT generic parameter is omitted; state can also be omitted as the graph doesn't use state.
3. Create a GraphBuilder declaring the input and output types of the graph.
4. Define a step that wraps the initial input as the first BaseNode. The builder calls this when execution leaves g.start_node.
5. Register each BaseNode subclass with g.node() so the builder knows about it; outgoing edges are inferred from each node's run return type.
6. Wire the start node into the entry step.
7. g.build() returns a Graph ready to execute.
8. graph.run() is async and returns the raw output value (the int returned by the End node).
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
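One way to picture what a run of this graph does is a loop that calls run on the current node and follows whatever it returns until an end marker appears. The following is a minimal stdlib-only sketch of that mental model, not pydantic-graph's executor: the End, DivisibleBy5, Increment and run_until_end names below are simplified, hypothetical stand-ins with no state, deps or context.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class End:
    """Hypothetical end marker carrying the final value."""

    data: int


@dataclass
class DivisibleBy5:
    foo: int

    async def run(self) -> 'Increment | End':
        if self.foo % 5 == 0:
            return End(self.foo)
        return Increment(self.foo)


@dataclass
class Increment:
    foo: int

    async def run(self) -> DivisibleBy5:
        return DivisibleBy5(self.foo + 1)


async def run_until_end(node) -> tuple[int, list[str]]:
    """Follow returned nodes until an End is produced, recording the path taken."""
    visited: list[str] = []
    while not isinstance(node, End):
        visited.append(type(node).__name__)
        node = await node.run()
    return node.data, visited


result, path = asyncio.run(run_until_end(DivisibleBy5(4)))
print(result, path)
```

Starting from 4, the sketch visits DivisibleBy5, then Increment, then DivisibleBy5 again before ending with 5, which is the same cycle the mermaid diagram for this graph describes.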
A mermaid diagram for this graph can be generated with print(fives_graph), or by calling fives_graph.render():
stateDiagram-v2
start
DivisibleBy5
state decision <<choice>>
Increment
[*] --> start
start --> DivisibleBy5
DivisibleBy5 --> decision
decision --> Increment
decision --> [*]
Increment --> DivisibleBy5
Stateful Graphs
The "state" concept in pydantic-graph provides an optional way to access and mutate an object (often a dataclass or Pydantic model) as nodes run in a graph. If you think of Graphs as a production line, then your state is the engine being passed along the line and built up by each node as the graph is run.
Here's an example of a graph which represents a vending machine where the user may insert coins and select a product to purchase.
from __future__ import annotations

from dataclasses import dataclass

from rich.prompt import Prompt

from pydantic_graph import BaseNode, End, GraphBuilder, GraphRunContext, StepContext


@dataclass
class MachineState:  # (1)!
    user_balance: float = 0.0
    product: str | None = None


@dataclass
class InsertCoin(BaseNode[MachineState]):  # (3)!
    async def run(self, ctx: GraphRunContext[MachineState]) -> CoinsInserted:  # (14)!
        return CoinsInserted(float(Prompt.ask('Insert coins')))  # (4)!


@dataclass
class CoinsInserted(BaseNode[MachineState]):
    amount: float  # (5)!

    async def run(
        self, ctx: GraphRunContext[MachineState]
    ) -> SelectProduct | Purchase:  # (15)!
        ctx.state.user_balance += self.amount  # (6)!
        if ctx.state.product is not None:  # (7)!
            return Purchase(ctx.state.product)
        else:
            return SelectProduct()


@dataclass
class SelectProduct(BaseNode[MachineState]):
    async def run(self, ctx: GraphRunContext[MachineState]) -> Purchase:
        return Purchase(Prompt.ask('Select product'))


PRODUCT_PRICES = {  # (2)!
    'water': 1.25,
    'soda': 1.50,
    'crisps': 1.75,
    'chocolate': 2.00,
}


@dataclass
class Purchase(BaseNode[MachineState, None, None]):  # (16)!
    product: str

    async def run(
        self, ctx: GraphRunContext[MachineState]
    ) -> End | InsertCoin | SelectProduct:
        if price := PRODUCT_PRICES.get(self.product):  # (8)!
            ctx.state.product = self.product  # (9)!
            if ctx.state.user_balance >= price:  # (10)!
                ctx.state.user_balance -= price
                return End(None)
            else:
                diff = price - ctx.state.user_balance
                print(f'Not enough money for {self.product}, need {diff:0.2f} more')
                #> Not enough money for crisps, need 0.75 more
                return InsertCoin()  # (11)!
        else:
            print(f'No such product: {self.product}, try again')
            return SelectProduct()  # (12)!


g = GraphBuilder(state_type=MachineState)  # (13)!


@g.step
async def start(ctx: StepContext[MachineState, None, None]) -> InsertCoin:
    return InsertCoin()


g.add(
    g.node(InsertCoin),
    g.node(CoinsInserted),
    g.node(SelectProduct),
    g.node(Purchase),
    g.edge_from(g.start_node).to(start),
)

vending_machine_graph = g.build()


async def main():
    state = MachineState()  # (17)!
    await vending_machine_graph.run(state=state)  # (18)!
    print(f'purchase successful item={state.product} change={state.user_balance:0.2f}')
    #> purchase successful item=crisps change=0.25
1. The state of the vending machine is defined as a dataclass with the user's balance and the product they've selected, if any.
2. A dictionary of products mapped to prices.
3. The InsertCoin node; BaseNode is parameterized with MachineState as that's the state used in this graph.
4. The InsertCoin node prompts the user to insert coins. We keep things simple by just entering a monetary amount as a float.
5. The CoinsInserted node; again this is a dataclass with one field amount.
6. Update the user's balance with the amount inserted.
7. If the user has already selected a product, go to Purchase, otherwise go to SelectProduct.
8. In the Purchase node, look up the price of the product if the user entered a valid product.
9. If the user did enter a valid product, set the product in the state so we don't revisit SelectProduct.
10. If the balance is enough to purchase the product, adjust the balance to reflect the purchase and return End to end the graph. We're not using the run return type, so we call End with None.
11. If the balance is insufficient, go to InsertCoin to prompt the user to insert more coins.
12. If the product is invalid, go to SelectProduct to prompt the user to select a product again.
13. Build the graph with GraphBuilder, declaring the MachineState type. Each BaseNode subclass is registered with g.node(); outgoing edges are inferred from the run return types. The start step constructs the first node.
14. The return type of the node's run method is important as it is used to determine the outgoing edges of the node. This information in turn is used to render mermaid diagrams and is enforced at runtime to detect misbehavior as soon as possible.
15. The return type of CoinsInserted's run method is a union, meaning multiple outgoing edges are possible.
16. Unlike other nodes, Purchase can end the run, so the RunEndT generic parameter must be set. In this case it's None since the graph run return type is None.
17. Initialize the state. This will be passed to the graph run and mutated as the graph runs.
18. Run the graph with the initial state. The first node to execute is determined by the start step we wired into g.start_node.
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
A mermaid diagram for this graph can be generated with print(vending_machine_graph):
stateDiagram-v2
start
InsertCoin
CoinsInserted
state decision <<choice>>
Purchase
SelectProduct
state decision_2 <<choice>>
[*] --> start
start --> InsertCoin
InsertCoin --> CoinsInserted
CoinsInserted --> decision
decision --> Purchase
decision --> SelectProduct
SelectProduct --> Purchase
Purchase --> decision_2
decision_2 --> InsertCoin
decision_2 --> SelectProduct
decision_2 --> [*]
See below for more information on generating diagrams.
GenAI Example
So far we haven't shown an example of a Graph that actually uses Pydantic AI or GenAI at all.
In this example, one agent generates a welcome email to a user and the other agent provides feedback on the email.
This graph has a very simple structure:
---
title: feedback_graph
---
stateDiagram-v2
[*] --> WriteEmail
WriteEmail --> Feedback
Feedback --> WriteEmail
Feedback --> [*]
from __future__ import annotations as _annotations

from dataclasses import dataclass, field

from pydantic import BaseModel, EmailStr

from pydantic_ai import Agent, ModelMessage, format_as_xml
from pydantic_graph import BaseNode, End, GraphBuilder, GraphRunContext, StepContext


@dataclass
class User:
    name: str
    email: EmailStr
    interests: list[str]


@dataclass
class Email:
    subject: str
    body: str


@dataclass
class State:
    user: User
    write_agent_messages: list[ModelMessage] = field(default_factory=list)


email_writer_agent = Agent(
    'gateway/google:gemini-3-pro-preview',
    output_type=Email,
    instructions='Write a welcome email to our tech blog.',
)


@dataclass
class WriteEmail(BaseNode[State]):
    email_feedback: str | None = None

    async def run(self, ctx: GraphRunContext[State]) -> Feedback:
        if self.email_feedback:
            prompt = (
                f'Rewrite the email for the user:\n'
                f'{format_as_xml(ctx.state.user)}\n'
                f'Feedback: {self.email_feedback}'
            )
        else:
            prompt = (
                f'Write a welcome email for the user:\n'
                f'{format_as_xml(ctx.state.user)}'
            )

        result = await email_writer_agent.run(
            prompt,
            message_history=ctx.state.write_agent_messages,
        )
        ctx.state.write_agent_messages += result.new_messages()
        return Feedback(result.output)


class EmailRequiresWrite(BaseModel):
    feedback: str


class EmailOk(BaseModel):
    pass


feedback_agent = Agent[object, EmailRequiresWrite | EmailOk](
    'openai:gpt-5.2',
    output_type=EmailRequiresWrite | EmailOk,  # type: ignore
    instructions=(
        "Review the email and provide feedback, email must reference the user's specific interests."
    ),
)


@dataclass
class Feedback(BaseNode[State, None, Email]):
    email: Email

    async def run(
        self,
        ctx: GraphRunContext[State],
    ) -> WriteEmail | End[Email]:
        prompt = format_as_xml({'user': ctx.state.user, 'email': self.email})
        result = await feedback_agent.run(prompt)
        if isinstance(result.output, EmailRequiresWrite):
            return WriteEmail(email_feedback=result.output.feedback)
        else:
            return End(self.email)


g = GraphBuilder(state_type=State, output_type=Email)


@g.step
async def start(ctx: StepContext[State, None, None]) -> WriteEmail:
    return WriteEmail()


g.add(
    g.node(WriteEmail),
    g.node(Feedback),
    g.edge_from(g.start_node).to(start),
)

feedback_graph = g.build()


async def main():
    user = User(
        name='John Doe',
        email='john.joe@example.com',
        interests=['Haskell', 'Lisp', 'Fortran'],
    )
    state = State(user)
    result = await feedback_graph.run(state=state)
    print(result)
    """
    Email(
        subject='Welcome to our tech blog!',
        body='Hello John, Welcome to our tech blog! ...',
    )
    """
(This example is complete and can be run "as is", but you'll need to add import asyncio; asyncio.run(main()) to run main)
Iterating Over a Graph
For step-by-step execution — inspecting each task as it runs, overriding the next step, or driving the loop manually — use graph.iter() instead of graph.run(). See Advanced Execution Control in the graph builder docs for the iteration model and examples.
Dependency Injection
As with Pydantic AI, pydantic-graph supports dependency injection. Pass a deps_type to GraphBuilder, parameterize each BaseNode subclass with the deps type, and read it via GraphRunContext.deps inside run() (or StepContext.deps inside step functions).
As an example, let's modify the DivisibleBy5 example above to use a ProcessPoolExecutor to run the compute load in a separate process (this is a contrived example, ProcessPoolExecutor wouldn't actually improve performance in this example):
from __future__ import annotations

import asyncio
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, GraphBuilder, GraphRunContext, StepContext


@dataclass
class GraphDeps:
    executor: ProcessPoolExecutor


@dataclass
class DivisibleBy5(BaseNode[None, GraphDeps, int]):
    foo: int

    async def run(
        self,
        ctx: GraphRunContext[None, GraphDeps],
    ) -> Increment | End[int]:
        if self.foo % 5 == 0:
            return End(self.foo)
        else:
            return Increment(self.foo)


@dataclass
class Increment(BaseNode[None, GraphDeps]):
    foo: int

    async def run(self, ctx: GraphRunContext[None, GraphDeps]) -> DivisibleBy5:
        loop = asyncio.get_running_loop()
        compute_result = await loop.run_in_executor(
            ctx.deps.executor,
            self.compute,
        )
        return DivisibleBy5(compute_result)

    def compute(self) -> int:
        return self.foo + 1


g = GraphBuilder(deps_type=GraphDeps, input_type=int, output_type=int)


@g.step
async def start(ctx: StepContext[None, GraphDeps, int]) -> DivisibleBy5:
    return DivisibleBy5(ctx.inputs)


g.add(
    g.node(DivisibleBy5),
    g.node(Increment),
    g.edge_from(g.start_node).to(start),
)

fives_graph = g.build()


async def main():
    with ProcessPoolExecutor() as executor:
        deps = GraphDeps(executor)
        result = await fives_graph.run(inputs=3, deps=deps)
    print(result)
    #> 5
(This example is complete, it can be run "as is" — you'll need to add asyncio.run(main()) to run main)
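One practical benefit of passing the executor in as a dependency is that the caller decides which executor to supply. The stdlib-only sketch below shows the same run_in_executor pattern with a thread pool swapped in, which can be convenient in tests. It assumes a deps dataclass typed against the abstract Executor rather than ProcessPoolExecutor; add_one, increment and demo are hypothetical helpers for illustration, not part of pydantic-graph.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class GraphDeps:
    # Typed as the abstract Executor so either a process or thread pool fits.
    executor: Executor


def add_one(value: int) -> int:
    """The 'compute load' from the example, as a plain function."""
    return value + 1


async def increment(value: int, deps: GraphDeps) -> int:
    """Run the computation on whichever executor the caller injected."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(deps.executor, add_one, value)


async def demo() -> int:
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await increment(4, GraphDeps(executor))


demo_result = asyncio.run(demo())
print(demo_result)
```

The calling code is unchanged between the two executor types; only the object placed in the deps container differs.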
Mermaid Diagrams
Pydantic Graph can render mermaid stateDiagram-v2 diagrams for any built graph. Call graph.render() (or just print(graph)) to get the mermaid source — pass direction ('TB', 'LR', 'RL', or 'BT') to control layout. See the graph builder mermaid section for the full set of rendering options.