External Workflow Server

Note: This capability is under active development and considered to be in Beta. If you’re interested in deploying your organization’s Workflow, please reach out to Support.

In this tutorial, we show you how to deploy your Workflow to your own infrastructure. For the purposes of this tutorial, we will use AWS as the example cloud provider, but these steps should apply to any cloud provider that supports VM instances that can run Docker Images.

1. Stand up the VM Instance

First, create an EC2 Instance in your AWS account. Be sure to:

  • create a Key Pair so that you can SSH in, or have some other way to run setup commands inside the instance
  • ensure the instance belongs to a security group whose inbound rules allow HTTP requests on port 8000.
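
If you prefer to script the inbound rule rather than add it in the console, the following is a minimal sketch. It assumes boto3 is installed and AWS credentials are configured. The helper names, the rule description, and the default CIDR are illustrative choices, not part of Vellum's tooling.

```python
from typing import Any, Dict, List


def build_port_rule(port: int = 8000, cidr: str = "0.0.0.0/0") -> List[Dict[str, Any]]:
    """Build an IpPermissions list that allows inbound TCP on a single port."""
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "IpRanges": [{"CidrIp": cidr, "Description": "Workflow Server HTTP"}],
        }
    ]


def open_port(group_id: str, port: int = 8000, cidr: str = "0.0.0.0/0") -> None:
    """Add the inbound rule to an existing security group (hypothetical helper)."""
    import boto3  # imported lazily so build_port_rule works without boto3

    ec2 = boto3.client("ec2")
    ec2.authorize_security_group_ingress(
        GroupId=group_id,
        IpPermissions=build_port_rule(port, cidr),
    )
```

Calling `open_port("sg-...")` with your own security group ID applies the rule. The default CIDR opens the port to any address, so consider passing a narrower range that covers only the clients that will invoke the server.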

2. Set up Docker

Install Docker on the instance. Run the following commands to do so for an Amazon Linux Image:

$ sudo yum update -y
$ sudo yum install docker -y
$ sudo systemctl start docker
$ sudo systemctl enable docker
$ sudo usermod -a -G docker ec2-user

After running the last command, log out of the instance and log back in so that the new docker group membership takes effect.

3. Pull Vellum’s Workflow Server

Now, pull the Docker image for Vellum’s Workflow Server. You can pull the latest tag or a specific server version. Server versions match 1:1 with the versions posted on the SDK releases.

$ docker pull vellumai/python-workflow-runtime:1.10.4
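
Because server versions track SDK releases 1:1, you can derive the image tag from the SDK version you develop against. The following is a rough sketch of that idea. The helper names are hypothetical, and the package name `vellum-ai` is an assumption about how the SDK is installed in your environment.

```python
from importlib import metadata

# Repository name taken from the pull command above.
IMAGE_REPOSITORY = "vellumai/python-workflow-runtime"


def image_for_version(version: str) -> str:
    """Return the image reference for a given server/SDK version."""
    return f"{IMAGE_REPOSITORY}:{version}"


def image_for_installed_sdk(package: str = "vellum-ai") -> str:
    """Match the image tag to the locally installed SDK, else fall back to latest.

    The default package name is an assumption; pass your own if it differs.
    """
    try:
        return image_for_version(metadata.version(package))
    except metadata.PackageNotFoundError:
        return image_for_version("latest")


if __name__ == "__main__":
    print(image_for_installed_sdk())
```

Pinning to a specific version keeps the server and the SDK that built your Workflow in step, whereas the latest tag can change between pulls.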

4. Run the server

Now that the image is on the EC2 instance, you can run the server with:

$ docker run -p 8000:8000 vellumai/python-workflow-runtime:1.10.4

To confirm that the server is running properly, look for two log lines: one reading Running with SDK version: 1.10.4 and one reporting that the server is Listening at: http://0.0.0.0:8000.
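
Beyond reading the logs, you can check from another machine that port 8000 is reachable through the security group. The sketch below only tests that a TCP connection can be opened; it does not assume any particular HTTP endpoint on the server. The function name and default timeouts are illustrative.

```python
import socket
import time


def wait_for_port(host: str, port: int = 8000, timeout: float = 30.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening and reachable.
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            # Refused or timed out: wait briefly and retry.
            time.sleep(0.5)
    return False
```

For example, `wait_for_port("<public_ip_address>")` should return `True` once the container is listening and the inbound rule is in place. If it returns `False`, check the security group and the `-p 8000:8000` port mapping first.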

5. Deploy the Workflow to Vellum

By deploying the Workflow to Vellum, you create a deployment entry to which monitoring data can be sent:

$ vellum workflows push demo --deploy

6. Execute the Workflow!

There are some notable differences between invoking Workflows against the Vellum Workflow Server and invoking them through the traditional Vellum API. While we work to consolidate the two, you can use the following script to invoke a Workflow on the server:

import json
from uuid import uuid4
import requests
import os
from pydantic import TypeAdapter
from vellum.client.types import WorkflowEvent

workflow_event_type: TypeAdapter[WorkflowEvent] = TypeAdapter(
    WorkflowEvent
)


def main():
    """
    This script is assumed to be in the same directory as
    the `workflow.py` file.
    """

    parent_dir = os.path.dirname(os.path.abspath(__file__))
    files = {}
    current_file = os.path.basename(__file__)

    for filename in os.listdir(parent_dir):
        filepath = os.path.join(parent_dir, filename)
        if not os.path.isfile(filepath):
            continue
        if not filename.endswith(".py"):
            continue
        if filename == current_file:
            continue

        with open(filepath, "r") as f:
            files[filename] = f.read()

    # grab this from the EC2 dashboard
    # Should be `http://{public_ip_address}:8000`
    base_url = os.environ["VELLUM_API_URL"]
    execution_id = str(uuid4())

    deployment_context = {
        "type": "WORKFLOW_RELEASE_TAG",
        "span_id": execution_id,
        "release_tag_id": str(uuid4()),
        "release_tag_name": "LATEST",
        "external_id": None,
        "metadata": None,
        # Replace these with the actual values from the workflow deployment
        "deployment_name": "demo",
        "deployment_id": "85f9cfa9-8580-4b0d-bbf6-ba09185267da",
        "deployment_history_item_id": "f2a57a88-5a3f-422b-9fe6-85dee0207c7b",
        "workflow_version_id": "0c08e431-2f77-4f4e-bd67-706a590e8964",
    }

    response = requests.post(
        f"{base_url}/workflow/stream",
        json={
            "files": files,
            "environment_api_key": os.getenv("VELLUM_API_KEY"),
            "module": __name__,
            "execution_context": {
                "trace_id": str(uuid4()),
                "parent_context": deployment_context,
            },
            # Don't worry about the value of these fields.
            # They are currently required but considered legacy and
            # will be removed in the future.
            "execution_id": execution_id,
        },
        timeout=10,
    )

    final_output = None
    error = None
    for line in response.iter_lines():
        if not line:
            continue
        if b"vembda.execution" in line:
            continue
        if b"END" == line:
            continue

        event = workflow_event_type.validate_json(line)
        if event.name == "workflow.execution.rejected":
            error = event.body.error
            continue

        if event.name != "workflow.execution.fulfilled":
            continue

        final_output = event.body.outputs['final']

    if error:
        raise Exception(f"[{error.code}] {error.message}")

    print(f"Success! Final output: {final_output}")
    return final_output


if __name__ == "__main__":
    main()