Skip to content

Latest commit

 

History

History
529 lines (415 loc) · 11.9 KB

File metadata and controls

529 lines (415 loc) · 11.9 KB

DAG Loading System

Cascade features an automatic DAG loading system that's superior to Airflow's approach with multiple improvements.

Key Improvements Over Airflow

Feature Airflow Cascade
Multiple Sources Local directory only Local directory + S3 bucket (simultaneously)
Hot-Reloading Restart required Automatic detection & reload
Change Detection Full rescan Checksum-based (only reloads changed DAGs)
Validation Basic Comprehensive (nodes, edges, cycles, types)
Error Handling Parse errors can crash Graceful degradation (bad DAGs logged, not loaded)
File Formats Python only JSON + Elixir (.exs)
Deletion Handling Manual cleanup Automatic DAG disabling
Scan Interval Fixed 30s Configurable per deployment

How It Works

The DagLoader GenServer:

  1. Scans sources (local directory and/or S3 bucket) at configurable intervals
  2. Detects changes using MD5 checksums (only reloads changed files)
  3. Validates DAGs before loading (prevents bad DAGs from breaking the system)
  4. Upserts DAGs (creates new or updates existing)
  5. Handles deletions by disabling DAGs when source files are removed
  6. Logs everything for debugging and auditing
┌─────────────────┐         ┌─────────────────┐
│  Local Files    │         │   S3 Bucket     │
│  (./dags/*.json)│         │ (dags/*.json)   │
└────────┬────────┘         └────────┬────────┘
         │                           │
         └───────────┬───────────────┘
                     │
                     ▼
           ┌─────────────────┐
           │   DagLoader     │
           │   (scan every   │
           │    30 seconds)  │
           └────────┬────────┘
                    │
         ┌──────────┼──────────┐
         │          │          │
         ▼          ▼          ▼
    Validate   Calculate   Upsert
    DAG        Checksum    to DB

Configuration

Configure via environment variables:

# Local directory to scan (default: ./dags)
export DAGS_DIR="./dags"

# Scan interval in seconds (default: 30)
export DAGS_SCAN_INTERVAL=30

# Enable/disable auto-loading (default: true)
export DAGS_ENABLED=true

# Optional: S3 bucket for remote DAGs
export DAGS_S3_BUCKET="my-company-dags"
export DAGS_S3_PREFIX="production/dags/"

Docker/ECS Configuration

In docker-compose.yml:

services:
  cascade:
    environment:
      - DAGS_DIR=/app/dags
      - DAGS_SCAN_INTERVAL=60
      - DAGS_S3_BUCKET=my-dags-bucket
      - DAGS_S3_PREFIX=dags/

In ECS task definition:

{
  "environment": [
    {"name": "DAGS_DIR", "value": "/app/dags"},
    {"name": "DAGS_SCAN_INTERVAL", "value": "60"},
    {"name": "DAGS_S3_BUCKET", "value": "my-dags-bucket"}
  ]
}

File Formats

JSON Format (.json)

Simple, declarative DAG definitions:

{
  "nodes": [
    {
      "id": "extract",
      "type": "local",
      "config": {
        "module": "MyApp.Tasks.Extract",
        "timeout": 300,
        "retry": 3
      }
    },
    {
      "id": "transform",
      "type": "local",
      "config": {
        "module": "MyApp.Tasks.Transform",
        "timeout": 300
      },
      "depends_on": ["extract"]
    }
  ],
  "edges": [
    {"from": "extract", "to": "transform"}
  ],
  "description": "ETL Pipeline",
  "enabled": true
}

Elixir Format (.exs)

For dynamic DAG generation:

# Dynamic configuration
num_parallel_tasks = System.get_env("PARALLEL_TASKS", "5") |> String.to_integer()

# Generate tasks programmatically
tasks = for i <- 1..num_parallel_tasks do
  %{
    "id" => "parallel_#{i}",
    "type" => "local",
    "config" => %{"module" => "MyApp.Task"}
  }
end

# Return DAG definition
%{
  "nodes" => tasks,
  "edges" => [],
  "description" => "Dynamically generated with #{num_parallel_tasks} tasks"
}

DAG Structure

Required Fields

%{
  "nodes" => [
    %{
      "id" => "unique_task_id",    # Required: unique identifier
      "type" => "local" | "lambda", # Required: task type
      "config" => %{...}            # Required: task configuration
    }
  ]
}

Optional Fields

%{
  "edges" => [                      # Dependencies between tasks
    %{"from" => "task1", "to" => "task2"}
  ],
  "description" => "...",           # Human-readable description
  "enabled" => true                 # Enable/disable DAG
}

Task Dependencies

Two ways to specify dependencies:

  1. Via depends_on in node:
{
  "id": "task2",
  "type": "local",
  "depends_on": ["task1"],
  "config": {...}
}
  1. Via edges array (preferred):
{
  "nodes": [...],
  "edges": [
    {"from": "task1", "to": "task2"}
  ]
}

Validation

DAGs are validated before loading. The validator checks:

1. Required Fields

  • nodes array must exist and not be empty
  • ✅ Each node must have id, type, and config

2. Uniqueness

  • ✅ Node IDs must be unique
  • ✅ No duplicate node definitions

3. References

  • ✅ All edges must reference existing nodes
  • ✅ All depends_on entries must reference existing nodes

4. Cycles

  • ✅ No circular dependencies allowed (uses topological sort)

5. Types

  • ✅ Task types must be valid (local, lambda)

Example validation errors:

❌ Missing required fields: nodes
❌ Duplicate node IDs: task_1
❌ DAG contains a cycle (circular dependency)
❌ All edges must have valid 'from' and 'to' node IDs

S3 Integration

Setup

  1. Configure AWS credentials (via IAM role or environment variables)
  2. Set S3 environment variables:
    export DAGS_S3_BUCKET="my-dags-bucket"
    export DAGS_S3_PREFIX="production/dags/"
  3. Upload DAG files to S3:
    aws s3 cp my_dag.json s3://my-dags-bucket/production/dags/

S3 Permissions Required

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-dags-bucket",
        "arn:aws:s3:::my-dags-bucket/production/dags/*"
      ]
    }
  ]
}

S3 Benefits

  • Central storage: Share DAGs across multiple Cascade instances
  • Version control: Use S3 versioning for DAG history
  • Access control: Fine-grained IAM permissions
  • Scalability: No file system dependencies

Operational Commands

Check Loader Status

# In IEx console or remote shell
iex> Cascade.DagLoader.get_status()
%{
  enabled: true,
  scan_interval: 30000,
  loaded_dags: ["example_etl", "lambda_pipeline"],
  dag_count: 2,
  local_source: %{type: :local, path: "./dags"},
  s3_source: nil
}

Force Immediate Scan

iex> Cascade.DagLoader.scan_now()
:ok

Disable Auto-Loading

# Temporarily disable
export DAGS_ENABLED=false

# Or restart with disabled loading
docker restart cascade-app

Monitoring & Debugging

Log Messages

The DAG loader produces structured logs:

🔄 [DAG_LOADER] Starting DAG loader (scan_interval=30000ms)
📂 [LOCAL_SOURCE] Scanning directory: ./dags
☁️  [S3_SOURCE] Scanning S3: s3://my-bucket/dags/
📥 [DAG_LOADER] Loading DAG: example_etl from local:./dags/example_etl.json
✅ [DAG_LOADER] Successfully loaded DAG: example_etl
❌ [DAG_LOADER] Failed to load DAG example_etl: JSON parse error
🗑️  [DAG_LOADER] DAG deleted: old_dag

Troubleshooting

DAG not loading:

  1. Check file permissions
  2. Verify JSON/Elixir syntax
  3. Check validation errors in logs
  4. Ensure DAGS_ENABLED=true

Changes not detected:

  1. Wait for next scan interval
  2. Force scan with DagLoader.scan_now()
  3. Verify file actually changed (checksum-based)

S3 DAGs not loading:

  1. Verify AWS credentials
  2. Check S3 bucket permissions
  3. Verify bucket name and prefix
  4. Check S3 logs for access denied errors

Best Practices

1. File Organization

dags/
├── README.md
├── production/
│   ├── daily_etl.json
│   └── hourly_sync.json
├── staging/
│   ├── test_pipeline.json
│   └── debug_flow.exs
└── templates/
    └── example_template.exs

2. Naming Conventions

  • Use descriptive, kebab-case names: daily-etl-pipeline.json
  • File name becomes DAG name (without extension)
  • Avoid special characters

3. Version Control

# Keep DAGs in version control
git add dags/*.json
git commit -m "Add new ETL pipeline"
git push

# Deploy to S3 from CI/CD
aws s3 sync ./dags/ s3://my-bucket/production/dags/

4. Testing

# Test DAG locally before deploying
mix test apps/cascade/test/cascade/dag_loader_test.exs

# Validate JSON syntax
cat dags/my_dag.json | jq .

# Test .exs files
elixir dags/my_dag.exs

5. Documentation

Always include descriptions in DAG files:

{
  "description": "Daily ETL: Extracts from API, transforms data, loads to warehouse",
  "nodes": [...]
}

Migration from Mix Tasks

Before (using Mix task):

# In application startup or manual command
Mix.Task.run("cascade.load_dag", ["daily_etl", "dags/daily_etl.json"])

After (automatic loading):

  1. Move DAG files to ./dags directory:

    mv my_dag.json ./dags/
  2. Remove manual loading code

  3. DAG loads automatically on application start and every 30s

Benefits

  • ✅ No manual intervention required
  • ✅ Works in production (no Mix dependency)
  • ✅ Hot-reloading without restart
  • ✅ Centralized DAG management

Performance

Scan Performance

  • Local directory: < 10ms for 100 files
  • S3 bucket: < 500ms for 100 files (varies by network)
  • Validation: < 1ms per DAG
  • Checksum calculation: < 1ms per DAG

Optimization Tips

  1. Adjust scan interval for large DAG sets:

    export DAGS_SCAN_INTERVAL=60  # Reduce scan frequency
  2. Use S3 prefix to limit scope:

    export DAGS_S3_PREFIX="production/active-dags/"
  3. Disable if not needed:

    export DAGS_ENABLED=false

Advanced Use Cases

Multi-Environment DAGs

# Production
export DAGS_S3_PREFIX="production/dags/"

# Staging
export DAGS_S3_PREFIX="staging/dags/"

# Development
export DAGS_DIR="./dev-dags"
export DAGS_S3_BUCKET=""

Gradual Rollout

# Use .exs for feature flags
enabled = System.get_env("NEW_PIPELINE_ENABLED", "false") == "true"

%{
  "nodes" => [...],
  "enabled" => enabled,
  "description" => "New pipeline (controlled by NEW_PIPELINE_ENABLED)"
}

Template Generation

# Template in dags/templates/etl_template.exs
defmodule ETLTemplate do
  def generate(source, destination) do
    %{
      "nodes" => [
        %{"id" => "extract_#{source}", "type" => "local", ...},
        %{"id" => "load_#{destination}", "type" => "local", ...}
      ],
      "description" => "ETL from #{source} to #{destination}"
    }
  end
end

# Generate actual DAG
ETLTemplate.generate("api", "warehouse")

Security Considerations

  1. S3 bucket encryption: Use server-side encryption (SSE-S3 or SSE-KMS)
  2. IAM roles: Use least-privilege IAM roles (read-only S3 access)
  3. File permissions: Restrict write access to DAG files
  4. Code review: Review .exs files carefully (they execute Elixir code)
  5. Validation: DAG validation prevents malformed workflows

Future Enhancements

Potential improvements under consideration:

  • DAG versioning with rollback capability
  • Webhook notifications on DAG changes
  • UI for DAG management
  • Git integration (pull from repository)
  • Schema validation for task configs
  • DAG templates library