Add Source and Sync First Workflow

This example walks through adding a LangFlow source and syncing your first workflow.

Step 1: Add LangFlow Source

automagik-spark sources add \
  --name my-langflow \
  --type langflow \
  --url http://localhost:7860 \
  --api-key sk-abc123
Expected Output:
Health check passed: status ok
Version check passed: 1.0.24
Successfully added source: http://localhost:7860
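
To double-check that the source was registered, you can list the configured sources (the same command used under Debugging Tips later in this guide):
automagik-spark sources list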

Step 2: List Available Flows

automagik-spark workflows sync
Expected Output:
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ ID             ┃ Name           ┃ Description         ┃ Source   ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ abc-123-def-45 │ Chat Bot       │ AI chat assistant   │ langflow │
│ 6              │                │                     │          │
│ ghi-789-jkl-01 │ Data Pipeline  │ Process CSV data    │ langflow │
│ 2              │                │                     │          │
└────────────────┴────────────────┴─────────────────────┴──────────┘

Step 3: Sync a Specific Flow

automagik-spark workflows sync abc-123-def-456
Expected Output:
Successfully synced flow abc-123-def-456

Step 4: Verify Workflow

automagik-spark workflows list
Expected Output:
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ ID               ┃ Name           ┃ Latest Run ┃ Tasks (Failed) ┃ Schedules ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ 550e8400-e29b-41 │ Chat Bot       │ NEW        │ 0 (0)          │ 0         │
└──────────────────┴────────────────┴────────────┴────────────────┴───────────┘

Troubleshooting

Problem: Source validation failed: Connection refused
Solution: Ensure LangFlow is running on the specified URL:
curl http://localhost:7860/health
Problem: Failed to sync flow: API key invalid
Solution: Verify your API key in LangFlow settings and update:
automagik-spark sources update http://localhost:7860 --api-key sk-new-key
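Before updating the source, you can confirm the new key is actually accepted by LangFlow. A minimal check with curl, assuming the flow-listing endpoint follows the per-flow path used later in this guide (a 401 or 403 response means the key is still rejected):
curl -H "x-api-key: sk-new-key" http://localhost:7860/api/v1/flows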

Create Daily Schedule

This example creates a schedule that runs a workflow every day at 8 AM.

Step 1: Start Interactive Schedule Creation

automagik-spark schedules create

Step 2: Follow Interactive Prompts

Available Workflows:
0: Chat Bot (0 schedules)
1: Data Pipeline (1 schedules)

Select a workflow: 0

Schedule Type:
  0: Interval (e.g., every 30 minutes)
  1: Cron (e.g., every day at 8 AM)
  2: One-time (run once at a specific time)

Select schedule type: 1

Cron Examples:
  * * * * *     - Every minute
  */5 * * * *   - Every 5 minutes
  0 * * * *     - Every hour
  0 0 * * *     - Every day at midnight
  0 8 * * *     - Every day at 8 AM
  0 8 * * 1-5   - Every weekday at 8 AM

Enter cron expression: 0 8 * * *

Enter input value: Daily report generation

Schedule created successfully with ID: 990e8400-e29b-41d4-a716-446655440000
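
If you want to sanity-check a cron expression before saving it, a small sketch that previews the next few run times, assuming the croniter Python package is installed (pip install croniter):
# Preview the next three runs of "0 8 * * *" (croniter is an assumed extra dependency)
python3 -c "
from datetime import datetime
from croniter import croniter
it = croniter('0 8 * * *', datetime.now())
for _ in range(3):
    print(it.get_next(datetime))
"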

Step 3: Verify Schedule

automagik-spark schedules list
Expected Output:
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ ID             ┃ Workflow    ┃ Type    ┃ Expression ┃ Next Run    ┃ Status       ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ 990e8400-e29b- │ Chat Bot    │ cron    │ 0 8 * * *  │ 2025-11-05  │ ● ACTIVE     │
│ 41d4-a716-4466 │             │         │            │ 08:00       │              │
│ 55440000       │             │         │            │             │              │
└────────────────┴─────────────┴─────────┴────────────┴─────────────┴──────────────┘

Step 4: Start Worker to Execute Schedule

automagik-spark worker start

Common Schedule Patterns

Every 5 minutes:
Schedule type: 0 (interval)
Expression: 5m
Every hour:
Schedule type: 0 (interval)
Expression: 1h
Every weekday at 9 AM:
Schedule type: 1 (cron)
Expression: 0 9 * * 1-5
Every Monday at 10 AM:
Schedule type: 1 (cron)
Expression: 0 10 * * 1
Every 6 hours:
Schedule type: 0 (interval)
Expression: 6h

Modify Schedule Later

Update the schedule time:
automagik-spark schedules set-expression 990e8400-e29b-41d4-a716-446655440000 "0 9 * * *"
Update the input data:
automagik-spark schedules set-input 990e8400-e29b-41d4-a716-446655440000 "Updated report config"
Pause the schedule temporarily:
automagik-spark schedules update 990e8400-e29b-41d4-a716-446655440000 pause
Resume the schedule:
automagik-spark schedules update 990e8400-e29b-41d4-a716-446655440000 resume

Debug Failed Task

This example shows how to identify and fix a failed workflow execution.

Step 1: List Failed Tasks

automagik-spark tasks list --status failed --limit 10
Expected Output:
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┓
┃ ID             ┃ Workflow     ┃ Status      ┃ Created          ┃ Updated          ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━┩
│ 770e8400-e29b- │ Data Pipelin │ ✗ FAILED    │ 2025-11-04 15:32 │ 2025-11-04 15:33 │
│ 41d4-a716-4466 │ e            │             │                  │                  │
│ 55440002       │              │             │                  │                  │
└────────────────┴──────────────┴─────────────┴──────────────────┴──────────────────┘

Step 2: View Task Details

automagik-spark tasks view 770e8400
Expected Output:
Task Details:
ID: 770e8400-e29b-41d4-a716-446655440002
Workflow: Data Pipeline
Status: failed
Created: 2025-11-04 15:32:10
Updated: 2025-11-04 15:33:45
Started: 2025-11-04 15:32:11
Finished: 2025-11-04 15:33:45

Input:
"process_file.csv"

Output:
None

Error:
HTTPError: 500 Server Error: Internal Server Error for url: http://localhost:7860/api/v1/run/abc-123
Response: {"detail": "Component 'FileReader' not found in workflow"}

Step 3: Identify the Problem

The error shows that the workflow is missing the 'FileReader' component. This could mean:
  1. The workflow was modified in LangFlow after syncing
  2. The component was removed or renamed

Step 4: Fix the Workflow

Option A: Re-sync the workflow from LangFlow
If you fixed the workflow in LangFlow:
automagik-spark workflows sync abc-123-def-456
Option B: Check LangFlow directly
Verify the workflow exists and is valid:
curl -H "x-api-key: sk-abc123" http://localhost:7860/api/v1/flows/abc-123

Step 5: Retry the Failed Task

automagik-spark tasks retry 770e8400
Expected Output:
Task 770e8400 queued for retry

Step 6: Verify Success

automagik-spark tasks list --workflow-id 550e8400-e29b-41d4-a716-446655440000 --limit 5
Expected Output:
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┓
┃ ID             ┃ Workflow     ┃ Status      ┃ Created          ┃ Updated          ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━┩
│ 880e8400-e29b- │ Data Pipelin │ ✓ COMPLETED │ 2025-11-04 15:40 │ 2025-11-04 15:41 │
│ 41d4-a716-4466 │ e            │             │                  │                  │
│ 55440003       │              │             │                  │                  │
└────────────────┴──────────────┴─────────────┴──────────────────┴──────────────────┘

Common Failure Patterns

Connection Errors:
Error: Connection refused
  • Check if the source (LangFlow/Hive) is running
  • Verify the source URL is correct
  • Check network/firewall settings
Authentication Errors:
Error: 401 Unauthorized
  • Verify the API key is correct
  • Update the source API key:
    automagik-spark sources update http://localhost:7860 --api-key sk-new-key
    
Timeout Errors:
Error: Request timeout
  • Workflow is taking too long to execute
  • Check the workflow for infinite loops or long operations
  • Increase timeout in worker configuration
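If the workflow legitimately needs more time, Celery's standard time-limit options can be raised when starting a raw worker. A sketch with placeholder values; the managed worker start command may expose an equivalent setting:
# Example only: raise soft/hard task time limits (in seconds) on a manually started Celery worker
celery -A automagik_spark.core.celery.celery_app worker \
  --loglevel=INFO \
  -P prefork \
  --concurrency 8 \
  --soft-time-limit 1800 \
  --time-limit 2100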
Invalid Input:
Error: Invalid input format
  • Check the input data format expected by the workflow
  • View workflow definition to understand required input structure

Debugging Tips

Check worker logs:
automagik-spark worker logs --tail --lines 100
Follow worker logs in real-time:
automagik-spark worker logs --follow
Test workflow manually:
automagik-spark workflows run 550e8400-e29b-41d4-a716-446655440000 --input "test data"
Check source connectivity:
automagik-spark sources list

Scale Workers

This example shows how to scale worker capacity for high-throughput workloads.

Check Current Worker Status

automagik-spark worker status
Expected Output:
Worker is running (PID: 12345)

Active tasks:
celery@automagik: 2 tasks
  - execute_workflow (id: abc-123)
  - execute_workflow (id: def-456)

Beat scheduler is running (PID: 12346)

Understanding Worker Concurrency

Spark uses Celery workers with configurable concurrency:
  • Default: 2 worker threads
  • Recommendation: Number of CPU cores for CPU-bound tasks
  • Recommendation: 2-4x CPU cores for I/O-bound tasks (API calls, database queries)
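A quick way to apply the I/O-bound rule of thumb above is to derive the thread count from the machine's core count (a sketch; tune the multiplier for your workload):
# Use 2x the CPU core count for I/O-bound workloads
THREADS=$(( $(nproc) * 2 ))
automagik-spark worker start --threads "$THREADS"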

Stop Current Worker

automagik-spark worker stop
Expected Output:
Worker stopped successfully
Beat scheduler stopped successfully

Start Worker with More Threads

For 4 concurrent tasks:
automagik-spark worker start --threads 4
For 8 concurrent tasks:
automagik-spark worker start --threads 8
For 16 concurrent tasks (high-throughput):
automagik-spark worker start --threads 16 --daemon

Monitor Worker Performance

Check active tasks:
automagik-spark worker status
Check task throughput:
automagik-spark tasks list --limit 50
Watch logs in real-time:
automagik-spark worker logs --follow
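For a lightweight live view, you can combine the status command with watch (the same pattern used for schedules later in this guide), or ask Celery directly which tasks are currently executing:
# Refresh worker status every 5 seconds
watch -n 5 'automagik-spark worker status'
# List the tasks currently executing on each worker
celery -A automagik_spark.core.celery.celery_app inspect active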

Horizontal Scaling with Multiple Workers

For even higher throughput, run multiple worker processes.
Terminal 1 - Primary worker:
AUTOMAGIK_SPARK_WORKER_LOG=logs/worker1.log \
  automagik-spark worker start --threads 8
Terminal 2 - Secondary worker (no beat scheduler):
# Start only the Celery worker without beat scheduler
celery -A automagik_spark.core.celery.celery_app worker \
  --loglevel=INFO \
  -P prefork \
  --concurrency 8 \
  --hostname celery2@automagik \
  --logfile logs/worker2.log
Terminal 3 - Tertiary worker:
celery -A automagik_spark.core.celery.celery_app worker \
  --loglevel=INFO \
  -P prefork \
  --concurrency 8 \
  --hostname celery3@automagik \
  --logfile logs/worker3.log
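After starting the additional workers, you can confirm that all of them registered against the same broker using Celery's built-in inspection commands:
# Each worker (celery@automagik, celery2@automagik, celery3@automagik) should respond with pong
celery -A automagik_spark.core.celery.celery_app inspect ping
# Per-worker stats, including pool size and processed task counts
celery -A automagik_spark.core.celery.celery_app inspect stats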

Production Scaling Recommendations

Small workload (< 100 tasks/day):
automagik-spark worker start --threads 2 --daemon
Medium workload (100-1000 tasks/day):
automagik-spark worker start --threads 4 --daemon
Large workload (1000-10000 tasks/day):
automagik-spark worker start --threads 8 --daemon
Very large workload (> 10000 tasks/day):
  • Use multiple worker processes (horizontal scaling)
  • Monitor Redis memory usage (see the sketch after this list)
  • Consider Redis cluster for high availability
  • Use PostgreSQL connection pooling
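To keep an eye on broker memory under heavy load, standard redis-cli commands are enough; the queue name below assumes Celery's default queue:
# Current Redis memory usage and eviction policy
redis-cli info memory | grep -E 'used_memory_human|maxmemory_human|maxmemory_policy'
# Number of messages waiting in the default Celery queue (queue name assumed to be "celery")
redis-cli llen celery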

Docker Compose Scaling

For production deployments with Docker Compose:
version: '3.8'

services:
  spark-worker:
    image: automagik/spark:latest
    command: automagik-spark worker start --threads 4 --daemon
    environment:
      - AUTOMAGIK_DATABASE_URL=postgresql+asyncpg://user:pass@postgres:5432/spark
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - postgres
      - redis
    deploy:
      replicas: 3  # Run 3 worker instances
      resources:
        limits:
          cpus: '2'
          memory: 2G
Scale up/down:
docker-compose up -d --scale spark-worker=5  # Scale to 5 workers

Monitoring Metrics

Task completion rate:
# Count tasks marked completed that were created in the previous hour
automagik-spark tasks list --status completed --limit 1000 | grep -c "$(date -d '1 hour ago' +'%Y-%m-%d %H')"
Task failure rate:
# List recent failed tasks
automagik-spark tasks list --status failed --limit 100
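To turn these listings into a rough failure rate, a small sketch that counts the status markers in the table output (assumes the FAILED/COMPLETED labels shown in the examples above):
# Rough failure rate over the most recent tasks
failed=$(automagik-spark tasks list --status failed --limit 1000 | grep -c 'FAILED')
completed=$(automagik-spark tasks list --status completed --limit 1000 | grep -c 'COMPLETED')
echo "failed: $failed, completed: $completed"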
Average task duration:
Check worker logs for task execution times:
automagik-spark worker logs --tail | grep "Task.*succeeded"

Performance Tuning

Redis Configuration:
# Increase Redis maxmemory for high throughput
# In redis.conf or via Docker environment
maxmemory 2gb
maxmemory-policy allkeys-lru
PostgreSQL Connection Pooling:
# In .env
AUTOMAGIK_DATABASE_URL=postgresql+asyncpg://user:pass@postgres:5432/spark?pool_size=20&max_overflow=40
Celery Task Prefetch:
# Reduce prefetch for better distribution across workers
celery -A automagik_spark.core.celery.celery_app worker \
  --prefetch-multiplier 1 \
  --concurrency 8

Troubleshooting Scaling Issues

Workers not picking up tasks:
  • Check Redis connection: redis-cli ping
  • Verify all workers connect to same Redis instance
  • Check worker logs: automagik-spark worker logs --follow
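To verify that every worker talks to the same broker, ping Redis through the configured broker URL and list the workers it currently sees (CELERY_BROKER_URL as set in the Docker Compose example above):
# Ping the broker using the same URL the workers use
redis-cli -u "$CELERY_BROKER_URL" ping
# Show which workers are online and reachable through that broker
celery -A automagik_spark.core.celery.celery_app status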
High memory usage:
  • Reduce worker threads
  • Check for memory leaks in workflows
  • Monitor with: docker stats or htop
Tasks timing out:
  • Increase Celery task timeout
  • Optimize workflow execution time
  • Check network latency to LangFlow/Hive

Additional Examples

Run Workflow Ad-Hoc

Execute a workflow immediately without creating a schedule:
automagik-spark workflows run 550e8400-e29b-41d4-a716-446655440000 \
  --input "Process this data"

Batch Sync All Workflows

Sync all workflows from all sources:
automagik-spark workflows sync
Sync from specific source:
automagik-spark workflows sync --source http://localhost:7860

Export Schedule List

Export schedule information to a file:
automagik-spark schedules list > schedules.txt

Monitor Schedules in Real-Time

Watch for schedule changes:
watch -n 5 'automagik-spark schedules list'

Database Maintenance

Create migration after model changes:
automagik-spark db migrate --message "Add new fields"
Apply migrations:
automagik-spark db upgrade
Clear test data:
automagik-spark db clear

Check Telemetry Status

View telemetry information:
automagik-spark telemetry status
automagik-spark telemetry info
Disable telemetry:
automagik-spark telemetry disable

Next Steps