Overview

This guide covers production scaling strategies for Automagik Spark. Learn how to horizontally scale workers, optimize database connections, manage Redis memory, and configure task queues for high-throughput workflow execution.

Worker Scaling

Horizontal Scaling with Docker Compose

Scale workers to handle increased task volume:
# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: automagik_spark
      POSTGRES_USER: spark_user
      POSTGRES_PASSWORD: spark_password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 2gb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  spark-api:
    image: automagik/spark:latest
    command: automagik-spark api start --host 0.0.0.0 --port 8883
    ports:
      - "8883:8883"
    environment:
      AUTOMAGIK_SPARK_DATABASE_URL: postgresql://spark_user:spark_password@postgres:5432/automagik_spark
      AUTOMAGIK_SPARK_CELERY_BROKER_URL: redis://redis:6379/0
      AUTOMAGIK_SPARK_CELERY_RESULT_BACKEND: redis://redis:6379/0
      AUTOMAGIK_SPARK_API_KEY: your-api-key-here
    depends_on:
      - postgres
      - redis

  spark-worker:
    image: automagik/spark:latest
    command: automagik-spark worker start --threads 4
    environment:
      AUTOMAGIK_SPARK_DATABASE_URL: postgresql://spark_user:spark_password@postgres:5432/automagik_spark
      AUTOMAGIK_SPARK_CELERY_BROKER_URL: redis://redis:6379/0
      AUTOMAGIK_SPARK_CELERY_RESULT_BACKEND: redis://redis:6379/0
      CELERY_WORKER_CONCURRENCY: 4
    depends_on:
      - postgres
      - redis
    deploy:
      replicas: 3  # Run 3 worker containers

  spark-beat:
    image: automagik/spark:latest
    command: celery -A automagik_spark.core.celery.celery_app beat --loglevel=info
    environment:
      AUTOMAGIK_SPARK_DATABASE_URL: postgresql://spark_user:spark_password@postgres:5432/automagik_spark
      AUTOMAGIK_SPARK_CELERY_BROKER_URL: redis://redis:6379/0
      AUTOMAGIK_SPARK_CELERY_RESULT_BACKEND: redis://redis:6379/0
    depends_on:
      - postgres
      - redis

volumes:
  postgres_data:
  redis_data:

Scale Workers Dynamically

# Scale to 5 worker instances
docker compose up -d --scale spark-worker=5

# Check worker status
docker compose ps

# View logs from all workers
docker compose logs -f spark-worker

# Scale down to 2 workers
docker compose up -d --scale spark-worker=2

Worker Concurrency Configuration

Control how many tasks each worker processes concurrently:
| Configuration | Threads | Workers | Total Capacity | Use Case |
|---|---|---|---|---|
| Low throughput | 2 | 1 | 2 tasks | Development, testing |
| Medium throughput | 4 | 2 | 8 tasks | Small production |
| High throughput | 4 | 5 | 20 tasks | Medium production |
| Very high throughput | 8 | 10 | 80 tasks | Large production |
# Set worker concurrency via environment variable
export CELERY_WORKER_CONCURRENCY=8

# Or via CLI flag
automagik-spark worker start --threads 8

Worker Configuration Options

# automagik_spark/core/celery/celery_app.py (reference)

config = {
    "worker_prefetch_multiplier": 1,  # Disable prefetching (fair task distribution)
    "worker_max_tasks_per_child": 100,  # Restart worker after 100 tasks (prevent memory leaks)
    "worker_max_memory_per_child": 200000,  # 200MB memory limit per worker
    "task_track_started": True,  # Track when tasks start (not just queued)
}
Key settings explained:
  • worker_prefetch_multiplier: 1 - Each worker takes one task at a time. Prevents uneven load distribution.
  • worker_max_tasks_per_child: 100 - Worker restarts after 100 tasks. Prevents memory leaks in long-running processes.
  • worker_max_memory_per_child: 200000 - Worker restarts if it exceeds 200MB memory. Protects against runaway memory usage.
  • task_track_started: True - Tasks show “running” status immediately, not just “queued”. Better visibility.
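For reference, a minimal sketch of how settings like these are applied when constructing a Celery app (the app name and broker URL are assumptions for illustration):

from celery import Celery

celery_app = Celery("automagik_spark", broker="redis://localhost:6379/0")
celery_app.conf.update(
    worker_prefetch_multiplier=1,        # one task at a time per worker process
    worker_max_tasks_per_child=100,      # recycle the process after 100 tasks
    worker_max_memory_per_child=200000,  # limit in KiB (~200MB)
    task_track_started=True,             # report "started" state, not just "queued"
)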

Monitoring Worker Health

# Check active workers
automagik-spark worker status

# View worker logs
automagik-spark worker logs

# Check Celery inspect
celery -A automagik_spark.core.celery.celery_app inspect active

# View worker stats
celery -A automagik_spark.core.celery.celery_app inspect stats
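The same information is available programmatically through Celery's inspect API; a minimal sketch, assuming the app object is exposed as celery_app in the module used with -A above:

from automagik_spark.core.celery.celery_app import celery_app

insp = celery_app.control.inspect(timeout=5)
active = insp.active() or {}      # tasks currently executing, keyed by worker
reserved = insp.reserved() or {}  # tasks prefetched but not yet started

for worker, tasks in active.items():
    print(f"{worker}: {len(tasks)} active, {len(reserved.get(worker, []))} reserved")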

Database Connection Pooling

PostgreSQL Pool Size Tuning

SQLAlchemy creates a connection pool for each process. Configure it based on your worker count:
# Calculating optimal pool size
# Formula: pool_size = (workers * concurrency) + api_connections + overhead

# Example:
# 5 workers * 4 threads = 20 worker connections
# 1 API process * 10 connections = 10 API connections
# Beat scheduler = 2 connections
# Overhead = 5 connections
# Total = 37 connections

# Set PostgreSQL max_connections
# max_connections = total_pool_size * 1.5 (safety margin)
# = 37 * 1.5 = 55 connections
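A small helper that applies the same formula, for quick what-if checks (the function is illustrative, not part of Spark):

def connection_budget(workers: int, concurrency: int,
                      api_connections: int = 10, beat: int = 2, overhead: int = 5) -> dict:
    """Estimate total pool size and a max_connections value with a 1.5x safety margin."""
    total = workers * concurrency + api_connections + beat + overhead
    return {"total_pool": total, "max_connections": int(total * 1.5)}

print(connection_budget(workers=5, concurrency=4))
# {'total_pool': 37, 'max_connections': 55}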

Configure PostgreSQL

# /etc/postgresql/15/main/postgresql.conf

# Set max_connections based on your calculation
max_connections = 100

# Connection pooling with PgBouncer (recommended)
# Install PgBouncer: apt-get install pgbouncer

# /etc/pgbouncer/pgbouncer.ini
[databases]
automagik_spark = host=localhost port=5432 dbname=automagik_spark

[pgbouncer]
listen_addr = 127.0.0.1
listen_port = 6432
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
max_client_conn = 100
default_pool_size = 25
reserve_pool_size = 5

Use PgBouncer for Connection Pooling

# docker-compose.yml with PgBouncer

services:
  postgres:
    image: postgres:15
    # ... postgres config

  pgbouncer:
    image: pgbouncer/pgbouncer:latest
    environment:
      DATABASES_HOST: postgres
      DATABASES_PORT: 5432
      DATABASES_USER: spark_user
      DATABASES_PASSWORD: spark_password
      DATABASES_DBNAME: automagik_spark
      POOL_MODE: transaction
      MAX_CLIENT_CONN: 100
      DEFAULT_POOL_SIZE: 25
    ports:
      - "6432:6432"
    depends_on:
      - postgres

  spark-worker:
    # Connect to PgBouncer instead of PostgreSQL directly
    environment:
      AUTOMAGIK_SPARK_DATABASE_URL: postgresql://spark_user:spark_password@pgbouncer:6432/automagik_spark
Benefits of PgBouncer:
  • Reduces PostgreSQL connection overhead
  • Better resource utilization
  • Handles connection spikes gracefully
  • Faster connection establishment

Database Connection Best Practices

# Configure SQLAlchemy engine with proper settings

import os

from sqlalchemy import create_engine

database_url = os.environ["AUTOMAGIK_SPARK_DATABASE_URL"]

engine = create_engine(
    database_url,
    pool_size=10,  # Base connection pool size
    max_overflow=20,  # Additional connections allowed under load
    pool_timeout=30,  # Wait 30s for a connection before timing out
    pool_recycle=3600,  # Recycle connections after 1 hour
    pool_pre_ping=True,  # Test connections before using them
    echo=False,  # Disable SQL query logging in production
)
Tuning parameters:
| Parameter | Development | Production | High Load |
|---|---|---|---|
| pool_size | 5 | 10 | 20 |
| max_overflow | 10 | 20 | 40 |
| pool_timeout | 30 | 30 | 60 |
| pool_recycle | 3600 | 3600 | 1800 |
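One way to put the table into practice is a small per-environment profile map; a sketch in which the profile names and the SPARK_DB_PROFILE variable are hypothetical:

import os
from sqlalchemy import create_engine

# Hypothetical per-environment profiles mirroring the table above.
POOL_PROFILES = {
    "development": {"pool_size": 5, "max_overflow": 10, "pool_timeout": 30, "pool_recycle": 3600},
    "production": {"pool_size": 10, "max_overflow": 20, "pool_timeout": 30, "pool_recycle": 3600},
    "high_load": {"pool_size": 20, "max_overflow": 40, "pool_timeout": 60, "pool_recycle": 1800},
}

engine = create_engine(
    os.environ["AUTOMAGIK_SPARK_DATABASE_URL"],
    pool_pre_ping=True,
    **POOL_PROFILES[os.getenv("SPARK_DB_PROFILE", "production")],
)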

Redis Memory Management

Configure Redis Memory Limits

# redis.conf

# Set maximum memory (adjust based on your server)
maxmemory 2gb

# Eviction policy when memory limit is reached
maxmemory-policy allkeys-lru

# LRU sample size (larger = more accurate eviction, slower)
maxmemory-samples 5

# Persistence (disable for pure cache usage)
save ""
appendonly no

# Performance tuning
tcp-backlog 511
timeout 300
tcp-keepalive 60

Memory Eviction Policies

| Policy | Behavior | Use Case |
|---|---|---|
| allkeys-lru | Evict least recently used keys | Recommended for Spark - good for task queues |
| allkeys-lfu | Evict least frequently used keys | High read-to-write ratio |
| volatile-lru | Evict LRU keys with an expiration set | Mixed usage (tasks + cache) |
| volatile-ttl | Evict keys with the shortest TTL first | Time-sensitive data |
| noeviction | Return errors when the memory limit is reached | Guaranteed task persistence |
Recommendation for Spark: Use allkeys-lru for task queues. Tasks are consumed quickly, so older entries can be safely evicted.
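To confirm (or correct) the policy at runtime, a minimal sketch with the redis Python client, assuming the connection details from the compose file above:

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

policy = r.config_get("maxmemory-policy")["maxmemory-policy"]
if policy != "allkeys-lru":
    # Apply the recommended policy now; persist it in redis.conf as well.
    r.config_set("maxmemory-policy", "allkeys-lru")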

Monitor Redis Memory

# Check Redis memory usage
redis-cli info memory

# Expected output:
# used_memory:1024000
# used_memory_human:1000.00K
# used_memory_rss:2048000
# used_memory_peak:5120000
# maxmemory:2147483648
# maxmemory_human:2.00G

# Monitor in real-time
redis-cli --stat

# Check specific keys
redis-cli --scan --pattern "celery-task-meta-*" | wc -l

Redis Persistence Options

For production task queues, choose based on your durability requirements:
# Option 1: No persistence (fastest, tasks lost on restart)
save ""
appendonly no

# Option 2: RDB snapshots (periodic backups)
save 900 1      # Save after 900s if 1 key changed
save 300 10     # Save after 300s if 10 keys changed
save 60 10000   # Save after 60s if 10000 keys changed
appendonly no

# Option 3: AOF (append-only file, most durable)
appendonly yes
appendfsync everysec  # Fsync every second (good balance)
Recommendation: Use RDB snapshots for Spark. Tasks are transient, so full durability is not required.

Queue Priority Strategies

Configure Task Queues

# automagik_spark/core/celery/celery_app.py

from kombu import Exchange, Queue

config = {
    "task_queues": [
        Queue("celery", Exchange("celery"), routing_key="celery"),
        Queue("direct", Exchange("direct"), routing_key="direct"),
        Queue("high_priority", Exchange("priority"), routing_key="high"),
        Queue("low_priority", Exchange("priority"), routing_key="low"),
    ],
    "task_default_queue": "celery",
    "task_default_exchange": "celery",
    "task_default_routing_key": "celery",
}

Route Tasks by Priority

# Route tasks to different queues based on priority

from celery import shared_task

@shared_task(queue="high_priority")
def critical_workflow(schedule_id: str):
    """High-priority workflow execution"""
    # ... execute workflow

@shared_task(queue="low_priority")
def bulk_report_generation(schedule_id: str):
    """Low-priority batch processing"""
    # ... generate report
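As an alternative to per-task queue= arguments, routing can be centralized with Celery's task_routes setting; a sketch in which the fully qualified task paths are illustrative:

config = {
    "task_routes": {
        # Adjust these paths to wherever the tasks are actually defined.
        "automagik_spark.core.tasks.workflow_tasks.critical_workflow": {"queue": "high_priority"},
        "automagik_spark.core.tasks.workflow_tasks.bulk_report_generation": {"queue": "low_priority"},
        "automagik_spark.core.tasks.*": {"queue": "celery"},
    },
}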

Start Workers for Specific Queues

# Worker 1: High-priority tasks only
celery -A automagik_spark.core.celery.celery_app worker \
  --queues=high_priority \
  --concurrency=8 \
  --loglevel=info

# Worker 2: Default and low-priority tasks
celery -A automagik_spark.core.celery.celery_app worker \
  --queues=celery,low_priority \
  --concurrency=4 \
  --loglevel=info

# Worker 3: All queues
celery -A automagik_spark.core.celery.celery_app worker \
  --queues=high_priority,celery,low_priority,direct \
  --concurrency=8 \
  --loglevel=info

Long-Running Task Handling

Configure Task Timeouts

# automagik_spark/core/tasks/workflow_tasks.py

import logging

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

logger = logging.getLogger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    soft_time_limit=300,  # Soft timeout: 5 minutes
    time_limit=360,       # Hard timeout: 6 minutes
)
def execute_workflow(self, schedule_id: str):
    """Execute a workflow with timeout protection"""
    try:
        # ... execute workflow
        pass
    except SoftTimeLimitExceeded:
        # Soft timeout - allow cleanup
        logger.warning(f"Workflow {schedule_id} approaching timeout")
        # ... cleanup
        raise
    except TimeLimitExceeded:
        # Hard timeout - immediate termination
        logger.error(f"Workflow {schedule_id} exceeded hard timeout")
        raise
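If you prefer project-wide defaults instead of per-task arguments, the same limits can be set once in the Celery configuration (a sketch; per-task soft_time_limit/time_limit still take precedence):

config = {
    "task_soft_time_limit": 300,  # default soft timeout for all tasks, in seconds
    "task_time_limit": 360,       # default hard timeout for all tasks, in seconds
}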

Retry Configuration

# automagik_spark/core/tasks/celery.py (reference)

config = {
    "task_retry_max_retries": 3,
    "task_retry_backoff": True,  # Exponential backoff
    "task_retry_backoff_max": 600,  # Max 10 minutes between retries
}
Retry behavior:
| Attempt | Delay | Cumulative Time |
|---|---|---|
| 1st retry | 2s | 2s |
| 2nd retry | 4s | 6s |
| 3rd retry | 8s | 14s |
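These delays can also be expressed declaratively with Celery's built-in retry options; a sketch where the task name is illustrative:

from celery import shared_task

@shared_task(
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=2,        # first retry after 2s, then 4s, 8s, ...
    retry_backoff_max=600,  # never wait more than 10 minutes between retries
    retry_jitter=False,     # keep delays deterministic, matching the table above
    max_retries=3,
)
def example_workflow(schedule_id: str):
    """Illustrative task; retries are handled entirely by the decorator options."""
    ...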

Handle Connection Errors with Retry

# automagik_spark/core/tasks/workflow_tasks.py (reference)

from celery import shared_task

@shared_task(bind=True, max_retries=3)
def execute_workflow(self, schedule_id: str):
    try:
        task = _execute_workflow_sync(schedule_id)
        return task.to_dict() if task else None
    except Exception as e:
        error_str = str(e).lower()

        # Retry on transient errors
        if "connection" in error_str or "timeout" in error_str:
            retry_in = 2**self.request.retries  # Exponential backoff
            raise self.retry(exc=e, countdown=retry_in, max_retries=3)

        # Fail immediately on client errors
        else:
            raise e

Monitoring and Alerting

Key Metrics to Track

| Metric | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|
| Task queue depth | > 100 tasks | > 500 tasks | Scale workers |
| Task failure rate | > 5% | > 20% | Check workflow errors |
| Worker CPU usage | > 80% | > 95% | Add more workers |
| Database connections | > 70% of max | > 90% of max | Increase pool size |
| Redis memory usage | > 70% | > 90% | Increase memory limit |
| Task execution time | > 5 minutes | > 10 minutes | Optimize workflows |
| PostgreSQL query time | > 100ms | > 500ms | Add indexes, optimize queries |
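The queue-depth thresholds can be checked directly against the Redis broker; a minimal sketch assuming the redis Python client and the queue names used earlier (with the Redis broker, each queue is a plain Redis list keyed by its name):

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

for queue in ("celery", "high_priority", "low_priority"):
    depth = r.llen(queue)  # number of messages waiting in this queue
    if depth > 500:
        print(f"CRITICAL: {queue} depth is {depth}")
    elif depth > 100:
        print(f"WARNING: {queue} depth is {depth}")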

Monitoring Commands

# Check task queue depth
celery -A automagik_spark.core.celery.celery_app inspect reserved -j | jq '[.[] | length] | add'

# Check worker health
automagik-spark worker status

# Monitor database connections
psql -U spark_user -d automagik_spark -c "SELECT count(*) FROM pg_stat_activity;"

# Monitor Redis memory
redis-cli info memory | grep used_memory_human

# View active tasks
celery -A automagik_spark.core.celery.celery_app inspect active

# View scheduled tasks
celery -A automagik_spark.core.celery.celery_app inspect scheduled

Set Up Alerting (Prometheus Example)

# prometheus.yml

scrape_configs:
  - job_name: 'spark-api'
    static_configs:
      - targets: ['localhost:8883']

  - job_name: 'postgres'
    static_configs:
      - targets: ['localhost:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['localhost:9121']

rule_files:
  - alert_rules.yml

# alert_rules.yml
groups:
  - name: spark_alerts
    interval: 30s
    rules:
      - alert: HighTaskQueueDepth
        expr: celery_queue_length{queue="celery"} > 100
        for: 5m
        annotations:
          summary: "Task queue depth is high"

      - alert: WorkerDown
        expr: up{job="spark-worker"} == 0
        for: 1m
        annotations:
          summary: "Worker is down"

      - alert: HighDatabaseConnections
        expr: pg_stat_database_numbackends > 80
        for: 5m
        annotations:
          summary: "Database connection count is high"

Performance Tuning Table

| Scale | Workers | Threads/Worker | Pool Size | Redis Memory | Expected Throughput |
|---|---|---|---|---|---|
| Small | 2 | 4 | 10 | 512MB | ~50 tasks/min |
| Medium | 5 | 4 | 20 | 2GB | ~200 tasks/min |
| Large | 10 | 8 | 40 | 4GB | ~800 tasks/min |
| Enterprise | 20 | 8 | 80 | 8GB | ~1600 tasks/min |

Bottleneck Identification

| Symptom | Likely Bottleneck | Solution |
|---|---|---|
| Tasks queued but not executing | Too few workers | Scale workers |
| High CPU on workers | CPU-bound tasks | Add more worker instances |
| High memory on workers | Memory leaks | Reduce max_tasks_per_child |
| Slow database queries | Database connections exhausted | Increase pool size or use PgBouncer |
| Redis memory errors | Queue overflow | Increase Redis memory or process tasks faster |
| Tasks timing out | Long-running workflows | Increase soft_time_limit |

Load Testing

Simulate High Task Load

# scripts/load_test.py

import requests
import time
from concurrent.futures import ThreadPoolExecutor

API_URL = "http://localhost:8883"
API_KEY = "your-api-key"
WORKFLOW_ID = "your-workflow-id"

def create_task():
    response = requests.post(
        f"{API_URL}/api/v1/workflows/{WORKFLOW_ID}/run",
        headers={"X-API-Key": API_KEY},
        json={"input_data": "Load test"}
    )
    return response.status_code

# Create 1000 tasks with 50 concurrent threads
with ThreadPoolExecutor(max_workers=50) as executor:
    start = time.time()
    results = list(executor.map(lambda _: create_task(), range(1000)))
    duration = time.time() - start

print(f"Created 1000 tasks in {duration:.2f}s")
print(f"Throughput: {1000/duration:.2f} tasks/sec")
print(f"Success rate: {results.count(200)/len(results)*100:.1f}%")

Monitor During Load Test

# Watch queue depth
watch -n 1 'celery -A automagik_spark.core.celery.celery_app inspect reserved -j | jq "[.[] | length] | add"'

# Watch worker CPU
watch -n 1 'docker stats --no-stream --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"'

# Watch database connections
watch -n 1 'psql -U spark_user -d automagik_spark -c "SELECT count(*) FROM pg_stat_activity;"'

Next Steps