Workflow Execution Filtering and Monitoring

When working with Workflow Deployments in production, you’ll often need to filter and retrieve execution data for monitoring, debugging, or analytics purposes. This guide shows you how to use the Workflow execution API endpoints effectively, including how to construct complex filters programmatically.

Overview

Vellum provides two key endpoints for working with Workflow execution data:

  1. List Workflow Deployment Executions - Retrieve and filter executions with advanced query capabilities
  2. Retrieve Workflow Deployment Execution - Get detailed information about a specific execution, including actuals submissions

Basic Execution Retrieval

Start by retrieving recent executions for a Workflow Deployment. This quick example demonstrates how to set up the client and fetch execution data with basic filtering and sorting:

```python
import vellum

client = vellum.VellumClient(api_key="your-api-key")

# List recent executions for a Workflow Deployment
executions = client.workflow_deployments.list_executions(
    id="your-workflow-deployment-id",
    limit=50,   # Number of executions to return
    offset=0,   # Starting position for pagination
)

for execution in executions.results:
    print(f"Execution ID: {execution.execution_id}")
    print(f"Status: {execution.state}")
    print(f"Timestamp: {execution.timestamp}")
```
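When a deployment has more executions than one page can return, the `limit`/`offset` parameters can drive a simple pagination loop. A minimal sketch, where `fetch_page` is a hypothetical callable standing in for `client.workflow_deployments.list_executions(...).results`:

```python
def iter_all_executions(fetch_page, page_size=50):
    """Yield executions page by page until a short (or empty) page signals the end.

    `fetch_page(limit, offset)` is a hypothetical stand-in for a call like
    client.workflow_deployments.list_executions(id=..., limit=limit, offset=offset).results
    """
    offset = 0
    while True:
        results = fetch_page(limit=page_size, offset=offset)
        yield from results
        if len(results) < page_size:  # last page reached
            return
        offset += page_size
```

Wrapping the SDK call in a small lambda, e.g. `lambda limit, offset: client.workflow_deployments.list_executions(id=deployment_id, limit=limit, offset=offset).results`, lets you iterate every execution without tracking offsets by hand.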

Advanced Filtering

Understanding Filter Structure

The filters parameter accepts a JSON string that defines complex filtering conditions. Here’s the basic structure:

Filter JSON Structure

```json
{
  "type": "LOGICAL_CONDITION_GROUP",
  "conditions": [
    {
      "type": "LOGICAL_CONDITION",
      "lhs_variable": {"type": "STRING", "value": "field_name"},
      "operator": ">=",
      "rhs_variable": {"type": "STRING", "value": "filter_value"}
    }
  ],
  "combinator": "AND",
  "negated": false
}
```
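Because every filter is built from the same two shapes, it can help to generate them rather than hand-write nested JSON. A small helper sketch (the `condition` and `condition_group` names are ours, not part of the SDK; the dict structure follows the schema above):

```python
import json

def condition(field, operator, value):
    """Build a single LOGICAL_CONDITION entry."""
    return {
        "type": "LOGICAL_CONDITION",
        "lhs_variable": {"type": "STRING", "value": field},
        "operator": operator,
        "rhs_variable": {"type": "STRING", "value": value},
    }

def condition_group(conditions, combinator="AND", negated=False):
    """Wrap conditions (or nested groups) in a LOGICAL_CONDITION_GROUP."""
    return {
        "type": "LOGICAL_CONDITION_GROUP",
        "conditions": list(conditions),
        "combinator": combinator,
        "negated": negated,
    }

# Serialize to the JSON string the `filters` parameter expects
filters = json.dumps(
    condition_group([condition("timestamp", ">=", "2025-09-22T20:42:09.361Z")])
)
```

Groups nest: passing a `condition_group(...)` inside the `conditions` list of another group produces the same structure as the multi-condition example further below.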

Common Filtering Examples

Filter by Timestamp Range

```python
import json

# Create a timestamp filter for executions after a specific date
filter_conditions = {
    "type": "LOGICAL_CONDITION_GROUP",
    "conditions": [
        {
            "type": "LOGICAL_CONDITION",
            "lhs_variable": {"type": "STRING", "value": "timestamp"},
            "operator": ">=",
            "rhs_variable": {"type": "STRING", "value": "2025-09-22T20:42:09.361Z"}
        }
    ],
    "combinator": "AND",
    "negated": False
}

executions = client.workflow_deployments.list_executions(
    id="your-workflow-deployment-id",
    filters=json.dumps(filter_conditions)
)
```

Filter by Execution Status

```python
# Filter for failed executions
filter_conditions = {
    "type": "LOGICAL_CONDITION_GROUP",
    "conditions": [
        {
            "type": "LOGICAL_CONDITION",
            "lhs_variable": {"type": "STRING", "value": "state"},
            "operator": "==",
            "rhs_variable": {"type": "STRING", "value": "REJECTED"}
        }
    ],
    "combinator": "AND",
    "negated": False
}

failed_executions = client.workflow_deployments.list_executions(
    id="your-workflow-deployment-id",
    filters=json.dumps(filter_conditions)
)
```

Complex Multi-Condition Filters

```python
# Filter for executions that are either successful OR failed after a specific date
complex_filter = {
    "type": "LOGICAL_CONDITION_GROUP",
    "conditions": [
        {
            "type": "LOGICAL_CONDITION",
            "lhs_variable": {"type": "STRING", "value": "timestamp"},
            "operator": ">=",
            "rhs_variable": {"type": "STRING", "value": "2025-09-20T00:00:00Z"}
        },
        {
            "type": "LOGICAL_CONDITION_GROUP",
            "conditions": [
                {
                    "type": "LOGICAL_CONDITION",
                    "lhs_variable": {"type": "STRING", "value": "state"},
                    "operator": "==",
                    "rhs_variable": {"type": "STRING", "value": "FULFILLED"}
                },
                {
                    "type": "LOGICAL_CONDITION",
                    "lhs_variable": {"type": "STRING", "value": "state"},
                    "operator": "==",
                    "rhs_variable": {"type": "STRING", "value": "REJECTED"}
                }
            ],
            "combinator": "OR",
            "negated": False
        }
    ],
    "combinator": "AND",
    "negated": False
}

filtered_executions = client.workflow_deployments.list_executions(
    id="your-workflow-deployment-id",
    filters=json.dumps(complex_filter)
)
```

Ordering Results

Use the ordering parameter to sort execution results. Common ordering options include:

  • `timestamp` - Sort by execution timestamp (oldest first)
  • `-timestamp` - Sort by execution timestamp (newest first)
  • `state` - Sort by execution state
  • `-state` - Sort by execution state (reverse order)

```python
# Get most recent executions first
executions = client.workflow_deployments.list_executions(
    id="your-workflow-deployment-id",
    ordering="-timestamp",
    limit=20
)
```

Retrieving Execution Details and Actuals

Once you have an execution ID, you can retrieve detailed information including submitted actuals:

```python
# Get detailed execution information
execution_detail = client.workflow_deployments.retrieve_execution_event(
    id="your-workflow-deployment-id",
    execution_id="execution-uuid"
)

# Access execution details
print(f"Execution State: {execution_detail.state}")
print(f"Input Values: {execution_detail.inputs}")
print(f"Output Values: {execution_detail.outputs}")

# Access actuals if submitted
if hasattr(execution_detail, 'latest_actual') and execution_detail.latest_actual:
    print(f"Actual submitted at: {execution_detail.latest_actual.timestamp}")
    print(f"Actual output: {execution_detail.latest_actual.output}")
```

UI Filter Builder Reference

While we recommend using the programmatic approach above, you can also use the Vellum UI to build filters visually and then copy the generated filter structure:

(Screenshot: Vellum UI showing the filter builder interface for Workflow executions.)
Use the Workflow Deployment Executions page filter builder to construct complex queries.

The URL will contain the encoded filter structure that you can decode and use in your API calls:

Decoding UI-Generated Filters

```javascript
// Example URL filter parameter
const encodedFilter = '%7B%22type%22%3A%22LOGICAL_CONDITION_GROUP%22%2C%22conditions%22%3A%5B%7B%22type%22%3A%22LOGICAL_CONDITION%22%2C%22lhs_variable%22%3A%7B%22type%22%3A%22STRING%22%2C%22value%22%3A%22timestamp%22%7D%2C%22operator%22%3A%22%3E%3D%22%2C%22rhs_variable%22%3A%7B%22type%22%3A%22STRING%22%2C%22value%22%3A%222025-09-22T20%3A42%3A09.361Z%22%7D%7D%5D%2C%22combinator%22%3A%22AND%22%2C%22negated%22%3Afalse%7D';

// Decode the filter
const decodedFilter = decodeURIComponent(encodedFilter);
const filterObject = JSON.parse(decodedFilter);

console.log('Filter structure:', filterObject);
```
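The same decoding works in Python with only the standard library, which is convenient when you want to reuse a UI-built filter in a script. A sketch using the encoded string from the example above:

```python
import json
from urllib.parse import unquote

# Encoded filter copied from the URL (same example as above)
encoded_filter = '%7B%22type%22%3A%22LOGICAL_CONDITION_GROUP%22%2C%22conditions%22%3A%5B%7B%22type%22%3A%22LOGICAL_CONDITION%22%2C%22lhs_variable%22%3A%7B%22type%22%3A%22STRING%22%2C%22value%22%3A%22timestamp%22%7D%2C%22operator%22%3A%22%3E%3D%22%2C%22rhs_variable%22%3A%7B%22type%22%3A%22STRING%22%2C%22value%22%3A%222025-09-22T20%3A42%3A09.361Z%22%7D%7D%5D%2C%22combinator%22%3A%22AND%22%2C%22negated%22%3Afalse%7D'

# Decode the URL encoding, then parse the JSON
filter_object = json.loads(unquote(encoded_filter))

# Re-serialize to pass straight to the `filters` parameter
filters_param = json.dumps(filter_object)
```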

Common Filtering Fields

Here are the most commonly used fields for filtering Workflow executions:

| Field | Type | Description | Example Values |
| --- | --- | --- | --- |
| `timestamp` | DateTime | Execution timestamp | `2025-09-22T20:42:09.361Z` |
| `state` | String | Execution state | `FULFILLED`, `REJECTED`, `INITIATED` |
| `execution_id` | UUID | Unique execution identifier | `uuid-string` |
| `external_id` | String | User-provided external ID | `your-custom-id` |

Supported Operators

When building filter conditions, you can use these operators:

| Operator | Description | Example Use Case |
| --- | --- | --- |
| `==` | Equal to | Exact state matching |
| `!=` | Not equal to | Exclude specific states |
| `>=` | Greater than or equal | Timestamp ranges (after date) |
| `<=` | Less than or equal | Timestamp ranges (before date) |
| `>` | Greater than | Strict timestamp filtering |
| `<` | Less than | Strict timestamp filtering |
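The range operators pair naturally: combining a `>=` and a `<=` condition in one `AND` group bounds executions to a time window. A sketch under the filter structure shown earlier (`time_window_filter` is a hypothetical helper name, not part of the SDK):

```python
import json
from datetime import datetime, timezone

def time_window_filter(start: datetime, end: datetime) -> str:
    """Build a `filters` JSON string selecting executions between `start` and `end`."""

    def ts(dt):
        # Render as UTC with a trailing 'Z', matching the timestamp examples above
        return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")

    def cond(op, value):
        return {
            "type": "LOGICAL_CONDITION",
            "lhs_variable": {"type": "STRING", "value": "timestamp"},
            "operator": op,
            "rhs_variable": {"type": "STRING", "value": value},
        }

    return json.dumps({
        "type": "LOGICAL_CONDITION_GROUP",
        "conditions": [cond(">=", ts(start)), cond("<=", ts(end))],
        "combinator": "AND",
        "negated": False,
    })
```

The returned string can be passed directly as `filters=` to `list_executions`.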

Production Monitoring Patterns

Real-time Error Monitoring

Python SDK - Error Monitoring

```python
import json
from datetime import datetime, timedelta, timezone

def monitor_failed_executions(deployment_id, hours_back=1):
    """Monitor for failed executions in the last N hours."""

    # Calculate the cutoff timestamp for filtering (UTC, with a 'Z' suffix)
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours_back)
    cutoff_iso = cutoff_time.isoformat().replace("+00:00", "Z")

    # Build filter for recent failures
    filter_conditions = {
        "type": "LOGICAL_CONDITION_GROUP",
        "conditions": [
            {
                "type": "LOGICAL_CONDITION",
                "lhs_variable": {"type": "STRING", "value": "timestamp"},
                "operator": ">=",
                "rhs_variable": {"type": "STRING", "value": cutoff_iso}
            },
            {
                "type": "LOGICAL_CONDITION",
                "lhs_variable": {"type": "STRING", "value": "state"},
                "operator": "==",
                "rhs_variable": {"type": "STRING", "value": "REJECTED"}
            }
        ],
        "combinator": "AND",
        "negated": False
    }

    # Retrieve failed executions (assumes `client` was set up as shown earlier)
    failed_executions = client.workflow_deployments.list_executions(
        id=deployment_id,
        filters=json.dumps(filter_conditions),
        ordering="-timestamp"
    )

    # Process failures
    for execution in failed_executions.results:
        print(f"ALERT: Execution {execution.execution_id} failed at {execution.timestamp}")

        # Get detailed error information
        detail = client.workflow_deployments.retrieve_execution_event(
            id=deployment_id,
            execution_id=execution.execution_id
        )

        if hasattr(detail, 'error') and detail.error:
            print(f"Error: {detail.error.message}")

# Usage
monitor_failed_executions("your-deployment-id", hours_back=2)
```

Performance Analytics

Python SDK - Performance Analytics

```python
import json
from datetime import datetime, timedelta, timezone

def analyze_execution_performance(deployment_id, days_back=7):
    """Analyze execution performance over time."""

    # Get executions from the last `days_back` days (UTC, with a 'Z' suffix)
    cutoff_time = datetime.now(timezone.utc) - timedelta(days=days_back)
    cutoff_iso = cutoff_time.isoformat().replace("+00:00", "Z")

    filter_conditions = {
        "type": "LOGICAL_CONDITION_GROUP",
        "conditions": [
            {
                "type": "LOGICAL_CONDITION",
                "lhs_variable": {"type": "STRING", "value": "timestamp"},
                "operator": ">=",
                "rhs_variable": {"type": "STRING", "value": cutoff_iso}
            }
        ],
        "combinator": "AND",
        "negated": False
    }

    executions = client.workflow_deployments.list_executions(
        id=deployment_id,
        filters=json.dumps(filter_conditions),
        limit=1000,  # Adjust based on your volume
        ordering="-timestamp"
    )

    # Analyze results, guarding against division by zero when no executions match
    total_count = len(executions.results)
    successful_count = sum(1 for e in executions.results if e.state == "FULFILLED")
    failed_count = sum(1 for e in executions.results if e.state == "REJECTED")
    success_rate = successful_count / total_count if total_count else 0.0
    failure_rate = failed_count / total_count if total_count else 0.0

    print(f"Execution Summary (Last {days_back} days):")
    print(f"Total: {total_count}")
    print(f"Successful: {successful_count} ({success_rate * 100:.1f}%)")
    print(f"Failed: {failed_count} ({failure_rate * 100:.1f}%)")

    return {
        "total": total_count,
        "successful": successful_count,
        "failed": failed_count,
        "success_rate": success_rate
    }
```

Next Steps