System Architecture
Spark is built on a distributed task queue architecture that separates concerns: scheduling, execution, and state management. This design enables reliable 24/7 automation with horizontal scalability.

Core Components
FastAPI Server
REST API (Port 8883)
HTTP server that handles all user interactions: creating sources, syncing workflows, managing schedules, and viewing task history. Built with FastAPI for async performance and automatic OpenAPI documentation.
PostgreSQL Database
State Storage (Port 5402)
Primary data store for all workflow metadata: sources, workflows, schedules, tasks, and execution results. Uses SQLAlchemy ORM with async support (asyncpg driver) for efficient database operations.
Redis
Task Queue (Port 5412)
In-memory broker for Celery's task queue and result backend. Holds pending tasks waiting for workers and stores task results temporarily. Redis persistence is enabled via appendonly mode.
Celery Beat
Schedule Monitor
Dedicated process that continuously checks PostgreSQL for active schedules. When a schedule's time arrives (cron expression or interval), Beat creates a task and pushes it to the Redis queue.
Celery Workers
Task Executors
Worker processes that pull tasks from the Redis queue, load the appropriate adapter, call the external source API, and save results to PostgreSQL. Scale horizontally by running multiple workers.
Workflow Adapters
Source Abstraction
Adapter classes that translate Spark's generic task format into source-specific API calls. Each source (LangFlow, Hive) has its own adapter handling authentication, input/output mapping, and error normalization.
Data Flow: Schedule to Execution
Here's exactly what happens when a schedule triggers a workflow execution:

1. Schedule Creation
User creates a schedule via the API or CLI. The schedule is stored in PostgreSQL with status="active" and a calculated next_run_at timestamp.
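For illustration only, a schedule-creation request might look like the sketch below; the endpoint path, payload fields, and auth header name are assumptions rather than the documented API surface:

```python
# Illustrative sketch only — endpoint path, payload fields, and header name
# are assumptions, not the documented Spark API.
import httpx

payload = {
    "workflow_id": "wf_123",         # hypothetical workflow identifier
    "schedule_type": "cron",         # cron expression or interval
    "schedule_expr": "0 9 * * 1-5",  # weekdays at 09:00
    "input_value": "daily report",   # data passed to the workflow on each run
}

resp = httpx.post(
    "http://localhost:8883/api/v1/schedules",  # assumed schedules endpoint on the API port
    json=payload,
    headers={"X-API-Key": "your-spark-api-key"},
)
resp.raise_for_status()
print(resp.json())  # stored schedule, including the calculated next_run_at
```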
2. Schedule Monitoring
Celery Beat runs a loop every 5 seconds (configurable via beat_max_loop_interval):
- Queries PostgreSQL for schedules where next_run_at <= NOW()
- Uses custom DatabaseScheduler class (not file-based)
- Calculates next run time based on cron expression (via croniter library)
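The next_run_at calculation can be pictured with croniter directly; this is a minimal sketch, not Spark's internal code:

```python
# Sketch of deriving the next run time from a cron expression with croniter.
from datetime import datetime, timezone
from croniter import croniter

cron_expr = "*/15 * * * *"  # every 15 minutes (example expression)
now = datetime.now(timezone.utc)
next_run_at = croniter(cron_expr, now).get_next(datetime)
print(next_run_at)  # the value Beat would write back to the schedule row
```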
3. Task Creation
When schedule time arrives, Beat:
- Creates Celery task: execute_workflow_task.apply_async()
- Pushes task to Redis queue with workflow ID and input data
- Updates schedule's next_run_at in PostgreSQL
- Creates Task record in PostgreSQL with status="pending"
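A sketch of the enqueue step; the task name comes from the docs above, but the import path and keyword arguments are assumptions:

```python
# Sketch only — import path and argument names are assumptions.
from spark.tasks import execute_workflow_task  # hypothetical module path

workflow_id = "wf_123"                    # which workflow to run
input_data = {"message": "daily report"}  # payload handed to the adapter

async_result = execute_workflow_task.apply_async(
    kwargs={"workflow_id": workflow_id, "input_data": input_data}
)
print(async_result.id)  # Celery task id; Spark also writes a Task row with status="pending"
```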
4. Task Pickup
Celery Worker (running in separate process):
- Polls Redis queue for pending tasks
- Picks up task using worker_prefetch_multiplier=1 (one at a time)
- Updates Task record to status="running" in PostgreSQL
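The pickup behavior maps to standard Celery settings. A configuration sketch, assuming the default ports from the table below and treating task_acks_late as an assumption:

```python
# Sketch of the relevant Celery settings; app name and URLs are illustrative.
from celery import Celery

app = Celery(
    "spark",
    broker="redis://localhost:5412/0",   # task queue (see Port Configuration)
    backend="redis://localhost:5412/0",  # result backend
)
app.conf.update(
    worker_prefetch_multiplier=1,  # each worker reserves one task at a time
    task_acks_late=True,           # assumption: ack after completion so crashed tasks are redelivered
    beat_max_loop_interval=5,      # Beat wakes up at most every 5 seconds
)
```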
5. Adapter Invocation
Worker loads workflow from PostgreSQL and:
- Identifies source type (langflow, automagik-hive)
- Instantiates appropriate adapter from registry
- Calls adapter's run_flow_sync() method with input data
- Adapter handles source-specific API authentication and request format
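Conceptually, the worker-side dispatch looks something like the sketch below; apart from run_flow_sync(), the registry layout, class names, and attribute names are assumptions:

```python
# Conceptual sketch of adapter dispatch; only run_flow_sync is named in the docs.
from spark.adapters import LangFlowAdapter, HiveAdapter  # hypothetical imports

ADAPTER_REGISTRY = {
    "langflow": LangFlowAdapter,
    "automagik-hive": HiveAdapter,
}

def run_workflow(workflow, input_data):
    adapter_cls = ADAPTER_REGISTRY[workflow.source_type]          # pick adapter by source type
    adapter = adapter_cls(workflow.source_url, workflow.api_key)  # adapter handles auth
    return adapter.run_flow_sync(workflow.remote_flow_id, input_data)
```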
6. External API Call
Adapter makes HTTP request to source:
- LangFlow: POST /api/v1/run/{flow_id} with tweaks mapping
- Hive: different endpoints for agent/team/workflow types
- Includes API key in headers
- Handles session management and polling for results
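For the LangFlow case, the request shape is roughly as follows; the /api/v1/run/{flow_id} path comes from the docs, while the payload keys and header name follow LangFlow's public API and should be treated as illustrative here:

```python
# Sketch of the LangFlow run request; payload keys and header name are illustrative.
import httpx

flow_id = "your-flow-id"
resp = httpx.post(
    f"http://localhost:7860/api/v1/run/{flow_id}",
    headers={"x-api-key": "your-langflow-key"},
    json={
        "input_value": "daily report",
        "tweaks": {  # per-component overrides (tweaks mapping)
            "TextInput-abc12": {"input_value": "override"},
        },
    },
)
resp.raise_for_status()
print(resp.json())
```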
7. Result Processing
Adapter normalizes the response into a WorkflowExecutionResult, and the Worker saves the result to the Task record in PostgreSQL.
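The normalized result can be pictured as a small dataclass; beyond the fields listed in step 8, the exact shape is an assumption:

```python
# Sketch of the normalized result; field names mirror the Task fields in step 8
# and are otherwise assumptions.
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class WorkflowExecutionResult:
    success: bool                            # maps to status="completed" / "failed"
    result: Optional[dict[str, Any]] = None  # execution output saved on the Task
    error: Optional[str] = None              # populated when the source call fails
    session_id: Optional[str] = None         # assumption: session info for stateful flows
```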
8. Status Update
Worker updates Task record with:
- status="completed" or status="failed"
- result field with execution output
- error field if failed
- completed_at timestamp
Results can be retrieved via GET /api/v1/tasks/{task_id}.

Port Configuration
Spark uses non-standard ports to avoid conflicts with other services:

| Component | Default Port | Environment Variable | Purpose |
|---|---|---|---|
| API Server | 8883 | AUTOMAGIK_SPARK_API_PORT | REST API endpoint |
| PostgreSQL | 5402 | AUTOMAGIK_SPARK_POSTGRES_PORT | Database connections |
| Redis | 5412 | AUTOMAGIK_SPARK_REDIS_PORT | Task queue broker |
| LangFlow | 7860 | LANGFLOW_PORT | External workflow source |
| Hive | 8881 | AUTOMAGIK_API_PORT | External agent source |
All ports are configurable via environment variables. The defaults were chosen to avoid conflicts with standard PostgreSQL (5432), Redis (6379), and other common services.
Connection Strings
Spark uses these connection patterns:

PostgreSQL Connection
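A sketch of the async engine setup; the username, password, and database name are placeholders:

```python
# Sketch only — credentials and database name are placeholders.
from sqlalchemy.ext.asyncio import create_async_engine

DATABASE_URL = "postgresql+asyncpg://spark:spark@localhost:5402/spark"
engine = create_async_engine(DATABASE_URL, pool_pre_ping=True)
```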
Redis Connection
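The Celery broker and result backend point at Spark's Redis port; the database index (/0) is an assumption:

```python
# Redis used as both broker and result backend (database index is an assumption).
CELERY_BROKER_URL = "redis://localhost:5412/0"
CELERY_RESULT_BACKEND = "redis://localhost:5412/0"
```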
API Authentication
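Requests to the Spark API carry an API key; a minimal sketch, assuming an X-API-Key header (the actual header name may differ):

```python
# Sketch only — the header name is an assumption; the task id is a placeholder.
import httpx

resp = httpx.get(
    "http://localhost:8883/api/v1/tasks/1234",
    headers={"X-API-Key": "your-spark-api-key"},
)
resp.raise_for_status()
print(resp.json())
```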
How It All Works Together
Why PostgreSQL for Metadata?
Relational data needs relational storage. Spark manages complex relationships:
- Workflows belong to Sources
- Schedules reference Workflows
- Tasks track execution history
- Components define workflow inputs/outputs

PostgreSQL provides:
- ACID transactions for schedule creation (ensure consistency)
- Foreign keys to maintain referential integrity
- Indexes for fast schedule lookups by next_run_at
- JSON columns for flexible workflow data storage
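In SQLAlchemy terms, the schedule hot path might look like the sketch below; table and column names beyond status and next_run_at are assumptions, not Spark's actual schema:

```python
# Illustrative model sketch — not Spark's actual schema.
from datetime import datetime
from sqlalchemy import DateTime, JSON, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Schedule(Base):
    __tablename__ = "schedules"

    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[str] = mapped_column(String(16), default="active")
    next_run_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True)  # indexed for Beat's lookup
    input_value: Mapped[dict] = mapped_column(JSON, default=dict)  # flexible JSON payload
```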
Why Redis for Queuing?
Speed and simplicity. Redis is optimized for the queue use case:
- In-memory performance for sub-millisecond task pickup
- Pub/Sub support for Celery’s result backend
- Persistence (appendonly) prevents task loss on restart
- Simple data structures (lists, sets) match queue patterns
Why Celery for Distributed Execution?
Proven, battle-tested architecture. Celery provides:
- Horizontal scaling: Add workers without code changes
- Reliability: Task retry, result persistence, failure handling
- Scheduling: Beat scheduler with database backend
- Monitoring: Built-in task events and state tracking

This enables:
- Scaling workers independently of the scheduler
- Multiple workers on different machines
- Worker specialization (different queues)
Why Adapters for Source Abstraction?
Each source has unique APIs. Adapters isolate the source-specific details.

LangFlow specifics:
- Component-based I/O (tweaks mapping)
- Session management for stateful flows
- Different endpoints for running vs testing

Hive specifics:
- Three types: agent, team, workflow
- Different endpoints per type
- Different input/output formats

A common base class (BaseWorkflowAdapter) allows:
- Adding new sources without modifying the core
- Consistent error handling across sources
- Unified result format (WorkflowExecutionResult)
- Easy testing (mock adapters)
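A sketch of that contract; only BaseWorkflowAdapter, run_flow_sync, and WorkflowExecutionResult are named in the docs, and the signature details are assumptions:

```python
# Illustrative contract sketch; signature details are assumptions.
from abc import ABC, abstractmethod
from typing import Any

class BaseWorkflowAdapter(ABC):
    def __init__(self, base_url: str, api_key: str) -> None:
        self.base_url = base_url
        self.api_key = api_key

    @abstractmethod
    def run_flow_sync(self, flow_id: str, input_data: Any) -> "WorkflowExecutionResult":
        """Execute a remote flow and normalize the response."""

class LangFlowAdapter(BaseWorkflowAdapter):
    def run_flow_sync(self, flow_id: str, input_data: Any) -> "WorkflowExecutionResult":
        # POST /api/v1/run/{flow_id} with tweaks, then normalize the response
        ...
```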
Design Principles
Separation of Concerns
- API layer: HTTP interface only
- Database layer: State persistence only
- Queue layer: Task distribution only
- Execution layer: Workflow running only
Async by Default
- FastAPI async handlers
- SQLAlchemy async engine (asyncpg)
- Celery async tasks
Database as Source of Truth
- All schedules in PostgreSQL
- All task results in PostgreSQL
- Redis is ephemeral (queue only)
Idempotent Operations
- Schedule sync doesn’t duplicate workflows
- Task creation checks for existing tasks
- Adapter calls handle duplicate requests
Scalability & Reliability
Horizontal Scaling
Workers scale independently of the API server and scheduler.

Failure Handling
Tasks retry automatically:
- Celery default: 3 retries with exponential backoff
- Task state persisted in PostgreSQL
- Failed tasks can be manually retried via API

If a worker crashes mid-task:
- Task remains in queue
- Another worker picks it up
- No data loss (state in PostgreSQL)

If the database is unavailable:
- Workers wait for database to recover
- Beat pauses schedule checks
- API returns 503 (service unavailable)
Monitoring
Track system health through task execution records:
- Stored in PostgreSQL (tasks table)
- Includes: start time, end time, status, result, error
- Query via API: GET /api/v1/tasks?status=failed
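For example, pulling recent failures for inspection (the auth header name and response fields are assumptions):

```python
# Sketch: list failed tasks; header name and response fields are assumptions.
import httpx

resp = httpx.get(
    "http://localhost:8883/api/v1/tasks",
    params={"status": "failed"},
    headers={"X-API-Key": "your-spark-api-key"},
)
resp.raise_for_status()
for task in resp.json():
    print(task["id"], task["error"])
```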

