Cascade Phase 1: Complete! 🎉

What's Been Built

Cascade's Phase 1 implementation is complete! We now have a working single-node workflow orchestrator with:

✅ Core Components Implemented

  1. Database Layer

    • PostgreSQL schemas for DAGs, Jobs, and TaskExecutions
    • Ecto models with proper relationships and validations
    • Migrations for all tables with proper indexes
  2. DSL (Domain Specific Language)

    • Elixir macros for defining DAGs (dag, task, depends_on)
    • Compiler that converts DSL to language-agnostic JSON
    • Validator with cycle detection using libgraph
    • Topological sorting for dependency resolution (a sketch of both follows this list)
  3. Runtime Engine

    • StateManager: ETS-based in-memory state + async Postgres persistence
    • Scheduler: Job lifecycle orchestration, dependency resolution
    • Executor: Task dispatch and worker selection
    • TaskRunner: Worker GenServer pool for task execution
    • LocalExecutor: Executes Elixir module functions
  4. Event System

    • PubSub-based event publishing for job and task updates
    • Standardized event structs (JobEvent, TaskEvent, WorkerEvent)
    • Topic-based subscriptions for real-time updates
  5. Example DAG

    • ETL pipeline with Extract → Transform → Load → Notify
    • Sample task modules demonstrating the task interface
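
Item 2's cycle detection and topological sort are built on libgraph. Here is a minimal standalone sketch of that technique using the example pipeline's task names — it illustrates the library calls, not Cascade's actual validator (see lib/cascade/dsl/validator.ex for the real implementation):

# Build a directed graph of task dependencies, then verify it is a DAG
# and compute a valid execution order. Illustrative only.
graph =
  Graph.new()
  |> Graph.add_edges([
    {:extract, :transform},
    {:transform, :load},
    {:load, :notify}
  ])

Graph.is_acyclic?(graph)
# => true (a cycle here would make the definition invalid)

Graph.topsort(graph)
# => [:extract, :transform, :load, :notify]
# (topsort returns false for cyclic graphs)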

How to Use Cascade

1. Start the Application

# Change into the application directory
cd /Users/tim/cascade_project/cascade

# Start IEx with the application
iex -S mix

2. Load the Example DAG

# Load the ETL DAG into the database
{:ok, dag} = Cascade.Examples.DAGLoader.load_etl_dag()

3. Trigger a Job

# Manually trigger the DAG
alias Cascade.Runtime.Scheduler
{:ok, job} = Scheduler.trigger_job(dag.id, "manual", %{environment: "dev"})

# You should see logs showing:
# - Job created
# - Tasks executing in dependency order: extract → transform → load → notify
# - Each task completing successfully

4. Query Job Status

# Get job details
alias Cascade.Workflows
job_with_details = Workflows.get_job_with_details!(job.id)

# Check task executions
task_executions = Workflows.list_task_executions_for_job(job.id)
Enum.each(task_executions, fn te ->
  IO.puts("#{te.task_id}: #{te.status}")
end)

# Get in-memory state (for active jobs)
alias Cascade.Runtime.StateManager
{:ok, job_state} = StateManager.get_job_state(job.id)
IO.inspect(job_state, label: "Job State")

5. View All DAGs

# List all DAGs in the system
dags = Workflows.list_dags()
Enum.each(dags, fn dag ->
  IO.puts("#{dag.name}: #{dag.description}")
end)

Architecture Overview

┌─────────────┐
│   User/API  │
└──────┬──────┘
       │ trigger_job(dag_id)
       ▼
┌─────────────────────┐
│   Scheduler         │  ← Orchestrates job lifecycle
│  - Create Job (PG)  │
│  - Init State (ETS) │
│  - Find ready tasks │
└──────┬──────────────┘
       │ dispatch_task
       ▼
┌─────────────────────┐
│    Executor         │  ← Dispatches tasks to workers
│  - Select worker    │
│  - Assign task      │
└──────┬──────────────┘
       │ PubSub: worker:node1
       ▼
┌─────────────────────┐
│   TaskRunner        │  ← Executes tasks
│  - Run local code   │
│  - Report results   │
└──────┬──────────────┘
       │ task_completed
       ▼
┌─────────────────────┐
│  StateManager       │  ← Tracks state
│  - Update ETS       │
│  - Publish events   │
│  - Persist to PG    │
└─────────────────────┘
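
The "Publish events" step in the diagram can be observed from any process via PubSub. A minimal sketch, assuming the app's PubSub server is named Cascade.PubSub and job topics follow a "job:<id>" scheme — the real topic helpers live in lib/cascade/events.ex, so treat both names as assumptions:

# Subscribe to a job's event stream from IEx (topic format is assumed;
# check lib/cascade/events.ex for the actual topic helpers).
Phoenix.PubSub.subscribe(Cascade.PubSub, "job:#{job.id}")

# Events arrive as ordinary messages in the subscriber's mailbox:
receive do
  event -> IO.inspect(event, label: "Job event")
after
  5_000 -> IO.puts("No event within 5 seconds")
end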

Creating Your Own DAG

# 1. Define task modules
defmodule MyApp.Tasks.MyTask do
  require Logger

  def run(context) do
    Logger.info("Running task for job #{context.job_id}")

    # Your task logic here
    result = %{status: "success", data: "..."}

    {:ok, result}
  end
end

# 2. Define the DAG
defmodule MyApp.MyDAG do
  use Cascade.DSL

  dag "my_workflow" do
    description "My custom workflow"

    task :step1 do
      type :local
      module "MyApp.Tasks.MyTask"
      timeout 300
    end

    task :step2 do
      type :local
      module "MyApp.Tasks.AnotherTask"
      depends_on [:step1]
      timeout 300
    end
  end
end

# 3. Load it into the database
definition = MyApp.MyDAG.get_dag_definition()
{:ok, validated} = Cascade.DSL.Validator.validate(definition)
{:ok, dag} = Cascade.Workflows.create_dag(%{
  name: definition["name"],
  description: definition["metadata"]["description"],
  definition: validated,
  compiled_at: DateTime.utc_now()
})

# 4. Trigger it!
{:ok, job} = Cascade.Runtime.Scheduler.trigger_job(dag.id, "manual", %{})

What's Next: Phases 2-5

Phase 2: Distributed Workers (Not Yet Implemented)

  • Multi-node Erlang clustering
  • Worker health monitoring and heartbeats
  • Task reassignment on worker failure
  • Load balancing across worker pool

Phase 3: AWS Integration (Not Yet Implemented)

  • Lambda task execution
  • S3 artifact storage
  • Hybrid local/remote execution

Phase 4: LiveView UI (Not Yet Implemented)

  • Real-time DAG visualization
  • Job monitoring dashboard
  • Manual job triggering UI
  • Worker cluster status

Phase 5: Advanced Features (Not Yet Implemented)

  • Cron-based scheduling
  • Retry logic with exponential backoff
  • Task timeouts
  • Error callbacks
  • Job cancellation
  • Authentication/authorization

Key Files Reference

Core Engine

  • lib/cascade/runtime/state_manager.ex - In-memory job state (ETS)
  • lib/cascade/runtime/scheduler.ex - Job lifecycle orchestration
  • lib/cascade/runtime/executor.ex - Task dispatch
  • lib/cascade/runtime/task_runner.ex - Task execution workers

DSL

  • lib/cascade/dsl.ex - DSL macros
  • lib/cascade/dsl/compiler.ex - DSL → JSON compiler
  • lib/cascade/dsl/validator.ex - DAG validation & topological sort

Data Models

  • lib/cascade/workflows/dag.ex - DAG schema
  • lib/cascade/workflows/job.ex - Job schema
  • lib/cascade/workflows/task_execution.ex - TaskExecution schema
  • lib/cascade/workflows.ex - Context functions (CRUD)

Events

  • lib/cascade/events.ex - PubSub events & topics

Examples

  • lib/cascade/examples/etl_dag.ex - Example ETL DAG
  • lib/cascade/examples/tasks.ex - Example task modules
  • lib/cascade/examples/dag_loader.ex - Helper to load DAGs

Testing the System

# Run the application
iex -S mix

# In IEx:
# 1. Load example DAG
{:ok, dag} = Cascade.Examples.DAGLoader.load_etl_dag()

# 2. Trigger a job
alias Cascade.Runtime.Scheduler
{:ok, job} = Scheduler.trigger_job(dag.id, "manual", %{})

# 3. Watch the logs - you should see:
# - Job created
# - ExtractData task starts and completes
# - TransformData task starts and completes
# - LoadData task starts and completes
# - SendNotification task starts and completes
# - Job marked as complete

# 4. Check the results
alias Cascade.Workflows
job_final = Workflows.get_job_with_details!(job.id)
IO.inspect(job_final.status)  # Should be :success

Notes

  • Current implementation is single-node only
  • All tasks execute on the same node
  • Worker pool size defaults to 2 * CPU cores
  • Set the CASCADE_WORKERS env var to override (see the sketch after this list)
  • State is stored in ETS (lost on restart) and Postgres (durable)
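
For reference, the pool-size default described above amounts to something like the following — a sketch under those assumptions, not Cascade's exact startup code:

# Default to 2x the online schedulers (roughly CPU cores),
# unless CASCADE_WORKERS overrides it.
workers =
  case System.get_env("CASCADE_WORKERS") do
    nil -> System.schedulers_online() * 2
    val -> String.to_integer(val)
  end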

Success Criteria ✅

Phase 1 is complete! You can now:

✅ Define DAGs in Elixir DSL
✅ Load DAGs to Postgres
✅ Trigger jobs manually via IEx
✅ See tasks execute in dependency order
✅ Query job/task status from database
✅ View in-memory state for active jobs

The foundation is solid and ready for Phase 2: Distributed Workers!