System Architecture

Spark is built on a distributed task queue architecture that separates concerns: scheduling, execution, and state management. This design enables reliable 24/7 automation with horizontal scalability.

Core Components

FastAPI Server

REST API (Port 8883). HTTP server that handles all user interactions: creating sources, syncing workflows, managing schedules, and viewing task history. Built with FastAPI for async performance and automatic OpenAPI documentation.
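As a rough sketch (not Spark's actual handler code) of what an async endpoint with X-API-Key authentication looks like in FastAPI:
from fastapi import FastAPI, Header, HTTPException

app = FastAPI(title="Spark API")

@app.get("/api/v1/workflows")
async def list_workflows(x_api_key: str = Header(...)):
    # Real handlers validate against AUTOMAGIK_SPARK_API_KEY and query PostgreSQL.
    if x_api_key != "your-secret-key":
        raise HTTPException(status_code=401, detail="Invalid API key")
    return []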

PostgreSQL Database

State Storage (Port 5402). Primary data store for all workflow metadata: sources, workflows, schedules, tasks, and execution results. Uses SQLAlchemy ORM with async support (asyncpg driver) for efficient database operations.
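A minimal sketch of that async setup (connection URL from the Connection Strings section below; the query is illustrative):
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Async engine over the asyncpg driver (default dev connection string).
engine = create_async_engine(
    "postgresql+asyncpg://spark_user:spark_pass@localhost:5402/automagik_spark"
)

async def ping() -> None:
    async with engine.connect() as conn:
        print(await conn.scalar(text("SELECT 1")))

asyncio.run(ping())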

Redis

Task Queue (Port 5412). In-memory broker for Celery’s task queue and result backend. Holds pending tasks waiting for workers and stores task results temporarily. Redis persistence enabled via appendonly mode.

Celery Beat

Schedule Monitor. Dedicated process that continuously checks PostgreSQL for active schedules. When a schedule’s time arrives (cron expression or interval), Beat creates a task and pushes it to the Redis queue.
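A hedged sketch of how such a Beat process is wired up in Celery (the scheduler’s dotted path below is a placeholder, not Spark’s verified module path):
from celery import Celery

app = Celery("spark", broker="redis://localhost:5412/0")
app.conf.beat_max_loop_interval = 5  # check for due schedules every 5 seconds

# Beat is then started with a custom, database-backed scheduler class, e.g.:
#   celery -A spark beat --scheduler <dotted.path.to.DatabaseScheduler>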

Celery Workers

Task Executors. Worker processes that pull tasks from Redis queue, load the appropriate adapter, call the external source API, and save results to PostgreSQL. Scale horizontally by running multiple workers.

Workflow Adapters

Source Abstraction. Adapter classes that translate Spark’s generic task format into source-specific API calls. Each source (LangFlow, Hive) has its own adapter handling authentication, input/output mapping, and error normalization.

Data Flow: Schedule to Execution

Here’s exactly what happens when a schedule triggers a workflow execution:

Step 1: Schedule Creation

User creates a schedule via API/CLI:
automagik-spark schedules create --workflow-id <uuid> \
  --expression "0 9 * * *" --input '{"message": "Hello"}'
Schedule stored in PostgreSQL with status="active" and calculated next_run_at timestamp.

Step 2: Schedule Monitoring

Celery Beat runs a loop every 5 seconds (configurable via beat_max_loop_interval):
  • Queries PostgreSQL for schedules where next_run_at <= NOW()
  • Uses custom DatabaseScheduler class (not file-based)
  • Calculates the next run time from the cron expression (via the croniter library; see the sketch below)
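The croniter calculation from the last bullet, as a minimal standalone sketch:
from datetime import datetime, timezone
from croniter import croniter

expression = "0 9 * * *"          # daily at 09:00
now = datetime.now(timezone.utc)
next_run_at = croniter(expression, now).get_next(datetime)
print(next_run_at)                # next 09:00 UTC after "now"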

Step 3: Task Creation

When schedule time arrives, Beat:
  • Creates a Celery task via execute_workflow_task.apply_async() (see the sketch below)
  • Pushes task to Redis queue with workflow ID and input data
  • Updates schedule’s next_run_at in PostgreSQL
  • Creates Task record in PostgreSQL with status="pending"
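A minimal sketch of that enqueue step, assuming a Celery task named execute_workflow_task that takes a workflow ID and an input payload (the signature here is illustrative, not verified):
from celery import Celery

app = Celery(
    "spark",
    broker="redis://localhost:5412/0",
    backend="redis://localhost:5412/0",
)

@app.task(name="execute_workflow_task")
def execute_workflow_task(workflow_id: str, input_data: dict):
    ...  # load the workflow, invoke its adapter, persist the result

# What Beat effectively does when a schedule fires: push the task to Redis.
execute_workflow_task.apply_async(args=["<workflow-uuid>", {"message": "Hello"}])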

Step 4: Task Pickup

Celery Worker (running in separate process):
  • Polls Redis queue for pending tasks
  • Picks up one task at a time (worker_prefetch_multiplier=1; see the configuration sketch below)
  • Updates Task record to status="running" in PostgreSQL
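That prefetch behaviour corresponds to Celery settings along these lines (task_acks_late is an assumption based on the crash recovery described under Failure Handling, not a confirmed Spark setting):
from celery import Celery

app = Celery("spark", broker="redis://localhost:5412/0")
app.conf.worker_prefetch_multiplier = 1  # reserve one task per worker process at a time
app.conf.task_acks_late = True           # assumption: acknowledge only after completion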

Step 5: Adapter Invocation

Worker loads workflow from PostgreSQL and:
  • Identifies source type (langflow, automagik-hive)
  • Instantiates the appropriate adapter from a registry (see the sketch below)
  • Calls adapter’s run_flow_sync() method with input data
  • Adapter handles source-specific API authentication and request format
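A self-contained sketch of the registry lookup; the adapter classes here are stand-ins, and only run_flow_sync mirrors the method named above:
# Stand-in adapters for illustration only.
class LangFlowAdapter:
    def run_flow_sync(self, flow_id: str, input_data: dict) -> dict:
        return {"success": True, "source": "langflow"}

class HiveAdapter:
    def run_flow_sync(self, flow_id: str, input_data: dict) -> dict:
        return {"success": True, "source": "automagik-hive"}

# Registry keyed by source type, as described above.
ADAPTER_REGISTRY = {"langflow": LangFlowAdapter, "automagik-hive": HiveAdapter}

def run_workflow(source_type: str, flow_id: str, input_data: dict) -> dict:
    adapter = ADAPTER_REGISTRY[source_type]()  # instantiate the matching adapter
    return adapter.run_flow_sync(flow_id, input_data)

print(run_workflow("langflow", "abc-123", {"message": "Hello"}))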

Step 6: External API Call

The adapter makes an HTTP request to the source (see the sketch below):
  • LangFlow: POST /api/v1/run/{flow_id} with tweaks mapping
  • Hive: Different endpoints for agent/team/workflow types
  • Includes API key in headers
  • Handles session management and polling for results
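A hedged sketch of the LangFlow call shape using httpx; the input_value/tweaks payload follows LangFlow’s public run endpoint and may differ between versions (the Hive variants are omitted):
import httpx

def run_langflow_flow(base_url: str, api_key: str, flow_id: str,
                      message: str, tweaks: dict | None = None) -> dict:
    # POST /api/v1/run/{flow_id} with the API key in the headers.
    response = httpx.post(
        f"{base_url}/api/v1/run/{flow_id}",
        headers={"x-api-key": api_key},
        json={"input_value": message, "tweaks": tweaks or {}},
        timeout=60.0,
    )
    response.raise_for_status()
    return response.json()

# result = run_langflow_flow("http://localhost:7860", "<api-key>", "<flow-id>", "Hello")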

Step 7: Result Processing

Adapter normalizes response into WorkflowExecutionResult:
WorkflowExecutionResult(
    success=True,
    result=response_data,
    session_id=session_id,
    run_id=run_id,
    metadata={...}
)
Worker saves result to Task record in PostgreSQL.

Step 8: Status Update

Worker updates Task record with:
  • status="completed" or status="failed"
  • result field with execution output
  • error field if failed
  • completed_at timestamp
User can view results via API: GET /api/v1/tasks/{task_id}

Port Configuration

Spark uses non-standard ports to avoid conflicts with other services:
Component    Default Port   Environment Variable            Purpose
API Server   8883           AUTOMAGIK_SPARK_API_PORT        REST API endpoint
PostgreSQL   5402           AUTOMAGIK_SPARK_POSTGRES_PORT   Database connections
Redis        5412           AUTOMAGIK_SPARK_REDIS_PORT      Task queue broker
LangFlow     7860           LANGFLOW_PORT                   External workflow source
Hive         8881           AUTOMAGIK_API_PORT              External agent source
All ports are configurable via environment variables. Default ports chosen to avoid conflicts with standard PostgreSQL (5432), Redis (6379), and other common services.

Connection Strings

Spark uses these connection patterns:

PostgreSQL (metadata storage):
# Format: postgresql+asyncpg://user:password@host:port/database
AUTOMAGIK_SPARK_DATABASE_URL=postgresql+asyncpg://spark_user:spark_pass@localhost:5402/automagik_spark
# Driver: asyncpg (async SQLAlchemy support)
# Creates tables via Alembic migrations
# Connection pooling via SQLAlchemy AsyncEngine

Redis (Celery broker and result backend):
# Broker (task queue)
AUTOMAGIK_SPARK_CELERY_BROKER_URL=redis://localhost:5412/0
# Result backend (task results)
AUTOMAGIK_SPARK_CELERY_RESULT_BACKEND=redis://localhost:5412/0
# Same Redis instance, database 0
# Persistence: appendonly yes (for durability)

API authentication (X-API-Key header):
AUTOMAGIK_SPARK_API_KEY=your-secret-key
# Example request:
curl -H "X-API-Key: your-secret-key" \
  http://localhost:8883/api/v1/workflows

How It All Works Together

Why PostgreSQL for Metadata?

Relational data needs relational storage. Spark manages complex relationships:
  • Workflows belong to Sources
  • Schedules reference Workflows
  • Tasks track execution history
  • Components define workflow inputs/outputs
PostgreSQL provides:
  • ACID transactions for schedule creation (ensure consistency)
  • Foreign keys to maintain referential integrity
  • Indexes for fast schedule lookups by next_run_at (see the sketch below)
  • JSON columns for flexible workflow data storage
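For the index point, a minimal SQLAlchemy sketch (not Spark’s actual model) of an indexed next_run_at column:
from datetime import datetime
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

# Column names mirror this page (status, next_run_at); everything else is illustrative.
class Schedule(Base):
    __tablename__ = "schedules"
    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[str] = mapped_column(default="active")
    next_run_at: Mapped[datetime] = mapped_column(index=True)  # fast "due schedules" lookups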

Why Redis for Queuing?

Speed and simplicity. Redis is optimized for the queue use case:
  • In-memory performance for sub-millisecond task pickup
  • Pub/Sub support for Celery’s result backend
  • Persistence (appendonly) prevents task loss on restart
  • Simple data structures (lists, sets) match queue patterns
The main alternative, RabbitMQ, would be overkill for Spark’s workload; Redis handles thousands of tasks per second with minimal overhead.

Why Celery for Distributed Execution?

Proven, battle-tested architecture. Celery provides:
  • Horizontal scaling: Add workers without code changes
  • Reliability: Task retry, result persistence, failure handling
  • Scheduling: Beat scheduler with database backend
  • Monitoring: Built-in task events and state tracking
Celery separates scheduling (Beat) from execution (Workers), which lets you:
  • Scale workers independently of the scheduler
  • Run multiple workers on different machines
  • Specialize workers by queue

Why Adapters for Source Abstraction?

Each source has unique APIs; the adapters absorb those differences.

LangFlow specifics:
  • Component-based I/O (tweaks mapping)
  • Session management for stateful flows
  • Different endpoints for running vs testing

Hive specifics:
  • Three types: agent, team, workflow
  • Different endpoints per type
  • Different input/output formats

A common adapter interface (BaseWorkflowAdapter, sketched below) makes it possible to:
  • Add new sources without modifying the core
  • Handle errors consistently across sources
  • Return a unified result format (WorkflowExecutionResult)
  • Test easily with mock adapters
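A minimal sketch of that interface, reusing the WorkflowExecutionResult fields shown earlier; the signatures are illustrative, not Spark’s verified API:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

@dataclass
class WorkflowExecutionResult:
    success: bool
    result: dict | None = None
    session_id: str | None = None
    run_id: str | None = None
    metadata: dict = field(default_factory=dict)

class BaseWorkflowAdapter(ABC):
    @abstractmethod
    def run_flow_sync(self, flow_id: str, input_data: dict) -> WorkflowExecutionResult:
        """Call the external source and normalize its response."""

class LangFlowAdapter(BaseWorkflowAdapter):
    def run_flow_sync(self, flow_id: str, input_data: dict) -> WorkflowExecutionResult:
        ...  # POST /api/v1/run/{flow_id}, apply tweaks, normalize errors
        return WorkflowExecutionResult(success=True, result={})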

Design Principles

Separation of concerns:
  • API layer: HTTP interface only
  • Database layer: State persistence only
  • Queue layer: Task distribution only
  • Execution layer: Workflow running only
Each component has one job. No circular dependencies.

Async throughout:
  • FastAPI async handlers
  • SQLAlchemy async engine (asyncpg)
  • Celery async tasks
Maximizes throughput without blocking. The API can handle concurrent requests while workers execute long-running workflows.

Database as the source of truth:
  • All schedules in PostgreSQL
  • All task results in PostgreSQL
  • Redis is ephemeral (queue only)
Restart any component without data loss. Redis can be flushed and rebuilt from PostgreSQL.

Idempotent operations:
  • Schedule sync doesn’t duplicate workflows
  • Task creation checks for existing tasks
  • Adapter calls handle duplicate requests
Safe to retry operations. No side effects from re-runs.

Scalability & Reliability

Horizontal Scaling

Workers scale independently:
# Run multiple workers
automagik-spark workers start --concurrency 4  # Worker 1
automagik-spark workers start --concurrency 4  # Worker 2
automagik-spark workers start --concurrency 4  # Worker 3
Each worker pulls from the same Redis queue. No coordination needed.

Failure Handling

Tasks retry automatically:
  • Celery default: 3 retries with exponential backoff (see the sketch below)
  • Task state persisted in PostgreSQL
  • Failed tasks can be manually retried via API
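A sketch of how that retry policy is expressed in Celery; the decorator options are standard Celery, though Spark’s exact settings are not shown here:
from celery import Celery

app = Celery("spark", broker="redis://localhost:5412/0")

# Retry any exception up to 3 times, with exponential backoff between attempts.
@app.task(autoretry_for=(Exception,), max_retries=3, retry_backoff=True)
def execute_workflow_task(workflow_id: str, input_data: dict):
    ...  # call the source adapter; a raised exception re-queues the task
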
Worker crashes:
  • Task remains in queue
  • Another worker picks it up
  • No data loss (state in PostgreSQL)
Database failures:
  • Workers wait for database to recover
  • Beat pauses schedule checks
  • API returns 503 (service unavailable)

Monitoring

Track system health:
# View active schedules
automagik-spark schedules list

# View recent tasks
automagik-spark tasks list --limit 50

# Worker status (check logs)
automagik-spark workers start  # Shows active schedules on startup
Task execution logs:
  • Stored in PostgreSQL (tasks table)
  • Includes: start time, end time, status, result, error
  • Query via API: GET /api/v1/tasks?status=failed
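The same failed-task query from Python, using httpx (the endpoint and header come from this page; the response is assumed to be a JSON list):
import httpx

response = httpx.get(
    "http://localhost:8883/api/v1/tasks",
    params={"status": "failed"},
    headers={"X-API-Key": "your-secret-key"},
)
response.raise_for_status()
for task in response.json():  # assumption: JSON array of task records
    print(task)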

Next Steps