Overview
Tasks represent individual workflow executions. Each time a workflow runs (manually or via schedule), a task is created to track the execution state, input/output data, errors, and retry attempts.
Task Lifecycle
pending : Task created and queued for execution
running : Worker has picked up the task and is executing
success : Task completed successfully
failed : Task failed after all retry attempts
retrying : Task failed but will retry
List Tasks
Retrieve all tasks with optional filtering by workflow or status. Supports pagination.
Query Parameters
workflow_id : Filter tasks by workflow ID
status : Filter by task status (pending, running, success, failed, retrying)
limit : Number of items per page (min: 1, max: 100)
offset : Number of items to skip for pagination
cURL
curl -X GET "http://localhost:8883/api/v1/tasks?workflow_id=550e8400-e29b-41d4-a716-446655440000&limit=10" \
-H "X-API-Key: your-api-key"
Response
{
  "items": [
    {
      "id": "660e8400-e29b-41d4-a716-446655440001",
      "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
      "status": "success",
      "input_data": {
        "value": "Generate daily report"
      },
      "output_data": {
        "result": "Report generated successfully",
        "file_path": "/reports/2025-11-04.pdf"
      },
      "error": null,
      "tries": 1,
      "max_retries": 3,
      "next_retry_at": null,
      "started_at": "2025-11-04T09:00:05Z",
      "finished_at": "2025-11-04T09:02:34Z",
      "created_at": "2025-11-04T09:00:00Z",
      "updated_at": "2025-11-04T09:02:34Z"
    }
  ],
  "total": 1,
  "limit": 10,
  "offset": 0,
  "has_more": false
}
401 Unauthorized
{
  "detail": "Invalid API key"
}
Performance : Filter by workflow_id or status to narrow results. Use pagination for large result sets.
Get Task
Retrieve detailed information about a specific task, including full input/output data and error details.
Path Parameters
task_id : UUID of the task to retrieve
curl -X GET http://localhost:8883/api/v1/tasks/660e8400-e29b-41d4-a716-446655440001 \
-H "X-API-Key: your-api-key"
Response
{
  "id": "660e8400-e29b-41d4-a716-446655440001",
  "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "success",
  "input_data": {
    "value": "Generate daily report"
  },
  "output_data": {
    "result": "Report generated successfully",
    "file_path": "/reports/2025-11-04.pdf",
    "metrics": {
      "execution_time": "2m 29s",
      "records_processed": 1543
    }
  },
  "error": null,
  "tries": 1,
  "max_retries": 3,
  "next_retry_at": null,
  "started_at": "2025-11-04T09:00:05Z",
  "finished_at": "2025-11-04T09:02:34Z",
  "created_at": "2025-11-04T09:00:00Z",
  "updated_at": "2025-11-04T09:02:34Z"
}
200 Success (Failed Task)
{
  "id": "660e8400-e29b-41d4-a716-446655440002",
  "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "failed",
  "input_data": {
    "value": "Generate report"
  },
  "output_data": null,
  "error": "Connection timeout: Failed to connect to external API after 3 attempts",
  "tries": 3,
  "max_retries": 3,
  "next_retry_at": null,
  "started_at": "2025-11-04T10:00:05Z",
  "finished_at": "2025-11-04T10:15:23Z",
  "created_at": "2025-11-04T10:00:00Z",
  "updated_at": "2025-11-04T10:15:23Z"
}
404 Not Found
{
  "detail": "Task not found"
}
Delete Task
Delete a task from the database. This removes the execution record permanently.
Path Parameters
task_id : UUID of the task to delete
curl -X DELETE http://localhost:8883/api/v1/tasks/660e8400-e29b-41d4-a716-446655440001 \
-H "X-API-Key: your-api-key"
Response
{
  "id": "660e8400-e29b-41d4-a716-446655440001",
  "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "success",
  "input_data": {
    "value": "Generate daily report"
  },
  "output_data": {
    "result": "Report generated successfully"
  },
  "error": null,
  "tries": 1,
  "max_retries": 3,
  "next_retry_at": null,
  "started_at": "2025-11-04T09:00:05Z",
  "finished_at": "2025-11-04T09:02:34Z",
  "created_at": "2025-11-04T09:00:00Z",
  "updated_at": "2025-11-04T09:02:34Z"
}
404 Not Found
{
  "detail": "Task not found"
}
400 Bad Request
{
  "detail": "Error deleting task: [error details]"
}
Permanent Action : Deleting a task removes the execution record permanently. This cannot be undone. Use with caution.
Task Status Reference
Status      Description                        Next Action
pending     Task queued, waiting for worker    Worker will pick up automatically
running     Worker executing task              Wait for completion
success     Task completed successfully        No action needed
failed      Task failed after all retries      Check error field, manual retry possible
retrying    Task failed, retry scheduled       Wait for automatic retry
Task Retry Logic
Tasks automatically retry on failure with exponential backoff:
Default retries : 3 attempts
Backoff strategy : Exponential (5s, 25s, 125s)
Max retry delay : 2 minutes
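The delay schedule can be reproduced with a short sketch. The base delay and factor are derived from the 5s / 25s / 125s figures above; this is illustrative only, not the scheduler's actual implementation.

MAX_RETRIES = 3       # "Default retries: 3 attempts"
BASE_DELAY_S = 5      # first delay in the 5s / 25s / 125s schedule
BACKOFF_FACTOR = 5

def retry_delay(attempt: int) -> int:
    """Seconds to wait before retry number `attempt` (1-based)."""
    # The docs also state a 2-minute max retry delay; how that cap interacts
    # with the 125 s third delay is not specified, so no cap is applied here.
    return BASE_DELAY_S * BACKOFF_FACTOR ** (attempt - 1)

print([retry_delay(n) for n in range(1, MAX_RETRIES + 1)])  # [5, 25, 125]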
Retry Behavior
1. Task fails during execution
2. Status changes to retrying
3. next_retry_at timestamp is set
4. Worker automatically retries at scheduled time
5. After max retries, status changes to failed
Manual Retry : Currently not available via API. Failed tasks must be manually re-run by creating a new workflow execution with the same input data.
Understanding Task Data
Input Data
Input data is stored as JSON. Simple string inputs are wrapped in an object:
{
  "value": "Your input text here"
}
Complex inputs preserve their structure:
{
  "query": "Search term",
  "filters": {
    "status": "active",
    "date_range": "last_7_days"
  },
  "limit": 100
}
Output Data
Output data structure depends on the workflow source and output component:
LangFlow Output :
{
  "result": "Generated text response",
  "session_id": "session_abc123"
}
Hive Output :
{
  "response": "Agent response text",
  "metadata": {
    "agent_id": "agent_xyz",
    "execution_time": "1.23s"
  }
}
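Because the shape differs by source, a small helper can normalize the text field before further processing. This is a sketch, not part of the API: the key names come from the two examples above, everything else is an assumption.

def extract_result(task: dict) -> str | None:
    """Return the primary text output from a task's output_data, if any.

    Handles both shapes shown above: LangFlow-style ("result") and
    Hive-style ("response"). Returns None for tasks without output.
    """
    output = task.get("output_data") or {}
    return output.get("result") or output.get("response")

# Usage with a task dict fetched from GET /api/v1/tasks/{task_id}:
# text = extract_result(task)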
Error Data
When tasks fail, the error field contains the error message:
{
  "error": "HTTPError: 500 Server Error - Internal server error from http://localhost:7860/api/v1/run/abc123"
}
Monitoring Tasks
Real-Time Monitoring
Poll the list endpoint to monitor task status:
import requests
import time

def monitor_task(task_id, api_key):
    """Poll task status until completion."""
    while True:
        response = requests.get(
            f"http://localhost:8883/api/v1/tasks/{task_id}",
            headers={"X-API-Key": api_key},
        )
        task = response.json()
        status = task["status"]
        print(f"Task status: {status}")
        if status in ["success", "failed"]:
            return task
        time.sleep(5)  # Poll every 5 seconds

# Usage
task_id = "660e8400-e29b-41d4-a716-446655440001"
final_task = monitor_task(task_id, "your-api-key")
Filtering Failed Tasks
List all failed tasks for investigation:
curl -X GET "http://localhost:8883/api/v1/tasks?status=failed" \
-H "X-API-Key: your-api-key"
Workflow Success Rate
Calculate success rate by comparing successful vs failed tasks:
import requests

def get_workflow_success_rate(workflow_id, api_key):
    """Calculate success rate for a workflow."""
    # Get the workflow's most recent tasks (up to the 100-item page limit)
    response = requests.get(
        "http://localhost:8883/api/v1/tasks",
        headers={"X-API-Key": api_key},
        params={"workflow_id": workflow_id, "limit": 100},
    )
    tasks = response.json()["items"]
    if not tasks:
        return 0.0
    success_count = sum(1 for t in tasks if t["status"] == "success")
    total_count = len(tasks)
    return (success_count / total_count) * 100

# Usage
workflow_id = "550e8400-e29b-41d4-a716-446655440000"
rate = get_workflow_success_rate(workflow_id, "your-api-key")
print(f"Success rate: {rate:.1f}%")
Error Codes
Status   Detail                 Cause
400      Error deleting task    Database error or task in progress
404      Task not found         Invalid task ID or task was deleted
401      Invalid API key        Missing or incorrect X-API-Key header
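A sketch of handling these responses in Python. The status codes and detail strings come from the table above; the exception mapping itself is only illustrative.

import requests

def get_task(task_id, api_key):
    """Fetch a task, translating the documented error codes into exceptions."""
    response = requests.get(
        f"http://localhost:8883/api/v1/tasks/{task_id}",
        headers={"X-API-Key": api_key},
    )
    if response.status_code == 401:
        raise PermissionError(response.json()["detail"])  # Invalid API key
    if response.status_code == 404:
        raise LookupError(response.json()["detail"])      # Task not found
    response.raise_for_status()                           # any other error
    return response.json()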
Common Task Errors
Connection Errors
{
  "error": "HTTPError: Connection refused - Cannot connect to http://localhost:7860"
}
Cause : Workflow source (LangFlow/Hive) is not running or unreachable.
Solution : Verify source is running and accessible.
Timeout Errors
{
  "error": "Timeout: Workflow execution exceeded 300 seconds"
}
Cause : Workflow took too long to execute.
Solution : Optimize workflow or increase timeout settings.
Validation Errors
{
  "error": "ValidationError: Input component not found in workflow"
}
Cause : Workflow structure changed or input component missing.
Solution : Re-sync workflow from source.
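For dashboards or alerting, the error prefixes shown above can be used to bucket failures. This is an illustrative sketch; only the three prefixes documented above are grounded, and any other message falls through to "other".

def classify_error(error: str | None) -> str:
    """Bucket a task error message by the prefixes shown above."""
    if not error:
        return "none"
    for prefix, category in (
        ("HTTPError", "http"),
        ("Timeout", "timeout"),
        ("ValidationError", "validation"),
    ):
        if error.startswith(prefix):
            return category
    return "other"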
Task Metrics
Calculate execution metrics from task data:
Average Execution Time
from datetime import datetime

def calculate_avg_execution_time(tasks):
    """Calculate average execution time in seconds."""
    durations = []
    for task in tasks:
        if task["started_at"] and task["finished_at"]:
            start = datetime.fromisoformat(task["started_at"].replace("Z", "+00:00"))
            end = datetime.fromisoformat(task["finished_at"].replace("Z", "+00:00"))
            duration = (end - start).total_seconds()
            durations.append(duration)
    if not durations:
        return 0.0
    return sum(durations) / len(durations)

# Usage: `tasks` is the "items" list returned by the List Tasks endpoint
avg_time = calculate_avg_execution_time(tasks)
print(f"Average execution time: {avg_time:.1f}s")
Next Steps