-
Notifications
You must be signed in to change notification settings - Fork 8.7k
feat(api): Add real-time SSE support for webhook flow execution #11028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 27 commits
Commits
Show all changes
44 commits
Select commit
Hold shift + click to select a range
9afdefb
add webhook ux improvement
Cristhianzl 1f1ea4b
merge fix
Cristhianzl e40b8c3
revert starter_prjects
Cristhianzl 5747f4f
improve code quality
Cristhianzl 0a3c810
test(webhook): add delay to test for background task completion after…
Cristhianzl 4586406
style(endpoints.py): reorder import statements for better organizatio…
Cristhianzl b74e4f4
Merge remote-tracking branch 'origin/main' into cz/webhook-ux-v2
Cristhianzl a20bd3a
fix: regenerate package-lock.json to fix npm ci sync issue
Cristhianzl 5e4cf95
[autofix.ci] apply automated fixes
autofix-ci[bot] 0e373a2
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] b86e3ba
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] d9e0943
merge fix
Cristhianzl 1267c20
restore lock
Cristhianzl 0c572ca
merge main
Cristhianzl 337ce22
rollback changes
Cristhianzl 8ed49f5
[autofix.ci] apply automated fixes
autofix-ci[bot] 8cd6514
Merge branch 'cz/webhook-ux-v2' of github.com:langflow-ai/langflow in…
Cristhianzl 55dec06
[autofix.ci] apply automated fixes
autofix-ci[bot] 8ff6c09
refactor for good pratices
Cristhianzl ee56d5c
improve tests perf
Cristhianzl c5ba929
Merge branch 'cz/webhook-ux-v2' of github.com:langflow-ai/langflow in…
Cristhianzl 3588a9d
[autofix.ci] apply automated fixes
autofix-ci[bot] 2eddde4
fix sse events frontend
Cristhianzl 5f12e4d
[autofix.ci] apply automated fixes
autofix-ci[bot] dfba94f
merge fix
Cristhianzl 6b4fe25
[autofix.ci] apply automated fixes
autofix-ci[bot] ecf6ca9
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] 3336ce9
add output sse event
Cristhianzl a31a9b8
merge fix
Cristhianzl 04349a7
merge fix
Cristhianzl e17f0dc
[autofix.ci] apply automated fixes
autofix-ci[bot] 924bbd9
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] 047fb89
mypy fixes
Cristhianzl a286fa9
remove noisy debug logs
Cristhianzl 085edff
remove sse when we have no webhook on canvas
Cristhianzl 5f202ef
Merge branch 'main' into cz/webhook-ux-v2
carlosrcoelho f0572e6
Merge branch 'main' into cz/webhook-ux-v2
Cristhianzl 75b3d09
add get lock for CI improvement
Cristhianzl 1807633
[autofix.ci] apply automated fixes
autofix-ci[bot] cf4e673
fix tests loading
Cristhianzl 6560323
Merge branch 'cz/webhook-ux-v2' of github.com:langflow-ai/langflow in…
Cristhianzl 4f6a2f2
fix flasky test py
Cristhianzl 75d3f22
ruff style and check
Cristhianzl 4c45305
test: skip flaky test on CI due to resource exhaustion
Cristhianzl File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| """Event Manager for Webhook Real-Time Updates. | ||
|
|
||
| This module provides an in-memory event broadcasting system for webhook builds. | ||
| When a UI is connected via SSE, it receives real-time build events. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import time | ||
| from collections import defaultdict | ||
| from typing import Any | ||
|
|
||
| from loguru import logger | ||
|
|
||
# Constants
SSE_QUEUE_MAX_SIZE = 100
SSE_EMIT_TIMEOUT_SECONDS = 1.0
SECONDS_PER_MINUTE = 60


class WebhookEventManager:
    """Manages SSE connections and broadcasts build events for webhooks.

    When a flow is open in the UI, it subscribes to webhook events.
    When a webhook is triggered, events are emitted to all subscribers.

    This provides the same visual experience as clicking "Play" in the UI,
    but triggered by external webhook calls.
    """

    def __init__(self) -> None:
        """Initialize the event manager with empty listeners."""
        # flow_id -> set of subscriber queues (one queue per SSE connection).
        self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
        # flow_id -> {vertex_id: monotonic start time} for duration reporting.
        self._vertex_start_times: dict[str, dict[str, float]] = defaultdict(dict)
        # Serializes mutations of _listeners across subscribe/unsubscribe/emit.
        self._lock = asyncio.Lock()

    def record_build_start(self, flow_id: str, vertex_id: str) -> None:
        """Record when a vertex build starts for duration calculation."""
        # Use a monotonic clock: wall-clock time (time.time()) can jump
        # backwards/forwards on NTP sync, yielding negative or bogus durations.
        self._vertex_start_times[flow_id][vertex_id] = time.monotonic()

    def get_build_duration(self, flow_id: str, vertex_id: str) -> str | None:
        """Get the formatted build duration for a vertex.

        Returns None if no start time was recorded (or it was already consumed).
        Each call consumes the recorded start time.
        """
        # Use .get() (not defaultdict indexing) so a lookup for an unknown
        # flow does not create an empty entry as a side effect.
        flow_times = self._vertex_start_times.get(flow_id)
        if not flow_times or vertex_id not in flow_times:
            return None
        elapsed = time.monotonic() - flow_times.pop(vertex_id)
        # Drop the per-flow dict once empty so a long-running server does not
        # accumulate one empty dict per flow ever triggered.
        if not flow_times:
            self._vertex_start_times.pop(flow_id, None)
        return self._format_duration(elapsed)

    @staticmethod
    def _format_duration(seconds: float) -> str:
        """Format duration in a human-readable way (ms, s, or minutes+s)."""
        if seconds < 1:
            return f"{int(seconds * 1000)} ms"
        if seconds < SECONDS_PER_MINUTE:
            return f"{seconds:.1f} s"
        minutes = int(seconds // SECONDS_PER_MINUTE)
        secs = seconds % SECONDS_PER_MINUTE
        return f"{minutes}m {secs:.1f}s"

    async def subscribe(self, flow_id: str) -> asyncio.Queue:
        """Subscribe to receive events for a specific flow.

        Args:
            flow_id: The flow ID to subscribe to

        Returns:
            Queue that will receive events for this flow
        """
        # Bounded queue so a stalled SSE consumer cannot grow memory without
        # limit; emit() drops events for full queues instead of blocking.
        queue: asyncio.Queue = asyncio.Queue(maxsize=SSE_QUEUE_MAX_SIZE)
        async with self._lock:
            self._listeners[flow_id].add(queue)
            listener_count = len(self._listeners[flow_id])

        logger.info(f"New subscriber for flow {flow_id}. Total listeners: {listener_count}")
        return queue

    async def unsubscribe(self, flow_id: str, queue: asyncio.Queue) -> None:
        """Unsubscribe from flow events.

        Args:
            flow_id: The flow ID to unsubscribe from
            queue: The queue to remove
        """
        async with self._lock:
            if flow_id in self._listeners:
                self._listeners[flow_id].discard(queue)
                listener_count = len(self._listeners[flow_id])

                # Clean up empty sets so has_listeners() stays accurate and
                # the dict does not grow with one entry per flow ever watched.
                if not self._listeners[flow_id]:
                    del self._listeners[flow_id]
                    logger.info(f"All subscribers disconnected for flow {flow_id}")
                else:
                    logger.info(f"Subscriber disconnected from flow {flow_id}. Remaining: {listener_count}")

    async def emit(self, flow_id: str, event_type: str, data: Any) -> None:
        """Emit an event to all subscribers of a flow.

        Args:
            flow_id: The flow ID to emit to
            event_type: Type of event (build_start, end_vertex, etc.)
            data: Event data (will be JSON serialized)
        """
        # Snapshot the listener set under the lock, then deliver outside it so
        # a slow queue.put() cannot block subscribe/unsubscribe for up to a
        # second per queue.
        async with self._lock:
            listeners = self._listeners.get(flow_id, set()).copy()

        if not listeners:
            # No one listening, skip emission (performance optimization)
            return

        logger.debug(f"Emitting {event_type} to {len(listeners)} listeners for flow {flow_id}")

        # Prepare event. Wall-clock timestamp is intentional here: it is a
        # consumer-facing timestamp, not a duration measurement.
        event = {
            "event": event_type,
            "data": data,
            "timestamp": time.time(),
        }

        # Send to all queues
        dead_queues: set[asyncio.Queue] = set()

        for queue in listeners:
            try:
                await asyncio.wait_for(queue.put(event), timeout=SSE_EMIT_TIMEOUT_SECONDS)
            except asyncio.TimeoutError:
                # Queue is full (slow consumer), skip this event
                logger.warning(f"Queue full for flow {flow_id}, dropping event {event_type}")
            except Exception as e:  # noqa: BLE001
                # Queue is closed or broken, mark for removal
                logger.error(f"Error putting event in queue for flow {flow_id}: {e}")
                dead_queues.add(queue)

        # Clean up dead queues
        if dead_queues:
            async with self._lock:
                if flow_id in self._listeners:
                    self._listeners[flow_id] -= dead_queues
                    if not self._listeners[flow_id]:
                        del self._listeners[flow_id]

    def has_listeners(self, flow_id: str) -> bool:
        """Check if there are any active listeners for a flow."""
        # .get() avoids creating a defaultdict entry as a read side effect.
        return bool(self._listeners.get(flow_id))
|
|
||
|
|
||
# Module-level instance (can be replaced in tests via dependency injection)
# TODO: Consider migrating to langflow's service manager pattern for better DI
_webhook_event_manager: WebhookEventManager | None = None


def get_webhook_event_manager() -> WebhookEventManager:
    """Get the webhook event manager instance.

    Lazily creates the process-wide manager on first call and returns the
    same object on every subsequent call.

    Returns:
        The WebhookEventManager singleton instance.
    """
    global _webhook_event_manager  # noqa: PLW0603
    manager = _webhook_event_manager
    if manager is None:
        manager = WebhookEventManager()
        _webhook_event_manager = manager
    return manager


# Backwards compatibility alias
webhook_event_manager = get_webhook_event_manager()
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚡️ Codeflash found a 27% (0.27x) speedup for `_get_vertex_ids_from_flow` in `src/backend/base/langflow/api/v1/endpoints.py`
⏱️ Runtime: 477 microseconds → 374 microseconds (best of 117 runs)
📝 Explanation and details
The optimized code achieves a 27% speedup primarily through two key changes:
1. Eliminated Redundant Dictionary Lookups
The original code calls `flow.data.get("nodes")` twice — once in the condition check and again in the list comprehension. The optimized version stores `flow.data.get("nodes")` in a `nodes` variable, cutting this expensive dictionary access in half.
2. Used Walrus Operator to Cache Node IDs
The original list comprehension calls `node.get("id")` twice per node — once to check truthiness and once to add to the result list. The optimized version uses the walrus operator (`:=`) to assign `node.get("id")` to `node_id` in a single operation, then reuses that value. This eliminates one dictionary lookup per node.
Why This Matters
Dictionary lookups in Python involve hash computation and collision resolution, making them relatively expensive operations. When processing flows with many nodes (the test suite validates up to 999 nodes), these redundant lookups compound significantly. The line profiler shows the list comprehension time dropped from 616,217ns to 541,050ns (12% faster), confirming the per-node optimization impact.
Test Case Performance
The optimization benefits all test cases, but particularly shines with:
The changes preserve all behavior including edge cases (empty data, missing nodes, non-dict entries) while delivering measurable performance gains.
✅ Correctness verification report:
⚙️ Click to see Existing Unit Tests
🌀 Click to see Generated Regression Tests
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
imports
import pytest # used for our unit tests
from langflow.api.v1.endpoints import _get_vertex_ids_from_flow
function to test
The original function referenced Flow from an external module. For the purposes
of unit testing in this self-contained file we define a minimal Flow dataclass
that mirrors the shape the function expects: an object with a
`data` attribute that may be a dict with a "nodes" list.
@dataclass
class Flow:
data: Optional[Dict[str, Any]] = None
from langflow.api.v1.endpoints import _get_vertex_ids_from_flow
unit tests
Basic Test Cases
def test_basic_returns_ids_in_order():
"""
Basic functionality: nodes with valid 'id' strings should be returned in order.
"""
# Create a flow with three nodes each having an 'id'
flow = Flow(data={"nodes": [{"id": "a"}, {"id": "b"}, {"id": "c"}]})
codeflash_output = _get_vertex_ids_from_flow(flow); result = codeflash_output
def test_ignores_nodes_without_id_or_empty_id():
"""
Nodes missing 'id' or with falsy ids (empty string, None, 0) should be ignored.
"""
# Mix nodes: valid ids, missing id, empty string, None, numeric 0 (falsy), and another valid id
nodes = [
{"id": "1"}, # included
{"other": "x"}, # missing 'id' -> ignored
{"id": ""}, # empty string -> falsy -> ignored
{"id": None}, # None -> falsy -> ignored
{"id": 0}, # 0 -> falsy -> ignored
{"id": "2"}, # included
]
flow = Flow(data={"nodes": nodes})
codeflash_output = _get_vertex_ids_from_flow(flow); result = codeflash_output
def test_handles_empty_nodes_list_and_missing_nodes_key():
"""
When nodes is an empty list -> return empty list.
When data exists but 'nodes' missing -> return empty list.
When data is falsy (None, empty dict) -> return empty list.
"""
# Empty nodes list
flow_empty_nodes = Flow(data={"nodes": []})
codeflash_output = _get_vertex_ids_from_flow(flow_empty_nodes)
Edge Test Cases
def test_calling_with_none_flow_raises_attribute_error():
"""
Passing None instead of a Flow should raise AttributeError when the function
attempts to access flow.data. This documents current behavior for invalid input.
"""
with pytest.raises(AttributeError):
# Not a Flow instance -> accessing .data should raise
_get_vertex_ids_from_flow(None) # type: ignore[arg-type]
def test_nodes_with_non_dict_entries_raise_attribute_error():
"""
If nodes list contains elements that are not dict-like (no .get), the function
will attempt to call .get and raise AttributeError. Confirm this behavior.
"""
# Put a non-dict as the first node to ensure the list comprehension hits it and raises.
flow = Flow(data={"nodes": ["not-a-dict", {"id": "ok"}]})
with pytest.raises(AttributeError):
_get_vertex_ids_from_flow(flow)
def test_includes_truthy_non_string_ids_and_preserves_duplicates_and_order():
"""
The function does not coerce types. If an id is a truthy non-string (e.g., True),
it will be included as-is. Duplicate ids should be preserved and order kept.
"""
nodes = [
{"id": True}, # truthy boolean -> included
{"id": "x"}, # included
{"id": "x"}, # duplicate included again
{"id": "0"}, # string "0" is truthy -> included
]
flow = Flow(data={"nodes": nodes})
codeflash_output = _get_vertex_ids_from_flow(flow); result = codeflash_output
Large Scale Test Cases
def test_large_scale_near_thousand_nodes_performance_and_correctness():
"""
Large-scale test: create 999 nodes (just under 1000 per constraints) and ensure
all ids are returned in correct order and the function completes quickly.
This verifies scalability within the given limit.
"""
n = 999 # keep under 1000 as requested
# Build nodes with predictable ids "node_0", "node_1", ..., "node_998"
nodes = [{"id": f"node_{i}"} for i in range(n)]
flow = Flow(data={"nodes": nodes})
codeflash_output = _get_vertex_ids_from_flow(flow); result = codeflash_output
def test_mixed_valid_and_invalid_nodes_in_large_list_stops_on_type_error_when_first_bad():
"""
Ensure that when a large list begins with a malformed node (non-dict), the function
raises immediately (and does not silently skip) -- preserving documented behavior.
We only place one non-dict at the start to force an immediate failure.
"""
n = 500 # moderately large but under constraints
# Start with a bad element to trigger AttributeError, followed by many valid nodes
nodes = ["bad-start"] + [{"id": f"node_{i}"} for i in range(n)]
flow = Flow(data={"nodes": nodes})
with pytest.raises(AttributeError):
_get_vertex_ids_from_flow(flow)
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from unittest.mock import MagicMock, Mock
imports
import pytest
from langflow.api.v1.endpoints import _get_vertex_ids_from_flow
from langflow.services.database.models.flow.model import Flow
============================================================================
BASIC TEST CASES
============================================================================
To test or edit this optimization locally
git merge codeflash/optimize-pr11028-2026-01-15T21.10.46