Core Concepts

Defining a Workflow

All Vellum Workflows extend from the BaseWorkflow class. Workflows define the control flow of your application, orchestrating the order of execution between each Node.

Workflows can be invoked via a run method, which returns the final event that was emitted by the Workflow.

1class MyWorkflow(BaseWorkflow):
2 pass
3
4workflow = MyWorkflow()
5final_event = workflow.run()
6
7assert final_event.name == "workflow.execution.fulfilled"

In the example above, final_event has a name of "workflow.execution.fulfilled". This indicates that the Workflow ran to completion successfully. Had the Workflow encountered an error, the name would have been "workflow.execution.rejected".

Workflow Outputs

You can think of a Workflow as a black box that produces values for pre-defined outputs. To specify the outputs of a Workflow, you must define an Outputs class that extends from BaseWorkflow.Outputs.

Here is a very basic Workflow that defines a single output called hello with a hard-coded return value of the string "world".

1class MyWorkflow(BaseWorkflow):
2 class Outputs(BaseWorkflow.Outputs):
3 greeting = "Hello, world!"
4
5
6workflow = MyWorkflow()
7final_event = workflow.run()
8
9assert final_event.name == "workflow.execution.fulfilled"
10assert final_event.outputs.greeting == "Hello, world!"

Defining Nodes

Nodes are the building blocks of a Workflow and are responsible for executing a specific task. All Nodes in a Workflow must extend from the BaseNode class.

Here we define a very simple custom Node called GreetingNode that overrides the run method to print "Hello, world!" to the console. Notably, this Node doesn’t produce any outputs (yet!).

1class GreetingNode(BaseNode):
2 def run(self) -> BaseNode.Outputs:
3 print("Hello, world!")
4 return self.Outputs()

Defining Node Outputs

Most Nodes produce Outputs that can be referenced elsewhere in the Workflow. Just like a Workflow, a Node defines its outputs via an Outputs class, this time, extending from BaseNode.Outputs.

Here we define a GreetingNode that produces a single output of type str called greeting. The run method returns an instance of GreetingNode.Outputs with the greeting attribute set to "Hello, world!".

1class GreetingNode(BaseNode):
2 class Outputs(BaseNode.Outputs):
3 greeting: str
4
5 def run(self) -> BaseNode.Outputs:
6 greeting = "Hello, world!"
7 print(greeting)
8 return self.Outputs(greeting=greeting)

Using a Node in a Workflow

Nodes are executed as part of a Workflow once they’re added to the Workflow’s graph attribute. Once added, a Node’s output can be used as the Workflow’s output.

1class GreetingNode(BaseNode):
2 class Outputs(BaseNode.Outputs):
3 greeting: str
4
5 def run(self) -> BaseNode.Outputs:
6 greeting = "Hello, world!"
7 print(greeting)
8 return self.Outputs(greeting=greeting)
9
10class MyWorkflow(BaseWorkflow):
11 # Add the GreetingNode to the Workflow's graph
12 graph = GreetingNode
13
14 class Outputs(BaseWorkflow.Outputs):
15 # Use the GreetingNode's output as the Workflow's output
16 greeting = GreetingNode.Outputs.greeting
17
18workflow = MyWorkflow()
19final_event = workflow.run()
20
21assert final_event.name == "workflow.execution.fulfilled"
22assert final_event.outputs.greeting == "Hello, world!"

Workflow Inputs

The runtime behavior of a Workflow almost always depends on some set of input values that are provided at the time of execution.

You can define a Workflow’s inputs via an Inputs class that extends from BaseInputs and that’s then referenced in the Workflow’s parent class as a generic type.

Here’s a Workflow that defines a single input called greeting of type str and simply passes it through as an output.

1class Inputs(BaseInputs):
2 greeting: str
3
4class MyWorkflow(BaseWorkflow[Inputs, BaseState]):
5 class Outputs(BaseWorkflow.Outputs):
6 greeting = Inputs.greeting
7
8workflow = MyWorkflow()
9final_event = workflow.run(inputs=Inputs(greeting="Hello, world!"))
10
11assert final_event.name == "workflow.execution.fulfilled"
12assert final_event.outputs.greeting == "Hello, world!"

Node Attributes

A Workflow’s inputs are usually used to drive the behavior of its Nodes. Nodes can reference these inputs via class attributes that are resolved at runtime.

Below we drive the behavior of a GreetingNode by specifying noun = Inputs.noun as a class attribute, then referencing self.noun in the run method to produce a dynamic greeting.

1class Inputs(BaseInputs):
2 noun: str
3
4class GreetingNode(BaseNode):
5 noun = Inputs.noun
6
7 class Outputs(BaseNode.Outputs):
8 greeting: str
9
10 def run(self) -> Outputs:
11 return self.Outputs(greeting=f"Hello, {self.noun}!")
12
13class MyWorkflow(BaseWorkflow[Inputs, BaseState]):
14 graph = GreetingNode
15
16 class Outputs(BaseWorkflow.Outputs):
17 hello = GreetingNode.Outputs.greeting
18
19workflow = MyWorkflow()
20
21# Run it once with "world"
22final_event = workflow.run(inputs=Inputs(noun="world"))
23
24assert final_event.name == "workflow.execution.fulfilled"
25assert final_event.outputs.hello == "Hello, world!"
26
27# Run it again with "universe"
28final_event = workflow.run(inputs=Inputs(noun="universe"))
29
30assert final_event.name == "workflow.execution.fulfilled"
31assert final_event.outputs.hello == "Hello, universe!"
Descriptors

Inputs.noun is what we call a “descriptor” and is not a literal value. Think of it like a pointer or reference whose value is resolved at runtime. If you were to call Inputs.noun within a node’s run method instead of self.noun an exception would be raised.

Control Flow

Defining Control Flow

Until now, we’ve only defined Workflows that contain a single Node – not very interesting! Most Workflows orchestrate the execution of multiple Nodes in a specific order. This is achieved by defining a graph attribute with a special syntax that describes the control flow between Nodes.

Here we define three Nodes, GreetingNode, EndNode, and AggregatorNode, then define the order of their execution by using the >> operator.

1class MyWorkflow(BaseWorkflow):
2 graph = GreetingNode >> EndNode >> AggregatorNode
3
4 class Outputs(BaseWorkflow.Outputs):
5 results = AggregatorNode.Outputs.results
6
7workflow = MyWorkflow()
8final_event = workflow.run()
9
10assert final_event.name == "workflow.execution.fulfilled"
11assert final_event.outputs.results == ["Hello, world!", "Goodbye, world!"]

Ports and Conditionals

Nodes contain Ports and use them to determine which Nodes to execute next. Ports are useful for performing branching logic and conditional execution of subsequent Nodes.

We haven’t seen any Ports up until now, but they’re actually present in every Node. By default, a Node has a single Port called default, which is always invoked after the Node’s run method completes.

The following Workflows are equivalent:

1class MyWorkflow1(BaseWorkflow):
2 graph = GreetingNode >> SomeNode,
3
4 class Outputs(BaseWorkflow.Outputs):
5 result = "Hello"
6
7class MyWorkflow2(BaseWorkflow):
8 graph = GreetingNode.Ports.default >> WinnerNode,
9
10 class Outputs(BaseWorkflow.Outputs):
11 result = "Hello"

You can explicitly define a Ports class on a Node and define the conditions in which one Node or another should execute. Below, we define a SwitchNode that has a winner Port and a loser Port.

1class SwitchNode(BaseNode):
2 class Ports(BaseNode.Ports):
3 # Invoke the `winner` Port if the `StartNode`'s `score` output is greater than `5`
4 winner = Port.on_if(StartNode.Outputs.score.greater_than(5))
5 # Otherwise, invoke the `loser` Port
6 loser = Port.on_else()
7
8class MyWorkflow(BaseWorkflow):
9 graph = StartNode >> {
10 SwitchNode.Ports.winner >> WinnerNode,
11 SwitchNode.Ports.loser >> LoserNode,
12 }
13
14 class Outputs(BaseWorkflow.Outputs):
15 result = WinnerNode.Outputs.result.coalesce(LoserNode.Ports.result)
16
17
18workflow = MyWorkflow()
19final_event = workflow.run()
20
21assert final_event.name == "workflow.execution.fulfilled"
22assert final_event.outputs.result in ("We won!", "We lost :(")

Notice that we use the greater_than Expression to define the winner Port— more on Expressions next.

Expressions

Descriptors support a declarative syntax for defining Expressions. Expressions are usually used in conjunction with Ports to define conditional execution of subsequent Nodes, but can also be used as short-hand for performing simple operations that would otherwise have to be manually defined in a Node’s run method.

Here we define a StartNode that produces a random score between 0 and 10. We then define an EndNode that has a single output called winner that is True if the score is greater than 5.

For example, the longform definition of a Node that relies on StartNode.Outputs.score would look like this:

1class EndNode(BaseNode):
2 score = StartNode.Outputs.score
3
4 class Outputs(BaseNode.Outputs):
5 winner: bool
6
7 def run(self) -> Outputs:
8 return self.Outputs(winner=self.score > 5)

And the shortform using an Expression would look like this:

1class EndNode(BaseNode):
2
3 class Outputs(BaseNode.Outputs):
4 winner = StartNode.Outputs.score.greater_than(5)

Triggers

In some cases, you may want to delay the execution of a Node until a certain condition is met. For example, you may want to wait for multiple upstream Nodes to complete before executing a Node, like when executing Nodes in parallel. This is where Triggers come in.

Just as Nodes define a Ports class implicitly by default, they also define a Trigger class implicitly by default. Here’s what the default Trigger class looks like:

1class Trigger(BaseNode.Trigger):
2 merge_behavior = MergeStrategy.AWAIT_ANY

This means that by default, a Node will execute as soon as any one of its immediately upstream Nodes have fulfilled. You might instead want to wait until all of its upstream Nodes have fulfilled. To do this, you can explicitly define a Trigger class on a Node like so:

1class Trigger(BaseNode.Trigger):
2 merge_behavior = MergeStrategy.AWAIT_ALL

Here’s a complete example:

1class QuickNode(BaseNode):
2 class Outputs(BaseNode.Outputs):
3 prefix = "Hello"
4
5class SlowNode(BaseNode):
6 class Outputs(BaseNode.Outputs):
7 suffix: str
8
9 def run(self) -> Outputs:
10 time.sleep(5)
11 return self.Outputs(suffix="World")
12
13
14class MergeNode(BaseNode):
15 prefix = QuickNode.Outputs.prefix
16 suffix = SlowNode.Outputs.suffix
17
18 class Outputs(BaseNode.Outputs):
19 message: str
20
21 class Trigger(BaseNode.Trigger):
22 merge_strategy = MergeStrategy.AWAIT_ALL
23
24 def run(self) -> Outputs:
25 return self.Outputs(message=f"{self.prefix} {self.suffix}")
26
27
28class MyWorkflow(BaseWorkflow):
29 graph = {
30 QuickNode,
31 SlowNode,
32 } >> MergeNode
33
34 class Outputs(BaseWorkflow.Outputs):
35 result = MergeNode.Outputs.message
36
37
38workflow = MyWorkflow()
39final_event = workflow.run()
40
41assert final_event.name == "workflow.execution.fulfilled"
42assert final_event.outputs.result == "Hello World"

It’s usually sufficient to stick with the “Await All” and “Await Any” merge behaviors that are provided out-of-box. However, you can also define your own custom merge behaviors by overriding the Trigger class’s should_initiate method. By doing so, you can access any information about the Node’s dependencies or the Workflow’s State (more on State later).

Parallel Execution

You may want to run multiple execution paths in parallel. For example, if you want to run multiple LLM prompts concurrently, or respond to a user while performing background tasks. To do this, you can use “set syntax” as follows:

1class FirstNode(TimeSinceStartNode):
2 pass
3
4
5class SecondNode(TimeSinceStartNode):
6 pass
7
8class BasicParallelizationWorkflow(BaseWorkflow):
9 graph = StartNode >> {
10 FirstNode,
11 SecondNode,
12 }
13
14 class Outputs(BaseWorkflow.Outputs):
15 first_node_time: int = FirstNode.Outputs.total_time
16 second_node_time: int = SecondNode.Outputs.total_time
17
18workflow = BasicParallelizationWorkflow()
19final_event = workflow.run()
20
21assert final_event.name == "workflow.execution.fulfilled"
22assert final_event.outputs.first_node_time == 1
23assert final_event.outputs.second_node_time == 1

State

In most cases it’s sufficient to drive a Node’s behavior based on either inputs to the Workflow, or the outputs of upstream Nodes. However, Workflow’s also support writing to and reading from a global state object that lives for the duration of the Workflow’s execution.

Here’s an example of how to define the schema of a State object and use it in a Workflow.

1class State(BaseState):
2 items: Set[int]
3
4
5class TopNode(BaseNode[State]):
6 def run(self) -> BaseNode.Outputs:
7 self.state.items.add(random.randint(0, 10))
8 return self.Outputs()
9
10
11class BottomNode(BaseNode[State]):
12 def run(self) -> BaseNode.Outputs:
13 self.state.items.add(random.randint(10, 20))
14 return self.Outputs()
15
16
17class MergeNode(BaseNode):
18 all_items = State.items
19
20 class Outputs(BaseNode.Outputs):
21 total: int
22
23 class Trigger(BaseNode.Trigger):
24 merge_strategy = MergeStrategy.AWAIT_ALL
25
26 def run(self) -> Outputs:
27 return self.Outputs(total=len(self.state.all_items))
28
29
30class MyWorkflow(BaseWorkflow[BaseInputs, State]):
31 graph = {
32 TopNode,
33 BottomNode,
34 } >> MergeNode
35
36 class Outputs(BaseWorkflow.Outputs):
37 result = MergeNode.Outputs.total
38
39
40workflow = MyWorkflow()
41final_event = workflow.run()
42
43assert final_event.name == "workflow.execution.fulfilled"
44assert final_event.outputs.result == 2
Even if no State class is explicitly defined, Workflows use State under the hood to track all information about a Workflow’s execution. This information is stored under the reserved meta attribute on the State class and can be accessed for your own purposes.

Streaming Outputs

Workflow Event Streaming

Until now, we’ve only seen the run() method being invoked on Workflows we’ve defined. run() is a blocking call that waits for the Workflow to complete before returning a terminal fulfilled or rejected event.

In some cases, you may want to stream the events a Workflow produces as they’re being emitted. This is useful when your Workflow produces outputs along the way, and you want to consume them in real-time.

You can do this via the stream() method, which returns a Generator that yields events as they’re produced.

1class Inputs(BaseInputs):
2 boost: int
3
4class StartNode(BaseNode):
5 boost = Inputs.boost
6 class Outputs(BaseNode.Outputs):
7 score: int
8
9 def run(self) -> Outputs:
10 return self.Outputs(score=random.randint(0, 10) + self.boost)
11
12
13class EndNode(BaseNode):
14 class Outputs(BaseNode.Outputs):
15 winner = StartNode.Outputs.score.greater_than(15)
16
17
18class MyWorkflow(BaseWorkflow):
19 graph = StartNode >> EndNode
20
21 class Outputs(BaseWorkflow.Outputs):
22 score = StartNode.Outputs.score
23 winner = EndNode.Outputs.winner
24
25
26workflow = MyWorkflow()
27events = workflow.stream(inputs=Inputs(boost=10))
28
29for event in events:
30 if event.name == "workflow.execution.initiated":
31 assert event.inputs.boost == 10
32 elif event.name == "workflow.execution.fulfilled":
33 assert event.outputs.winner is True
34 elif event.name == "workflow.execution.streaming":
35 if event.output.name == "score":
36 assert event.output.value > 10
37 elif event.output.name == "winner":
38 assert event.output.value is True

Node Event Streaming

By default, when you call a Workflow’s stream() method, you’ll only receive Workflow-level events. However, you may also opt in to receive Node-level events by specifying the event_types parameter.

With this, you can receive the events that Nodes in the Workflow produce as they’re emitted. This is useful when you want to inspect the outputs of individual Nodes for debugging purposes.

1class Inputs(BaseInputs):
2 boost: int
3
4
5class StartNode(BaseNode):
6 boost = Inputs.boost
7
8 class Outputs(BaseNode.Outputs):
9 score: int
10
11 def run(self) -> Outputs:
12 return self.Outputs(score=random.randint(0, 10) + self.boost)
13
14
15class EndNode(BaseNode):
16 class Outputs(BaseNode.Outputs):
17 winner = StartNode.Outputs.score.greater_than(15)
18
19
20class MyWorkflow(BaseWorkflow):
21 graph = StartNode >> EndNode
22
23 class Outputs(BaseWorkflow.Outputs):
24 winner = EndNode.Outputs.winner
25
26
27workflow = MyWorkflow()
28events = workflow.stream(
29 inputs=Inputs(boost=10),
30 event_types={
31 WorkflowEventType.NODE,
32 WorkflowEventType.WORKFLOW,
33 },
34)
35
36for event in events:
37 if event.name == "workflow.execution.initiated":
38 assert event.inputs.boost == 10
39 elif event.name == "workflow.execution.fulfilled":
40 assert event.outputs.winner is True
41 elif event.name == "node.execution.fulfilled":
42 if event.node_class is StartNode:
43 assert event.outputs.score > 10
44 elif event.node_class is EndNode:
45 assert event.outputs.winner is True
Built with