Defining Control Flow

Here are some examples of how to represent different graphs using the Vellum Workflows SDK.

Single Node Workflow

A single Node Workflow is the simplest possible Workflow, consisting of just one Node.

graph1.png

1class MyWorkflow(BaseWorkflow):
2 graph = StartNode

Serial Execution Between Two Nodes

Two Nodes connected in sequence, where the output of the first Node flows into the second Node. This is a fundamental pattern in Workflow design where a downstream operation directly depends on the output of an upstream operation.

graph2.png

1class MyWorkflow(BaseWorkflow):
2 graph = StartNode >> EndNode

Single Node Branches to Two Parallel Nodes

Here a single Node branches into two parallel execution paths, allowing multiple operations to run independently after the initial Node completes. This pattern is useful when you need to perform parallel processing or handle multiple aspects of a Workflow simultaneously.

graph3.png

1class MyWorkflow(BaseWorkflow):
2 graph = StartNode >> {
3 TopNode,
4 BottomNode,
5 }

Two Parallel Nodes Merge into One Node

Here two parallel execution paths merge back together into a single Node. This pattern is useful when you need to combine the results of multiple parallel operations before proceeding with the rest of the Workflow.

graph4.png

1class MyWorkflow(BaseWorkflow):
2 graph = StartNode >> {
3 TopNode,
4 BottomNode,
5 } >> EndNode

Conditional Routing with Ports

Ports enable conditional routing in workflows by allowing nodes to direct execution flow based on specific conditions. This is particularly useful for implementing branching logic where different paths should be taken based on the results of upstream nodes.

1class MyWorkflow(BaseWorkflow):
2 graph = {
3 ConditionalNode.Ports.if_port >> SomeNode,
4 ConditionalNode.Ports.else_port >> SomeOtherNode,
5 } >> FinalOutputNode

In this example, the ConditionalNode evaluates a condition and routes execution to either SomeNode (if the condition is true) or SomeOtherNode (if the condition is false). Both paths then converge at the FinalOutputNode.

For more details on how to define conditions and work with ports, see the Ports and Conditionals section in Core Concepts.

Loops

Loops are a powerful pattern in Workflow design that allow you to repeat a series of operations until a certain condition is met. Many agentic AI systems tend to include loops.

image.png

1class LoopNode(BaseNode):
2 class Ports(BaseNode.Ports):
3 loop = Port.on_if(Input.score.less_than(0.5))
4 exit = Port.on_else()
5
6class MyWorkflow(BaseWorkflow):
7 graph = StartNode >> {
8 LoopNode.Ports.loop >> StartNode,
9 LoopNode.Ports.exit >> ExitNode,
10 }

Map

The Map pattern is useful when you need to apply the same operation to a list of items.

image.png

1@MapNode.wrap(items=Input.items)
2class PromptNode(BaseNode):
3 ...
4
5class MyWorkflow(BaseWorkflow):
6 graph = PromptNode

Merge Strategies for Parallel Execution

When you have parallel execution paths that converge into a single downstream node, different node types handle this convergence differently. Understanding these merge strategies is crucial for building robust workflows.

Prompt Nodes: Use AWAIT_ATTRIBUTES

For Prompt Nodes, it’s recommended to use AWAIT_ATTRIBUTES merge strategy, which waits only for the specific input attributes that the node actually uses:

1from vellum import ChatMessagePromptBlock, JinjaPromptBlock, PromptParameters, PromptSettings
2from vellum.workflows.nodes.displayable import InlinePromptNode
3from vellum.workflows.types.core import MergeBehavior
4
5from .constant_one import ConstantOne
6from .constant_two import ConstantTwo
7
8
9class MergePrompt(InlinePromptNode):
10 ml_model = "gpt-5-responses"
11 blocks = [
12 ChatMessagePromptBlock(
13 chat_role="SYSTEM",
14 blocks=[
15 JinjaPromptBlock(
16 template="""\
17You will receive two messages. Please combine them into a single response.
18
19First message: {{ message_one }}
20Second message: {{ message_two }}
21
22Please create a thoughtful response that incorporates both messages.\
23"""
24 )
25 ],
26 ),
27 ]
28 prompt_inputs = {
29 "message_one": ConstantOne.Outputs.result,
30 "message_two": ConstantTwo.Outputs.result,
31 }
32 parameters = PromptParameters(
33 stop=[],
34 temperature=None,
35 max_tokens=500,
36 top_p=0,
37 top_k=None,
38 frequency_penalty=0,
39 presence_penalty=0,
40 logit_bias=None,
41 custom_parameters={
42 "json_mode": False,
43 },
44 )
45 settings = PromptSettings(stream_enabled=False)
46
47 class Trigger(InlinePromptNode.Trigger):
48 merge_behavior = MergeBehavior.AWAIT_ALL
49
50class MyWorkflow(BaseWorkflow):
51 graph = {
52 ConstantOne,
53 ConstantTwo,
54 } >> MergePrompt

Custom Nodes: Default AWAIT_ATTRIBUTES Behavior

Custom Nodes (extending BaseNode) automatically use AWAIT_ATTRIBUTES by default, waiting only for their referenced inputs:

1class ProcessingNode(BaseNode):
2 # Inputs from parallel upstream nodes
3 data_a = NodeA.Outputs.result
4 data_b = NodeB.Outputs.result
5
6 class Outputs(BaseNode.Outputs):
7 combined_result: str
8
9 def run(self) -> Outputs:
10 # This node automatically waits for both NodeA and NodeB to complete
11 # before executing, since it references both outputs as inputs
12 return self.Outputs(
13 combined_result=f"Combined: {self.data_a} + {self.data_b}"
14 )
15
16class MyWorkflow(BaseWorkflow):
17 graph = {
18 NodeA,
19 NodeB,
20 } >> ProcessingNode

Merge Nodes: Use AWAIT_ALL

For dedicated merge operations, use AWAIT_ALL to ensure all upstream nodes complete before proceeding:

1from vellum.workflows.nodes.displayable import MergeNode
2from vellum.workflows.types import MergeBehavior
3
4
5class Merge(MergeNode):
6 class Trigger(MergeNode.Trigger):
7 merge_behavior = MergeBehavior.AWAIT_ALL
8
9class MyWorkflow(BaseWorkflow):
10 graph = {
11 NodeA,
12 NodeB,
13 } >> Merge >> TemplatingNode

When you need to merge parallel paths before node types that don’t support automatic input waiting (like Templating Nodes), you should use a MergeNode with AWAIT_ALL strategy first.