This directory contains DAG (Directed Acyclic Graph) workflow definitions that are automatically loaded by Cascade.
Cascade's DagLoader automatically scans this directory for DAG files and loads them into the database. The loader:
- 🔄 Scans every 30 seconds (configurable via `DAGS_SCAN_INTERVAL`)
- 🔥 Hot-reloads changed DAGs without restart
- ✅ Validates DAGs before loading
- 🗑️ Disables DAGs when files are deleted
- ☁️ Supports loading from S3 buckets
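The scan-and-diff behind hot-reloading can be illustrated with a small sketch. This is not Cascade's actual implementation; `ScanSketch` and its function names are illustrative. The idea: hash each file's contents on every scan, then compare against the previous snapshot to decide which DAGs to (re)load and which to disable.

```elixir
# Illustrative sketch of scan-and-diff logic, not Cascade's implementation.
defmodule ScanSketch do
  # Map each DAG file path to an MD5 of its contents.
  def snapshot(files) do
    Map.new(files, fn {path, contents} -> {path, :erlang.md5(contents)} end)
  end

  # Diff two snapshots: files to (re)load, and files whose DAGs to disable.
  def diff(previous, current) do
    changed =
      for {path, hash} <- current, Map.get(previous, path) != hash, do: path

    removed = Map.keys(previous) -- Map.keys(current)
    %{reload: Enum.sort(changed), disable: Enum.sort(removed)}
  end
end
```

New and modified files end up in `:reload`, deleted files in `:disable`, which mirrors the loader behavior listed above (changed DAGs reload, deleted DAGs are disabled rather than dropped).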
Simple JSON format for static DAG definitions:
```json
{
  "nodes": [
    {
      "id": "task_1",
      "type": "local",
      "config": {
        "module": "MyApp.Tasks.Task1",
        "timeout": 300
      }
    },
    {
      "id": "task_2",
      "type": "local",
      "config": {
        "module": "MyApp.Tasks.Task2",
        "timeout": 300
      }
    }
  ],
  "edges": [
    {"from": "task_1", "to": "task_2"}
  ],
  "description": "My DAG description",
  "enabled": true
}
```

Use Elixir for dynamic DAG generation:
```elixir
# You can use variables and logic
tasks =
  for i <- 1..10 do
    %{
      "id" => "task_#{i}",
      "type" => "local",
      "config" => %{"module" => "MyApp.Task"}
    }
  end

%{
  "nodes" => tasks,
  "edges" => [],
  "description" => "Dynamically generated DAG"
}
```

Configure via environment variables:
```shell
# Local directory to scan (default: ./dags)
export DAGS_DIR="./dags"

# Scan interval in seconds (default: 30)
export DAGS_SCAN_INTERVAL=30

# Enable/disable auto-loading (default: true)
export DAGS_ENABLED=true

# Optional: S3 bucket for remote DAGs
export DAGS_S3_BUCKET="my-dags-bucket"
export DAGS_S3_PREFIX="dags/"
```

- `nodes`: Array of task nodes. Each node must have `id`, `type`, and `config`.
- `edges`: Array of dependencies (optional). Each edge: `{"from": "task_1", "to": "task_2"}`
- `description`: DAG description (string)
- `enabled`: Enable/disable the DAG (boolean, default: `true`)
Execute Elixir modules:
```json
{
  "id": "my_task",
  "type": "local",
  "config": {
    "module": "MyApp.Tasks.MyTask",
    "timeout": 300,
    "retry": 3
  }
}
```

Execute AWS Lambda functions:
```json
{
  "id": "my_lambda",
  "type": "lambda",
  "config": {
    "function_name": "my-lambda-function",
    "timeout": 300,
    "payload": {
      "key": "value"
    }
  }
}
```

See the example DAGs in this directory:
- `example_etl.json` - Simple ETL pipeline
- `lambda_pipeline.json` - Lambda-based workflow
- `advanced_example.exs` - Dynamic DAG generation with Elixir
DAGs are validated before loading. The validator checks:
- ✅ Required fields present
- ✅ All nodes have unique IDs
- ✅ All edges reference existing nodes
- ✅ No circular dependencies (cycles)
- ✅ Valid node types and configurations
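The cycle check can be sketched with Kahn's algorithm: repeatedly remove nodes with no incoming edges; if any node remains, the DAG has a cycle. This is an illustration over the JSON edge format above, not Cascade's actual validator.

```elixir
# Illustrative cycle check (Kahn's algorithm), not Cascade's validator.
defmodule CycleSketch do
  # node_ids: list of node IDs; edges: list of %{"from" => _, "to" => _} maps.
  def acyclic?(node_ids, edges) do
    indegree =
      Enum.reduce(edges, Map.new(node_ids, &{&1, 0}), fn e, acc ->
        Map.update!(acc, e["to"], &(&1 + 1))
      end)

    visit(indegree, edges, 0) == length(node_ids)
  end

  defp visit(indegree, edges, seen) do
    case Enum.find(indegree, fn {_id, degree} -> degree == 0 end) do
      # No zero-indegree node left: either done, or a cycle remains.
      nil ->
        seen

      {id, _} ->
        indegree = Map.delete(indegree, id)

        indegree =
          Enum.reduce(edges, indegree, fn e, acc ->
            if e["from"] == id and Map.has_key?(acc, e["to"]),
              do: Map.update!(acc, e["to"], &(&1 - 1)),
              else: acc
          end)

        visit(indegree, edges, seen + 1)
    end
  end
end
```

This assumes the edge-reference check has already passed, so every `"to"` names an existing node.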
Check the logs for validation errors:
```shell
# Look for DAG_LOADER messages
docker logs cascade-app | grep DAG_LOADER
```

Trigger an immediate scan:
```elixir
# In IEx console
Cascade.DagLoader.scan_now()
```

View loader status:
```elixir
# In IEx console
Cascade.DagLoader.get_status()
```

To load DAGs from S3:
1. Set environment variables:

   ```shell
   export DAGS_S3_BUCKET="my-bucket"
   export DAGS_S3_PREFIX="production/dags/"
   ```

2. Upload DAG files to S3:

   ```shell
   aws s3 cp my_dag.json s3://my-bucket/production/dags/
   ```

3. DAGs will be automatically loaded on the next scan.
Cascade supports multiple patterns for implementing tasks, similar to how Airflow handles Python tasks.
Use tasks from Cascade's standard library:
```json
{
  "config": {
    "module": "Cascade.Examples.Tasks.ExtractData"
  }
}
```

Pros: Simple, works immediately, no dependencies
Cons: Limited to built-in tasks
Define task modules directly in .exs DAG files:
```elixir
# dags/weather_pipeline.exs
defmodule WeatherTasks.FetchAPI do
  def run(payload) do
    _context = Map.get(payload, :context, %{})
    # Implement task logic using the Elixir stdlib
    {:ok, %{"temperature" => 72, "city" => "Seattle"}}
  end
end

# Return DAG definition
%{
  "nodes" => [
    %{"id" => "fetch", "type" => "local",
      "config" => %{"module" => "WeatherTasks.FetchAPI", "timeout" => 60}}
  ],
  "edges" => [],
  "description" => "Weather pipeline with inline tasks"
}
```

Pros: Tasks live with the DAG, great for simple logic, hot-reload supported
Cons: Limited to the Elixir stdlib (no external dependencies)
Create reusable task modules across DAGs:
```elixir
# dags/_shared_tasks.exs (underscore prefix loads first)
defmodule SharedTasks.DataPipeline do
  defmodule FetchFromAPI do
    def run(_payload) do
      # Reusable fetch logic here
      data = %{}
      {:ok, data}
    end
  end

  defmodule ValidateData do
    def run(_payload) do
      # Reusable validation logic here
      validated = %{}
      {:ok, validated}
    end
  end
end

# Return nil (library file, not a DAG)
nil
```

Use in any DAG (JSON or `.exs`):
```json
{
  "nodes": [
    {"id": "fetch", "type": "local",
     "config": {"module": "SharedTasks.DataPipeline.FetchFromAPI"}},
    {"id": "validate", "type": "local",
     "config": {"module": "SharedTasks.DataPipeline.ValidateData"}}
  ]
}
```

Pros: DRY principle, centralized task logic, reusable across DAGs
Cons: Still limited to the Elixir stdlib

Tip: Use the `_` prefix (e.g., `_shared_tasks.exs`) to ensure the library loads before DAGs.
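The inline and shared patterns both rely on the task contract seen in the examples above: a module exposing `run/1` that returns `{:ok, result}`. A minimal sketch with input validation follows; note that the `{:error, reason}` failure shape is an assumption for illustration, not a documented Cascade contract, and the module name is hypothetical.

```elixir
# Sketch of a shared task with basic input validation.
# The {:error, reason} shape is an assumption, not a documented Cascade API;
# adapt the failure return to whatever your task runner expects.
defmodule SharedTasks.Sketch.ValidateOrder do
  def run(payload) do
    case payload do
      %{"order_id" => id} when is_binary(id) and id != "" ->
        {:ok, %{"order_id" => id, "valid" => true}}

      _ ->
        {:error, "payload missing non-empty \"order_id\""}
    end
  end
end
```

Keeping the validation in a guard-clause `case` like this makes malformed payloads fail fast instead of surfacing as crashes deeper in the pipeline.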
For tasks requiring external libraries (HTTPoison, Timex, database clients), build a custom Docker image:
```dockerfile
# Dockerfile.custom
FROM ghcr.io/tim-br/cascade:latest

# Add your task modules with dependencies
COPY lib/my_company_tasks /app/lib/my_company_tasks

# Install dependencies
WORKDIR /app
RUN mix deps.get && mix compile
```

Build and run:

```shell
docker build -f Dockerfile.custom -t cascade-custom .
docker run -p 4000:4000 -v ./dags:/app/dags cascade-custom
```

DAGs can now reference custom modules:

```json
{
  "config": {"module": "MyCompanyTasks.AdvancedProcessor"}
}
```

Pros: Full access to the Hex ecosystem, production-ready, fast DAG loading
Cons: Requires a Docker rebuild for task changes, less dynamic
A possible future layout, with task dependencies declared alongside the DAGs:

```
dags/
  mix.exs            # Define task dependencies
  lib/
    my_tasks.ex      # Use HTTPoison, Timex, etc.
  my_pipeline.json
```

Status: Under consideration. Challenges include:
- Hot-reload becomes slow (compilation required)
- Dependency version conflicts with main app
- Code loading complexity
Workaround: Use Pattern 4 (custom Docker image) for now
| Feature | Airflow | Cascade |
|---|---|---|
| DAG Format | Python files | JSON or Elixir (.exs) |
| Task Imports | `from my_tasks import fetch` | Shared `.exs` files with `_` prefix |
| Inline Tasks | Define in DAG file | Define in .exs DAG file |
| Dependencies | Build custom Docker image | Build custom Docker image |
| Auto-install deps from dags/ | ❌ No | ❌ No (use Docker) |
| Hot-reload | Yes (scheduler restart) | Yes (automatic, ~30s) |
| DAG deletion | Keeps in DB, hides from UI | Keeps in DB, sets enabled: false |
Both philosophies align: keep DAG directory simple, use Docker for dependencies.
- Use meaningful names: File name becomes DAG name
- Version control: Keep DAG files in git
- Test locally: Validate DAGs before deploying
- Use `.exs` for complex logic: Dynamic generation when needed
- Keep it simple: Prefer JSON for static workflows
- Add descriptions: Document what each DAG does
- Set appropriate timeouts: Based on task complexity
- Share common tasks: Use the `_shared_tasks.exs` pattern
- Use the `_` prefix for libraries: Ensures load order
If you were using `mix cascade.load_dag`, simply:

- Move your DAG JSON files to this directory
- Remove manual `mix cascade.load_dag` calls
- DAGs will auto-load on application start