In this guide, we will walk through the process of creating custom nodes for Vellum Workflows using the Workflows SDK. Custom nodes allow you to extend the functionality of your Workflows by implementing specific logic tailored to your needs.

Overview

In this tutorial, we will create a simple workflow that:

  1. Fetches fake social media posts from an API
  2. Maintains strong types by validating the API response with Pydantic, via the Workflows SDK's UniversalBaseModel
  3. Conditionally fetches and appends post author data, depending on runtime inputs
  4. Passes the data to a Prompt Node, which summarizes the results using a language model
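Before diving into the code, it helps to know the shape of the data we are working with. The dummyjson.com search endpoint returns a JSON object roughly like the following (fields trimmed for brevity; treat the exact values and field list as illustrative, not the full schema):

```python
# Approximate shape of https://dummyjson.com/posts/search?q=...&limit=3
# (illustrative only -- fields trimmed for brevity).
sample_response = {
    "posts": [
        {
            "id": 7,
            "title": "The leather jacket showed the scars",
            "body": "...",
            "userId": 52,
            "views": 765,
        },
    ],
    "total": 1,
    "skip": 0,
    "limit": 3,
}

# The workflow only cares about the list under "posts".
posts = sample_response["posts"]
print(len(posts), posts[0]["userId"])
```

Note that the API uses camelCase keys such as `userId`, which matters for the type definitions discussed later in this guide.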

Components

1. workflow.py

This file defines the main workflow. The workflow consists of a graph that connects a CustomNode (our data fetcher) to a SummarizeResultsNode. The Outputs class specifies the expected output of the workflow.

workflow.py

```python
from vellum.workflows import BaseWorkflow

from .nodes.custom_node import CustomNode
from .nodes.summarize_results_node import SummarizeResultsNode


class Workflow(BaseWorkflow):
    graph = CustomNode >> SummarizeResultsNode

    class Outputs(BaseWorkflow.Outputs):
        results = SummarizeResultsNode.Outputs.text
```

2. custom_node.py

In custom_node.py we define a CustomNode that extends BaseNode. The run method fetches fake social media posts from an API and, when the include_user_details input is set, enriches each post with author details fetched from a separate endpoint before returning the results. This node demonstrates how the run method can implement custom runtime logic based on the node's inputs.

In types.py we define response types using UniversalBaseModel (which extends Pydantic's BaseModel) to get static type checking and type inference throughout our Workflow. We also use pydash to convert the API's camelCase keys to snake_case, keeping the code consistent with the conventions we prefer (and, at the same time, keeping our linter happy).
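The SDK handles validation, but the camelCase-to-snake_case conversion mentioned above can be sketched in plain Python. The helper below is a stdlib stand-in for pydash's snake_case, applied recursively to a raw API payload before validation (a sketch of the idea, not the actual types.py implementation):

```python
import re


def snake_case(name: str) -> str:
    # "userId" -> "user_id"; a stdlib stand-in for pydash's snake_case.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def snake_keys(value):
    # Recursively convert dict keys so the camelCase API payload matches
    # the snake_case fields declared on the Pydantic models.
    if isinstance(value, dict):
        return {snake_case(k): snake_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [snake_keys(v) for v in value]
    return value


raw = {"posts": [{"id": 1, "userId": 5, "reactions": {"likes": 428}}]}
print(snake_keys(raw))
```

After this pass, keys like `userId` line up with the snake_case field names (`user_id`) declared on the models, so validation succeeds without per-field aliases.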

nodes/custom_node.py

```python
from typing import List

import requests

from vellum.workflows.nodes import BaseNode

from examples.extend_base_node.inputs import Inputs
from examples.extend_base_node.nodes.types import Post, PostsResponse, User


class CustomNode(BaseNode):
    search_query: str = Inputs.search_query
    include_user_details: bool = Inputs.include_user_details

    class Outputs(BaseNode.Outputs):
        posts: List[Post]

    def run(self) -> Outputs:
        raw_response = requests.get(
            f"https://dummyjson.com/posts/search?q={self.search_query}&limit=3",
            timeout=10,
        ).json()
        response = PostsResponse.model_validate(raw_response)
        posts = response.posts

        if self.include_user_details:
            for post in posts:
                raw_user_response = requests.get(
                    f"https://dummyjson.com/users/{post.user_id}", timeout=10
                ).json()
                post.user = User.model_validate(raw_user_response)

        return self.Outputs(posts=posts)
```
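The node references an Inputs class from inputs.py, which this guide does not show. A minimal sketch of what it might look like, assuming the SDK's BaseInputs base class (the field names come from the node itself; the default values here are hypothetical):

```python
# Hypothetical inputs.py -- illustrative only; the real file is not shown in this guide.
from vellum.workflows.inputs import BaseInputs


class Inputs(BaseInputs):
    # Defaults are made up here so workflow.run() can be called without arguments;
    # callers can pass inputs explicitly to override them.
    search_query: str = "fiction"
    include_user_details: bool = False
```

With explicit values, the workflow would be invoked along the lines of `workflow.run(inputs=Inputs(search_query="jacket", include_user_details=True))`.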

3. summarize_results_node.py

This file defines a SummarizeResultsNode that extends InlinePromptNode. It uses GPT-4o-mini to summarize the posts data fetched by the CustomNode. The node is configured with prompt blocks that define how the input data should be processed.

nodes/summarize_results_node.py

```python
from vellum import ChatMessagePromptBlock, JinjaPromptBlock
from vellum.workflows.nodes import InlinePromptNode

from .custom_node import CustomNode


class SummarizeResultsNode(InlinePromptNode):
    ml_model = "gpt-4o-mini"
    blocks = [
        ChatMessagePromptBlock(
            chat_role="SYSTEM",
            blocks=[
                JinjaPromptBlock(
                    block_type="JINJA",
                    template="Summarize the following social media posts: <posts>{{ posts }}</posts>",
                ),
            ],
        ),
    ]
    prompt_inputs = {"posts": CustomNode.Outputs.posts}
```

Note that the key in prompt_inputs must match the variable name referenced in the Jinja template ({{ posts }}), and its value is wired to the posts output of the CustomNode.

Running the Workflow

To execute the workflow, create a script.py like the one below and run it as a module from the project root (for example, python -m examples.extend_base_node.script, matching the package paths used in the imports above). The workflow will fetch the social media posts, process them through the SummarizeResultsNode, and log the summary.

script.py

```python
import logging

from .workflow import Workflow

logger = logging.getLogger(__name__)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

if __name__ == "__main__":
    workflow = Workflow()
    terminal_event = workflow.run()
    if terminal_event.name == "workflow.execution.fulfilled":
        logger.info(terminal_event.outputs["results"])
    else:
        logger.error(terminal_event)
```

Output:

1. In the first post titled "The leather jacket showed the scars," he reflects
on a well-worn leather jacket that has become a symbol of pride and character
over the years. The jacket's scars are seen as enhancements rather than flaws,
indicating it is still in its prime. This post received 428 likes and 19
dislikes, with 765 views.
2. The second post, "Sometimes it's just better not to be seen," discusses a
character named Harry who prefers to remain unnoticed and blend into the
background. He is surprised when someone actually notices him, challenging his
self-perception. This post garnered 390 likes and 25 dislikes, with a total of 2,928 views.
Both posts are tagged with themes of French fiction and classic storytelling.

Conclusion

By following this guide, you have learned how to extend BaseNode to create custom nodes in Vellum Workflows. This approach allows you to tailor workflows to your specific requirements, integrating external data sources and processing them with advanced language models.
