Long Running Workflows

When building production applications with Vellum Workflows, you may encounter scenarios where Workflows take several minutes or longer to complete. This guide covers best practices for handling these long-running Workflows effectively.

Understanding the Challenge

Long-running Workflows can present challenges in production environments:

  • Client timeouts: HTTP clients may time out before Workflow completion
  • Resource constraints: Keeping connections open for extended periods
  • Error handling: Managing failures in long-running processes
  • User experience: Providing feedback during extended operations

The most robust approach for long-running Workflows is to execute them asynchronously and use webhooks to receive completion notifications. See our Webhooks documentation for detailed setup instructions.

Option 1: Webhooks (Recommended)

Step 1: Configure Webhooks

Set up webhook endpoints in your Vellum organization to receive Workflow execution events:

  1. Navigate to Organization Settings → Webhooks
  2. Add your webhook endpoint URL
  3. Configure authentication (API key, Bearer token, or HMAC)
  4. Select the events you want to receive:
    • workflow.execution.initiated
    • workflow.execution.fulfilled
    • workflow.execution.rejected

For security, we recommend using HMAC authentication for webhook endpoints. See our HMAC Authentication guide for implementation details.
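
As a sketch of what that verification typically looks like, the helper below recomputes an HMAC-SHA256 digest over the timestamp and raw request body and compares it to the signature in constant time. The signed-message format, header handling, and secret here are assumptions; consult the HMAC Authentication guide for Vellum's exact scheme.

```python
import hashlib
import hmac
import time

# NOTE: the signed-message format below is an assumption; see Vellum's
# HMAC Authentication guide for the exact scheme.
WEBHOOK_SECRET = b"your-webhook-secret"  # hypothetical shared secret

def verify_hmac_signature(payload: bytes, signature: str, timestamp: str,
                          tolerance_seconds: int = 300) -> bool:
    """Verify an HMAC-SHA256 signature over the timestamp and raw payload."""
    if signature is None or timestamp is None:
        return False
    # Reject stale timestamps to limit replay attacks
    if abs(time.time() - float(timestamp)) > tolerance_seconds:
        return False
    message = timestamp.encode() + b"." + payload
    expected = hmac.new(WEBHOOK_SECRET, message, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids timing side channels
    return hmac.compare_digest(expected, signature)
```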

Step 2: Execute Workflow

Use the streaming endpoint to initiate your Workflow and immediately receive an execution_id for correlation. You can also provide your own external_id (such as a job_id, content_id, document_id, or any entity in your system):

```python
import vellum

client = vellum.VellumClient(api_key="your-api-key")

# Use streaming endpoint to get execution_id immediately
stream = client.execute_workflow_stream(
    workflow_deployment_name="your-workflow",
    inputs=[
        vellum.WorkflowRequestStringInput(
            name="user_query",
            value="Process this complex request"
        )
    ],
    external_id="task-12345"  # Your internal task ID for correlation
)

# Get execution_id from first event and terminate
for event in stream:
    if event.type == "workflow.execution.initiated":
        execution_id = event.execution_id
        print(f"Workflow started with execution_id: {execution_id}")
        # Store execution_id for later correlation with webhook
        break
```
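
To make that correlation step concrete, one minimal pattern (a sketch; the table and helper names are hypothetical) is to persist the external_id-to-execution_id mapping as soon as the Workflow starts, then update it when webhook events arrive:

```python
import sqlite3

# Hypothetical correlation store: map your external_id to Vellum's
# execution_id so later webhook events can be matched to internal tasks.

def init_store(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS workflow_tasks ("
        "external_id TEXT PRIMARY KEY, execution_id TEXT, status TEXT)"
    )

def record_started(conn: sqlite3.Connection, external_id: str, execution_id: str) -> None:
    # Called right after the workflow.execution.initiated event is received
    conn.execute(
        "INSERT OR REPLACE INTO workflow_tasks VALUES (?, ?, 'running')",
        (external_id, execution_id),
    )

def mark_finished(conn: sqlite3.Connection, external_id: str, status: str) -> None:
    # Called from your webhook handler on fulfilled/rejected events
    conn.execute(
        "UPDATE workflow_tasks SET status = ? WHERE external_id = ?",
        (status, external_id),
    )
```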

Step 3: Handle Webhook Events

Process webhook events to update your application state:

```python
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)

@app.route('/webhook/vellum', methods=['POST'])
def handle_vellum_webhook():
    # Verify HMAC signature (recommended)
    signature = request.headers.get('X-Vellum-Signature')
    timestamp = request.headers.get('X-Vellum-Timestamp')

    if not verify_hmac_signature(request.data, signature, timestamp):
        return jsonify({'error': 'Invalid signature'}), 401

    event = request.json

    if event['type'] == 'workflow.execution.fulfilled':
        # Workflow completed successfully
        external_id = event['data']['parent']['external_id']
        outputs = event['data']['outputs']

        # Update your internal task status
        update_task_status(external_id, 'completed', outputs)

    elif event['type'] == 'workflow.execution.rejected':
        # Workflow failed
        external_id = event['data']['parent']['external_id']
        error = event['data']['error']

        # Update your internal task status
        update_task_status(external_id, 'failed', error)

    return jsonify({'status': 'received'}), 200

def verify_hmac_signature(payload, signature, timestamp):
    # Placeholder: implement HMAC verification here and return True/False.
    # See our HMAC Authentication documentation for implementation details:
    # https://docs.vellum.ai/product/security/hmac-authentication
    pass

def update_task_status(external_id, status, data):
    # Update your database/system with the Workflow result
    print(f"Task {external_id} status: {status}")
```

Option 2: Polling

Use the streaming endpoint to initiate a Workflow and immediately receive an execution_id (API reference), or provide your own external_id, then poll for the execution results using the Retrieve Workflow Deployment Execution Event endpoint:

```python
import vellum
import time

client = vellum.VellumClient(api_key="your-api-key")

# Step 1: Start Workflow and get execution_id
stream = client.execute_workflow_stream(
    workflow_deployment_name="your-workflow",
    inputs=[
        vellum.WorkflowRequestStringInput(
            name="user_query",
            value="Process this complex request"
        )
    ],
    external_id="task-12345"  # Optional: your internal ID
)

execution_id = None
for event in stream:
    if event.type == "workflow.execution.initiated":
        execution_id = event.execution_id
        print(f"Workflow started with execution_id: {execution_id}")
        break

# Step 2: Poll for results
while True:
    try:
        # Retrieve execution event
        execution_event = client.retrieve_workflow_deployment_execution_event(
            execution_id=execution_id
        )

        if execution_event.state == "FULFILLED":
            print("Workflow completed successfully!")
            print(f"Results: {execution_event.outputs}")
            break
        elif execution_event.state == "REJECTED":
            print("Workflow failed!")
            print(f"Error: {execution_event.error}")
            break
        else:
            print(f"Workflow still running... State: {execution_event.state}")
            time.sleep(30)  # Poll every 30 seconds

    except Exception as e:
        print(f"Error polling execution: {e}")
        time.sleep(30)
```
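
Fixed 30-second polling works, but for executions of unpredictable length, exponential backoff with a hard deadline reduces API load. A sketch (the fetch_state callable is a hypothetical stand-in for the retrieve call above):

```python
import time
from typing import Callable

def poll_with_backoff(
    fetch_state: Callable[[], str],
    initial_delay: float = 5.0,
    max_delay: float = 60.0,
    timeout: float = 3600.0,
) -> str:
    """Poll until a terminal state, doubling the delay up to max_delay."""
    delay = initial_delay
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = fetch_state()
        if state in ("FULFILLED", "REJECTED"):
            return state
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
    raise TimeoutError("Workflow did not finish within the polling window")
```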

Option 3: API Node

This approach is useful when you want to control the callback URL or payload format yourself. You can use an API Node at the end of your Workflow to send results directly to your system:

Step 1: Workflow Design

  1. Add an API Node at the end of your Workflow
  2. Configure the API Node to POST results to your callback endpoint
  3. Include your task ID in the API Node payload

Step 2: API Node Configuration

Configure the API Node with:

  • URL: Your callback endpoint
  • Method: POST
  • Headers: Include authentication if needed
  • Body: Include Workflow outputs and your task ID

API Node Body Template

```json
{
  "task_id": "{{workflow_inputs.task_id}}",
  "status": "completed",
  "results": {
    "output": "{{final_output_node.output}}",
    "metadata": {
      "execution_time": "{{workflow_metadata.duration}}",
      "cost": "{{workflow_metadata.cost}}"
    }
  }
}
```
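
On the receiving side, your callback endpoint only needs to accept the POST and update task state. A minimal sketch, assuming a bearer token configured in the API Node's headers (the route, token, and helper are hypothetical; field names mirror the body template):

```python
from flask import Flask, request, jsonify

app = Flask(__name__)

# Hypothetical bearer token configured in the API Node's headers
CALLBACK_TOKEN = "your-callback-token"

@app.route("/callbacks/workflow-complete", methods=["POST"])
def workflow_complete():
    # Reject requests that don't carry the expected token
    if request.headers.get("Authorization") != f"Bearer {CALLBACK_TOKEN}":
        return jsonify({"error": "unauthorized"}), 401

    payload = request.json
    # Field names mirror the API Node body template above
    update_task_status(payload["task_id"], payload["status"], payload["results"])
    return jsonify({"status": "received"}), 200

def update_task_status(task_id, status, results):
    # Persist the result in your own system
    print(f"Task {task_id} -> {status}")
```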

Option 4: Streaming Updates

For Workflows where you want to provide real-time progress updates, use the streaming execution endpoint. This can still run into connection timeouts on very long executions, but it at least lets you stream updated messaging from your LLMs as they invoke tools:

```python
import vellum

client = vellum.VellumClient(api_key="your-api-key")

# Stream Workflow execution for real-time updates
stream = client.execute_workflow_stream(
    workflow_deployment_name="your-workflow",
    inputs=[
        vellum.WorkflowRequestStringInput(
            name="user_query",
            value="Process this complex request"
        )
    ],
    external_id="task-12345"
)

for event in stream:
    if event.type == "workflow.execution.initiated":
        print(f"Workflow started: {event.execution_id}")
    elif event.type == "workflow.execution.streaming":
        # Handle intermediate results
        print(f"Progress update: {event.data}")
    elif event.type == "workflow.execution.fulfilled":
        print(f"Workflow completed: {event.data.outputs}")
        break
    elif event.type == "workflow.execution.rejected":
        print(f"Workflow failed: {event.data.error}")
        break
```

Streaming connections should still have reasonable timeout limits. For very long Workflows (>10 minutes), webhooks are still the recommended approach.

Timeout Management

Client-Side Timeouts

When using synchronous execution, configure appropriate timeouts:

```python
import vellum
from vellum.core import RequestOptions

client = vellum.VellumClient(api_key="your-api-key")

try:
    response = client.execute_workflow(
        workflow_deployment_name="your-workflow",
        inputs=[...],
        request_options=RequestOptions(
            timeout_in_seconds=600  # 10 minute timeout
        )
    )
except TimeoutError:
    print("Workflow execution timed out")
    # Handle timeout - workflow may still be running
```

Infrastructure Considerations

AWS Lambda

If using AWS Lambda, be aware of the 15-minute maximum execution time:

Lambda Handler Pattern
```python
import json
import os
import vellum

def lambda_handler(event, context):
    client = vellum.VellumClient(api_key=os.environ['VELLUM_API_KEY'])

    # For long Workflows, use async pattern
    response = client.execute_workflow(
        workflow_deployment_name="long-running-workflow",
        inputs=event['inputs'],
        external_id=event['task_id']  # Use for webhook correlation
    )

    # Return immediately with execution_id
    return {
        'statusCode': 200,
        'body': json.dumps({
            'execution_id': response.execution_id,
            'status': 'initiated',
            'message': 'Workflow started. Results will be sent via webhook.'
        })
    }
```

Container Environments

For containerized applications, ensure your containers can handle long-running connections if using streaming:

Docker Configuration
```dockerfile
# Dockerfile
FROM python:3.11
# ... other setup ...

# Set appropriate timeouts
ENV REQUESTS_TIMEOUT=900
ENV WORKFLOW_TIMEOUT=900

# Health check for long-running processes
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
```
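
Application code can then honor these container-level settings. A small helper (a sketch, using the variable names from the Dockerfile above) keeps the timeout configurable per environment:

```python
import os

def env_timeout(name: str = "WORKFLOW_TIMEOUT", default: int = 900) -> int:
    """Resolve a timeout in seconds from the container environment,
    falling back to a conservative default when the variable is unset."""
    return int(os.environ.get(name, default))
```

The resolved value can then be passed to your client's request options, such as the timeout_in_seconds shown earlier.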

Error Handling and Retry Strategies

Workflow-Level Retry

For Workflows that may fail due to transient issues, implement retry logic:

Python Retry Logic
```python
import time
import vellum
from typing import Optional

def execute_workflow_with_retry(
    client: vellum.VellumClient,
    workflow_name: str,
    inputs: list,
    external_id: str,
    max_retries: int = 3,
    retry_delay: int = 60
) -> Optional[str]:
    """Execute Workflow with retry logic for transient failures."""

    for attempt in range(max_retries + 1):
        try:
            response = client.execute_workflow(
                workflow_deployment_name=workflow_name,
                inputs=inputs,
                external_id=f"{external_id}-attempt-{attempt}"
            )
            return response.execution_id

        except Exception as e:
            if attempt == max_retries:
                print(f"Workflow failed after {max_retries} retries: {e}")
                return None

            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {retry_delay}s...")
            time.sleep(retry_delay)

    return None
```

Node-Level Resilience

Prompts may experience nondeterministic errors from model providers, and longer-running Workflows are particularly prone to these issues. To mitigate this, implement retry logic with Node Adornments. You can also disable streaming in the Model Settings while editing a Prompt to avoid other nondeterministic connection issues.

Use Node Adornments to add resilience to individual Workflow nodes:

  • Retry Node Adornments: Automatically retry failed nodes
  • Try Node Adornments: Gracefully handle node failures with fallback paths

See our Node Adornments documentation for detailed configuration.

Monitoring and Observability

Execution Tracking

Monitor long-running Workflows through the Monitoring tab of your Workflow Deployment (see our monitoring documentation for details):

  1. Executions Tab: View real-time execution status
  2. Timeline View: Analyze execution flow and bottlenecks
  3. Cost Tracking: Monitor resource usage for long Workflows

Best Practices Summary

Always use webhook-based async execution for Workflows that may take more than a few minutes. This provides the most reliable and scalable approach.

Always include an external_id when executing Workflows to correlate webhook events with your internal processes.

Handle both Workflow-level failures and infrastructure timeouts gracefully. Consider retry strategies for transient failures.

Track Workflow execution times to identify performance bottlenecks and optimize your Workflows.

Use HMAC authentication to secure your webhook endpoints and verify that events are coming from Vellum.

Use Node Adornments (Retry, Try) to make individual Workflow components more resilient to transient failures.