Most users should use the built-in LangFlow and Hive adapters. Custom adapters are only needed for connecting proprietary workflow systems. See the source code for reference implementations.
When to Build a Custom Adapter
Build a custom adapter when you need to integrate workflow systems not currently supported by Spark:
- Proprietary workflow engines - Internal automation tools with REST APIs
- Custom orchestration systems - Specialized task execution platforms
- Legacy workflow platforms - Existing systems that need Spark’s scheduling capabilities
Most users will work with LangFlow or Hive and won’t need custom adapters.
The BaseWorkflowAdapter Interface
All adapters inherit from BaseWorkflowAdapter and implement these methods:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
@dataclass
class WorkflowExecutionResult:
    """Unified result format across all workflow sources.

    Adapters normalize their engine-specific responses into this shape so
    Spark can treat every workflow source uniformly.
    """

    success: bool  # True when the workflow run completed without error
    result: Any  # engine-specific payload returned by the workflow run
    session_id: Optional[str] = None  # session/conversation identifier, if the source uses one
    run_id: Optional[str] = None  # source-side execution identifier, if the source returns one
    metadata: Dict[str, Any] = field(default_factory=dict)  # extra source-specific details
    error: Optional[str] = None  # human-readable error message when success is False
class BaseWorkflowAdapter(ABC):
    """Abstract base adapter for workflow sources.

    Concrete adapters (LangFlow, Hive, custom REST systems, ...) subclass
    this and implement the abstract members below.  The three ``*_sync``
    methods are synchronous; only ``validate`` is a coroutine.
    """

    def __init__(self, api_url: str, api_key: str, source_id: Optional[Any] = None):
        # Base URL of the remote workflow service.
        self.api_url = api_url
        # Credential for the service; how it is transmitted is adapter-specific.
        self.api_key = api_key
        # Optional identifier for this source record on the Spark side.
        self.source_id = source_id

    @property
    @abstractmethod
    def source_type(self) -> str:
        """Return the source type identifier (e.g., 'langflow', 'custom-api')"""
        pass

    @abstractmethod
    def list_flows_sync(self) -> List[Dict[str, Any]]:
        """List available workflows from this source"""
        pass

    @abstractmethod
    def get_flow_sync(self, flow_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific workflow by ID"""
        pass

    @abstractmethod
    def run_flow_sync(self, flow_id: str, input_data: Any,
                      session_id: Optional[str] = None) -> WorkflowExecutionResult:
        """Execute a workflow and return normalized result"""
        pass

    @abstractmethod
    async def validate(self) -> Dict[str, Any]:
        """Validate connection to the source"""
        pass
Required Methods
Your adapter must implement these five members:
| Method | Purpose | Returns |
| --- | --- | --- |
| `source_type` | Unique identifier for your adapter | String (e.g., `"custom-rest"`) |
| `list_flows_sync()` | List all available workflows | List of workflow dictionaries |
| `get_flow_sync(flow_id)` | Retrieve a specific workflow | Workflow dictionary or `None` |
| `run_flow_sync(flow_id, input_data)` | Execute a workflow | `WorkflowExecutionResult` |
| `validate()` | Test connection to source | Status dictionary |
Minimal Working Example
Here’s a simple REST API adapter that shows the basic structure:
# adapters/custom_rest_adapter.py
import logging
import requests
from typing import Dict, List, Optional, Any
from .base import BaseWorkflowAdapter, WorkflowExecutionResult
logger = logging.getLogger(__name__)
class CustomRestAdapter(BaseWorkflowAdapter):
    """Adapter for custom REST API workflow systems.

    Expects the remote service to expose:
        GET  /api/workflows               - list workflows
        GET  /api/workflows/{id}          - workflow details
        POST /api/workflows/{id}/execute  - execute a workflow
        GET  /api/health                  - health check
    """

    # Seconds to wait for a workflow execution before giving up.
    EXECUTE_TIMEOUT = 300
    # Seconds to wait for metadata requests (list / get / health).
    REQUEST_TIMEOUT = 30

    def __init__(self, api_url: str, api_key: str, source_id: Optional[Any] = None):
        super().__init__(api_url, api_key, source_id)
        # One shared Session pools connections and carries auth headers on
        # every request.
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        })

    @property
    def source_type(self) -> str:
        """Unique identifier for this adapter type."""
        return "custom-rest"

    def list_flows_sync(self) -> List[Dict[str, Any]]:
        """List workflows via GET /api/workflows.

        Returns:
            The list of workflow dictionaries returned by the API.

        Raises:
            requests.RequestException: on network or HTTP errors.
        """
        try:
            # Timeout added so a dead endpoint cannot hang the caller forever.
            response = self.session.get(
                f"{self.api_url}/api/workflows",
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            logger.error("Failed to list workflows: %s", e)
            raise

    def get_flow_sync(self, flow_id: str) -> Optional[Dict[str, Any]]:
        """Get workflow via GET /api/workflows/{flow_id}.

        Returns:
            The workflow dictionary, or None when the server answers 404.

        Raises:
            requests.RequestException: on any error other than a 404.
        """
        try:
            response = self.session.get(
                f"{self.api_url}/api/workflows/{flow_id}",
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            # BUG FIX: a Response with an error status is falsy
            # (Response.__bool__ returns ``self.ok``), so the previous
            # ``if e.response and ...`` check never matched a 404 and the
            # exception was re-raised instead of returning None.  Compare
            # against None explicitly.
            if e.response is not None and e.response.status_code == 404:
                return None
            logger.error("Failed to get workflow %s: %s", flow_id, e)
            raise

    def run_flow_sync(self, flow_id: str, input_data: Any,
                      session_id: Optional[str] = None) -> WorkflowExecutionResult:
        """Execute workflow via POST /api/workflows/{flow_id}/execute.

        Never raises on transport errors: timeouts and request failures are
        folded into a failed WorkflowExecutionResult instead.
        """
        try:
            payload = {"input": input_data}
            if session_id:
                payload["session_id"] = session_id
            response = self.session.post(
                f"{self.api_url}/api/workflows/{flow_id}/execute",
                json=payload,
                timeout=self.EXECUTE_TIMEOUT
            )
            response.raise_for_status()
            result_data = response.json()
            return WorkflowExecutionResult(
                # Treat a missing "success" key as success: raise_for_status
                # already rejected HTTP-level failures.
                success=result_data.get("success", True),
                result=result_data.get("result"),
                session_id=session_id,
                run_id=result_data.get("run_id")
            )
        except requests.Timeout:
            return WorkflowExecutionResult(
                success=False,
                result=None,
                error=f"Workflow execution timed out after {self.EXECUTE_TIMEOUT} seconds"
            )
        except requests.RequestException as e:
            return WorkflowExecutionResult(
                success=False,
                result=None,
                error=str(e)
            )

    async def validate(self) -> Dict[str, Any]:
        """Validate connection via GET /api/health.

        NOTE: this uses the blocking requests Session inside a coroutine;
        acceptable for a short health check, but a production adapter should
        use an async HTTP client or run this in an executor.

        Raises:
            requests.RequestException: when the health endpoint is unreachable
            or returns an error status.
        """
        try:
            response = self.session.get(f"{self.api_url}/api/health", timeout=10)
            response.raise_for_status()
            health = response.json()
            return {
                "status": "success",
                "service": "Custom REST API",
                "url": self.api_url,
                "version": health.get("version", "unknown")
            }
        except requests.RequestException as e:
            logger.error("Validation failed: %s", e)
            raise
This example assumes your workflow API has these endpoints:
GET /api/workflows - List workflows
GET /api/workflows/{id} - Get workflow details
POST /api/workflows/{id}/execute - Execute workflow
GET /api/health - Health check
Adapt the endpoint paths and response formats to match your actual API structure.
Registering Your Adapter
Make your adapter available to Spark by registering it:
# adapters/registry.py
from .factory import AdapterRegistry
from .custom_rest_adapter import CustomRestAdapter
def register_adapters():
    """Register all workflow adapters with the shared AdapterRegistry."""
    # The string key must match the --type value passed to the CLI
    # (e.g. `automagik-spark sources add --type custom-rest`).
    AdapterRegistry.register("custom-rest", CustomRestAdapter)

# Auto-register when imported
register_adapters()
Now you can use your adapter via the CLI:
# Add a source using your custom adapter
automagik-spark sources add \
--name my-workflow-system \
--type custom-rest \
--url http://localhost:8000 \
--api-key sk-my-secret-key
# Sync workflows
automagik-spark workflows sync --source my-workflow-system
# Create a schedule
automagik-spark schedules create \
--workflow-id abc123 \
--cron "0 9 * * *" \
--input "Daily task"
Testing Your Adapter
Create basic tests to verify your adapter works:
# tests/test_custom_rest_adapter.py
import pytest
from unittest.mock import Mock, patch
from adapters.custom_rest_adapter import CustomRestAdapter
@pytest.fixture
def adapter():
    """Provide a CustomRestAdapter pointed at a dummy local endpoint."""
    return CustomRestAdapter(
        api_url="http://localhost:8000",
        api_key="test-key"
    )
@patch('requests.Session.get')
def test_list_flows(mock_get, adapter):
    """list_flows_sync returns the JSON payload from GET /api/workflows."""
    fake_response = Mock()
    fake_response.raise_for_status = Mock()
    fake_response.json.return_value = [{"id": "wf1", "name": "Test"}]
    mock_get.return_value = fake_response

    flows = adapter.list_flows_sync()

    assert len(flows) == 1
    assert flows[0]["id"] == "wf1"
@patch('requests.Session.post')
def test_run_flow(mock_post, adapter):
    """run_flow_sync normalizes a successful execute response."""
    payload = {
        "success": True,
        "result": {"output": "Done"},
        "run_id": "run123"
    }
    fake_response = Mock()
    fake_response.raise_for_status = Mock()
    fake_response.json.return_value = payload
    mock_post.return_value = fake_response

    result = adapter.run_flow_sync("wf1", "test input")

    assert result.success is True
    assert result.result["output"] == "Done"
Run tests with:
pytest tests/test_custom_rest_adapter.py
Next Steps
For production-ready adapters with advanced features:
- Review reference implementations: See the LangFlow adapter and Hive adapter source code
- Add connection pooling: Use `HTTPAdapter` with retry strategies for better reliability
- Implement error handling: Handle timeouts, connection errors, and API failures gracefully
- Join the community: Ask questions in GitHub Discussions
Start simple and add features as needed. The built-in adapters handle most production concerns already.