Skip to main content
Most users should use the built-in LangFlow and Hive adapters. Custom adapters are only needed for connecting proprietary workflow systems. See the source code for reference implementations.

When to Build a Custom Adapter

Build a custom adapter when you need to integrate workflow systems not currently supported by Spark:
  • Proprietary workflow engines - Internal automation tools with REST APIs
  • Custom orchestration systems - Specialized task execution platforms
  • Legacy workflow platforms - Existing systems that need Spark’s scheduling capabilities
Most users will work with LangFlow or Hive and won’t need custom adapters.

The BaseWorkflowAdapter Interface

All adapters inherit from BaseWorkflowAdapter and implement these methods:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field


@dataclass
class WorkflowExecutionResult:
    """Normalized outcome of one workflow run, shared by every workflow source.

    Only ``success`` and ``result`` are required; every other field has a
    default, so adapters fill in just what their backend reports.
    """

    # Whether the run is considered successful.
    success: bool
    # Payload produced by the run; its shape is left to the adapter.
    result: Any
    # Optional session identifier associated with the run.
    session_id: Optional[str] = field(default=None)
    # Optional run identifier associated with the run.
    run_id: Optional[str] = field(default=None)
    # Free-form extra data; each instance gets its own fresh dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Optional error description.
    error: Optional[str] = field(default=None)


class BaseWorkflowAdapter(ABC):
    """Contract that every workflow-source adapter has to fulfil.

    Subclasses provide the ``source_type`` property, the three ``*_sync``
    workflow operations and the ``validate`` coroutine. The base class only
    stores the connection settings handed to the constructor.
    """

    def __init__(self, api_url: str, api_key: str, source_id: Optional[Any] = None):
        """Remember the connection settings of the workflow source.

        Args:
            api_url: URL of the workflow source's API.
            api_key: Credential for that API.
            source_id: Optional identifier of the source; stored unchanged.
        """
        self.source_id = source_id
        self.api_key = api_key
        self.api_url = api_url

    @property
    @abstractmethod
    def source_type(self) -> str:
        """Identifier of the source type (e.g., 'langflow', 'custom-api')."""

    @abstractmethod
    def list_flows_sync(self) -> List[Dict[str, Any]]:
        """Return the workflows available from this source."""

    @abstractmethod
    def get_flow_sync(self, flow_id: str) -> Optional[Dict[str, Any]]:
        """Return the workflow identified by ``flow_id``."""

    @abstractmethod
    def run_flow_sync(self, flow_id: str, input_data: Any,
                      session_id: Optional[str] = None) -> WorkflowExecutionResult:
        """Run the workflow ``flow_id`` and return a normalized result."""

    @abstractmethod
    async def validate(self) -> Dict[str, Any]:
        """Check the connection to the source."""

Required Methods

Your adapter must implement these five members (the source_type property plus four methods):
| Method | Purpose | Returns |
| --- | --- | --- |
| source_type | Unique identifier for your adapter | String (e.g., “custom-rest”) |
| list_flows_sync() | List all available workflows | List of workflow dictionaries |
| get_flow_sync(flow_id) | Retrieve a specific workflow | Workflow dictionary or None |
| run_flow_sync(flow_id, input_data) | Execute a workflow | WorkflowExecutionResult |
| validate() | Test connection to source | Status dictionary |

Minimal Working Example

Here’s a simple REST API adapter that shows the basic structure:
# adapters/custom_rest_adapter.py

import logging
import requests
from typing import Dict, List, Optional, Any
from .base import BaseWorkflowAdapter, WorkflowExecutionResult

logger = logging.getLogger(__name__)


class CustomRestAdapter(BaseWorkflowAdapter):
    """Adapter for custom REST API workflow systems.

    All requests go through one shared ``requests.Session`` that sends the
    API key as a bearer token and declares a JSON content type.
    """

    # Timeouts in seconds. ``requests`` waits indefinitely when no timeout is
    # passed, so every call below supplies one explicitly.
    REQUEST_TIMEOUT = 30
    EXECUTE_TIMEOUT = 300
    HEALTH_TIMEOUT = 10

    def __init__(self, api_url: str, api_key: str, source_id: Optional[Any] = None):
        """Store the connection settings and prepare the authenticated session.

        Args:
            api_url: Base URL of the workflow API; endpoint paths are appended to it.
            api_key: Token sent in the ``Authorization: Bearer`` header.
            source_id: Optional identifier passed through to the base class.
        """
        super().__init__(api_url, api_key, source_id)
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        })

    @property
    def source_type(self) -> str:
        """Return the source type identifier of this adapter."""
        return "custom-rest"

    def list_flows_sync(self) -> List[Dict[str, Any]]:
        """List workflows via GET /api/workflows.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.RequestException: On connection errors, timeouts or a
                non-success HTTP status; the error is logged and re-raised.
        """
        try:
            response = self.session.get(
                f"{self.api_url}/api/workflows",
                timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            logger.error("Failed to list workflows: %s", e)
            raise

    def get_flow_sync(self, flow_id: str) -> Optional[Dict[str, Any]]:
        """Get workflow via GET /api/workflows/{flow_id}.

        Returns:
            The decoded JSON body, or ``None`` when the API answers 404.

        Raises:
            requests.RequestException: On any other request failure; the
                error is logged and re-raised.
        """
        try:
            response = self.session.get(
                f"{self.api_url}/api/workflows/{flow_id}",
                timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            # A Response with a 4xx/5xx status is falsy (its truth value is
            # ``response.ok``), so compare against None explicitly; a plain
            # truthiness check would never recognise the 404.
            if e.response is not None and e.response.status_code == 404:
                return None
            logger.error("Failed to get workflow %s: %s", flow_id, e)
            raise

    def run_flow_sync(self, flow_id: str, input_data: Any,
                      session_id: Optional[str] = None) -> WorkflowExecutionResult:
        """Execute workflow via POST /api/workflows/{flow_id}/execute.

        Request failures are not raised; they are reported through a result
        with ``success=False`` and the ``error`` field set.

        Args:
            flow_id: Identifier of the workflow to execute.
            input_data: Value sent as the ``input`` field of the JSON payload.
            session_id: Optional session identifier; added to the payload
                only when truthy.

        Returns:
            A WorkflowExecutionResult built from the response body, or a
            failed result describing the timeout / request error.
        """
        try:
            payload = {"input": input_data}
            if session_id:
                payload["session_id"] = session_id

            response = self.session.post(
                f"{self.api_url}/api/workflows/{flow_id}/execute",
                json=payload,
                timeout=self.EXECUTE_TIMEOUT
            )
            response.raise_for_status()

            result_data = response.json()
            return WorkflowExecutionResult(
                # A body without a "success" key counts as success, since
                # the HTTP status check above already passed.
                success=result_data.get("success", True),
                result=result_data.get("result"),
                session_id=session_id,
                run_id=result_data.get("run_id")
            )

        # Timeout is a RequestException subclass, so it must be caught first
        # to produce the dedicated message.
        except requests.Timeout:
            return WorkflowExecutionResult(
                success=False,
                result=None,
                error=f"Workflow execution timed out after {self.EXECUTE_TIMEOUT} seconds"
            )
        except requests.RequestException as e:
            return WorkflowExecutionResult(
                success=False,
                result=None,
                error=str(e)
            )

    async def validate(self) -> Dict[str, Any]:
        """Validate connection via GET /api/health.

        NOTE(review): the HTTP call below is a synchronous ``requests`` call,
        so it blocks the running event loop for up to HEALTH_TIMEOUT seconds.

        Returns:
            A status dictionary with the keys ``status``, ``service``,
            ``url`` and ``version`` (``"unknown"`` when the health payload
            has no ``version`` key).

        Raises:
            requests.RequestException: When the health check fails; the
                error is logged and re-raised.
        """
        try:
            response = self.session.get(
                f"{self.api_url}/api/health",
                timeout=self.HEALTH_TIMEOUT
            )
            response.raise_for_status()
            health = response.json()

            return {
                "status": "success",
                "service": "Custom REST API",
                "url": self.api_url,
                "version": health.get("version", "unknown")
            }
        except requests.RequestException as e:
            logger.error("Validation failed: %s", e)
            raise
This example assumes your workflow API has these endpoints:
  • GET /api/workflows - List workflows
  • GET /api/workflows/{id} - Get workflow details
  • POST /api/workflows/{id}/execute - Execute workflow
  • GET /api/health - Health check
Adapt the endpoint paths and response formats to match your actual API structure.

Registering Your Adapter

Make your adapter available to Spark by registering it:
# adapters/registry.py

from .factory import AdapterRegistry
from .custom_rest_adapter import CustomRestAdapter

def register_adapters():
    """Register the workflow adapters of this module with the AdapterRegistry."""
    source_type, adapter_cls = "custom-rest", CustomRestAdapter
    AdapterRegistry.register(source_type, adapter_cls)

# Auto-register when imported: this call sits at module level, so simply
# importing the module runs register_adapters() once.
register_adapters()
Now you can use your adapter via the CLI:
# Add a source using your custom adapter.
# --type is "custom-rest", the same name the adapter is registered under;
# --url and --api-key are example values — replace them with your own.
automagik-spark sources add \
  --name my-workflow-system \
  --type custom-rest \
  --url http://localhost:8000 \
  --api-key sk-my-secret-key

# Sync workflows from the source added above (referenced by its --name)
automagik-spark workflows sync --source my-workflow-system

# Create a schedule.
# abc123 is a placeholder workflow ID — substitute a real one.
# The cron expression "0 9 * * *" means 09:00 every day
# (presumably in the scheduler's configured timezone — confirm).
automagik-spark schedules create \
  --workflow-id abc123 \
  --cron "0 9 * * *" \
  --input "Daily task"

Testing Your Adapter

Create basic tests to verify your adapter works:
# tests/test_custom_rest_adapter.py

import pytest
from unittest.mock import Mock, patch
from adapters.custom_rest_adapter import CustomRestAdapter


@pytest.fixture
def adapter():
    """Provide a CustomRestAdapter configured with local placeholder settings."""
    settings = {
        "api_url": "http://localhost:8000",
        "api_key": "test-key",
    }
    return CustomRestAdapter(**settings)


@patch('requests.Session.get')
def test_list_flows(mock_get, adapter):
    """list_flows_sync returns the JSON list delivered by the patched GET."""
    # Fake response whose status check is a no-op and whose body is one workflow.
    fake_response = Mock()
    fake_response.raise_for_status = Mock()
    fake_response.json.return_value = [{"id": "wf1", "name": "Test"}]
    mock_get.return_value = fake_response

    flows = adapter.list_flows_sync()

    # Exactly one workflow, and it is the one the fake API served.
    assert [flow["id"] for flow in flows] == ["wf1"]


@patch('requests.Session.post')
def test_run_flow(mock_post, adapter):
    """run_flow_sync maps a successful API reply onto the result object."""
    api_reply = {
        "success": True,
        "result": {"output": "Done"},
        "run_id": "run123"
    }
    # Fake response whose status check is a no-op and whose body is api_reply.
    fake_response = Mock()
    fake_response.raise_for_status = Mock()
    fake_response.json.return_value = api_reply
    mock_post.return_value = fake_response

    outcome = adapter.run_flow_sync("wf1", "test input")

    assert outcome.success is True
    assert outcome.result["output"] == "Done"
Run tests with:
pytest tests/test_custom_rest_adapter.py

Next Steps

For production-ready adapters with advanced features:
  • Review reference implementations: See the LangFlow adapter and Hive adapter source code
  • Add connection pooling: Use HTTPAdapter with retry strategies for better reliability
  • Implement error handling: Handle timeouts, connection errors, and API failures gracefully
  • Join the community: Ask questions in GitHub Discussions
Start simple and add features as needed. The built-in adapters handle most production concerns already.