
feat: Added Traceloop SDK for collecting traces and metrics from Langflow#9317

Merged
edwinjosechittilappilly merged 22 commits into langflow-ai:main from 2getsandesh:traceloop-sdk-integration
Aug 22, 2025

Conversation

@2getsandesh (Contributor) commented Aug 7, 2025

Description:
This PR enables observability of LangFlow using Instana by exporting traces and metrics via the Traceloop SDK to the Instana backend.

Key Changes:
  • New traceloop.py file: Introduced a TraceloopTracer class with add_trace and end_trace methods for managing the trace lifecycle.
  • Integration in service.py: Initialized the TraceloopTracer class to start capturing traces during LangFlow execution.
  • Fallback logic in component.py: Added a getattr()-based fallback to use instance attribute values instead of default input values when available.
  • Tests: Added mock/patch statements in the unit tests for TraceloopTracer in test_tracing_service.py.
  • Dependencies: Updated pyproject.toml to include traceloop-sdk as a new dependency.
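The getattr()-based fallback described above can be illustrated with a minimal, self-contained sketch. The class and input objects here are simplified stand-ins, not the actual Langflow classes:

```python
from types import SimpleNamespace

class Component:
    """Simplified stand-in for Langflow's component class (illustration only)."""

    def __init__(self, inputs):
        self._inputs = inputs  # objects with .name, .value, .trace_as_metadata

    def get_trace_as_metadata(self):
        # Prefer the resolved instance attribute when present; otherwise fall
        # back to the input's default value -- the fallback this PR introduces.
        return {
            input_.name: getattr(self, input_.name, input_.value)
            for input_ in self._inputs
            if input_.trace_as_metadata
        }

inputs = [SimpleNamespace(name="model", value="gpt-default", trace_as_metadata=True)]
component = Component(inputs)
print(component.get_trace_as_metadata())  # {'model': 'gpt-default'}
component.model = "gpt-4o"  # instance attribute now shadows the default
print(component.get_trace_as_metadata())  # {'model': 'gpt-4o'}
```

The effect is that trace metadata reflects the component's resolved state at runtime rather than the static defaults declared on its inputs.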
[Screenshots: Instana trace and metrics views, captured 2025-07-21 and 2025-08-06]


Summary by CodeRabbit

  • New Features

    • Added integration with Traceloop as an additional tracer, capturing workflows, inputs/outputs, errors, and logs when tracing is enabled.
    • Improved trace metadata accuracy by resolving input values from component state where available.
  • Chores

    • Added dependency on traceloop-sdk (>=0.43.1).
  • Tests

    • Updated unit tests to cover the new tracer and adjust callback expectations accordingly.

coderabbitai bot (Contributor) commented Aug 7, 2025

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds traceloop tracing integration. Introduces a new TraceloopTracer class, wires it into TracingService startup, adjusts component metadata resolution, updates unit tests to account for the new tracer, and adds traceloop-sdk as a project dependency.

Changes

  • Dependency update — pyproject.toml: Adds dependency traceloop-sdk>=0.43.1 under [project].dependencies.
  • Component metadata resolution — src/backend/base/langflow/custom/custom_component/component.py: get_trace_as_metadata now resolves each input via getattr(self, input_.name, input_.value) when trace_as_metadata is set.
  • Tracing integration — src/backend/base/langflow/services/tracing/* (.../service.py, .../traceloop.py): Adds the TraceloopTracer implementation and integrates it into TracingService via _get_traceloop_tracer and _initialize_traceloop_tracer; registers the tracer in trace_context.tracers and sets relevant metadata.
  • Tests update — src/backend/tests/unit/services/tracing/test_tracing_service.py: Patches _get_traceloop_tracer in tests, asserts presence of the "traceloop" tracer, and adjusts callback count expectations dynamically.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant TracingService
  participant TraceContext
  participant TraceloopTracer

  Client->>TracingService: start_tracers(trace_context)
  TracingService->>TracingService: _initialize_existing_tracers(...)
  TracingService->>TracingService: _initialize_traceloop_tracer(trace_context)
  activate TracingService
  TracingService->>TraceloopTracer: __init__(trace_name, trace_type, project_name, trace_id, user_id, session_id)
  TracingService->>TraceContext: tracers["traceloop"] = TraceloopTracer
  deactivate TracingService

  Client->>TracingService: end_tracers(...)
  TracingService->>TraceloopTracer: end(inputs, outputs, error?, metadata?)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested labels

size:XXL

Suggested reviewers

  • erichare
  • edwinjosechittilappilly

@2getsandesh changed the title from "Added Traceloop SDK for colecting traces and metrics from Langflow" to "feat: Added Traceloop SDK for colecting traces and metrics from Langflow" on Aug 7, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 7, 2025
@edwinjosechittilappilly (Collaborator) commented:

@2getsandesh Let's ensure the old PR #8209 is closed if there is overlap. Also, please ensure that all the tests are passing, since this is crucial for merge.

@2getsandesh force-pushed the traceloop-sdk-integration branch from a584afa to c109c5c on August 12, 2025 04:51
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 12, 2025
@2getsandesh 2getsandesh marked this pull request as ready for review August 12, 2025 05:42
@coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
src/backend/base/langflow/services/tracing/traceloop.py (1)

213-213: Remove unnecessary comment.

The "Made with Bob" comment should be removed as it doesn't provide any value.

-
-# Made with Bob
src/backend/base/langflow/services/tracing/service.py (1)

210-222: Consider adding error handling for tracer initialization.

The Traceloop tracer initialization should handle potential exceptions similar to how other operations handle errors.

 def _initialize_traceloop_tracer(self, trace_context: TraceContext) -> None:
     if self.deactivated:
         return
-    traceloop_tracer = _get_traceloop_tracer()
-    trace_context.tracers["traceloop"] = traceloop_tracer(
-        trace_name=trace_context.run_name,
-        trace_type="chain",
-        project_name=trace_context.project_name,
-        trace_id=trace_context.run_id,
-        user_id=trace_context.user_id,
-        session_id=trace_context.session_id,
-    )
+    try:
+        traceloop_tracer = _get_traceloop_tracer()
+        trace_context.tracers["traceloop"] = traceloop_tracer(
+            trace_name=trace_context.run_name,
+            trace_type="chain",
+            project_name=trace_context.project_name,
+            trace_id=trace_context.run_id,
+            user_id=trace_context.user_id,
+            session_id=trace_context.session_id,
+        )
+    except Exception as e:
+        logger.debug(f"Failed to initialize Traceloop tracer: {e}")
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 515786c and 4a216eb.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • pyproject.toml (1 hunks)
  • src/backend/base/langflow/custom/custom_component/component.py (1 hunks)
  • src/backend/base/langflow/services/tracing/service.py (3 hunks)
  • src/backend/base/langflow/services/tracing/traceloop.py (1 hunks)
  • src/backend/tests/unit/services/tracing/test_tracing_service.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
{src/backend/**/*.py,tests/**/*.py,Makefile}

📄 CodeRabbit Inference Engine (.cursor/rules/backend_development.mdc)

{src/backend/**/*.py,tests/**/*.py,Makefile}: Run make format_backend to format Python code before linting or committing changes
Run make lint to perform linting checks on backend Python code

Files:

  • src/backend/base/langflow/custom/custom_component/component.py
  • src/backend/tests/unit/services/tracing/test_tracing_service.py
  • src/backend/base/langflow/services/tracing/traceloop.py
  • src/backend/base/langflow/services/tracing/service.py
src/backend/**/*component*.py

📄 CodeRabbit Inference Engine (.cursor/rules/icons.mdc)

In your Python component class, set the icon attribute to a string matching the frontend icon mapping exactly (case-sensitive).

Files:

  • src/backend/base/langflow/custom/custom_component/component.py
src/backend/tests/unit/**/*.py

📄 CodeRabbit Inference Engine (.cursor/rules/backend_development.mdc)

Test component integration within flows using create_flow, build_flow, and get_build_events utilities

Files:

  • src/backend/tests/unit/services/tracing/test_tracing_service.py
src/backend/tests/**/*.py

📄 CodeRabbit Inference Engine (.cursor/rules/testing.mdc)

src/backend/tests/**/*.py: Unit tests for backend code must be located in the 'src/backend/tests/' directory, with component tests organized by component subdirectory under 'src/backend/tests/unit/components/'.
Test files should use the same filename as the component under test, with an appropriate test prefix or suffix (e.g., 'my_component.py' → 'test_my_component.py').
Use the 'client' fixture (an async httpx.AsyncClient) for API tests in backend Python tests, as defined in 'src/backend/tests/conftest.py'.
When writing component tests, inherit from the appropriate base class in 'src/backend/tests/base.py' (ComponentTestBase, ComponentTestBaseWithClient, or ComponentTestBaseWithoutClient) and provide the required fixtures: 'component_class', 'default_kwargs', and 'file_names_mapping'.
Each test in backend Python test files should have a clear docstring explaining its purpose, and complex setups or mocks should be well-commented.
Test both sync and async code paths in backend Python tests, using '@pytest.mark.asyncio' for async tests.
Mock external dependencies appropriately in backend Python tests to isolate unit tests from external services.
Test error handling and edge cases in backend Python tests, including using 'pytest.raises' and asserting error messages.
Validate input/output behavior and test component initialization and configuration in backend Python tests.
Use the 'no_blockbuster' pytest marker to skip the blockbuster plugin in tests when necessary.
Be aware of ContextVar propagation in async tests; test both direct event loop execution and 'asyncio.to_thread' scenarios to ensure proper context isolation.
Test error handling by mocking internal functions using monkeypatch in backend Python tests.
Test resource cleanup in backend Python tests by using fixtures that ensure proper initialization and cleanup of resources.
Test timeout and performance constraints in backend Python tests using 'asyncio.wait_for' and timing assertions.
Test Langflow's Messag...

Files:

  • src/backend/tests/unit/services/tracing/test_tracing_service.py
🧠 Learnings (1)
📚 Learning: 2025-07-21T14:16:14.125Z
Learnt from: CR
PR: langflow-ai/langflow#0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-07-21T14:16:14.125Z
Learning: Applies to src/backend/tests/**/*.py : Mock external dependencies appropriately in backend Python tests to isolate unit tests from external services.

Applied to files:

  • src/backend/tests/unit/services/tracing/test_tracing_service.py
🔇 Additional comments (7)
pyproject.toml (1)

129-129: LGTM! Dependency addition looks good.

The traceloop-sdk dependency version constraint is properly specified and follows the existing pattern.

src/backend/base/langflow/custom/custom_component/component.py (1)

1021-1026: Review metadata fallback logic in CustomComponent
The get_trace_as_metadata method (component.py lines 1021–1026) now uses

getattr(self, input_.name, input_.value)

so any instance attribute named like an input will override the input’s original value. Although an automated search didn’t reveal direct name collisions, please manually verify that:

  • No input names in your custom components shadow attributes set in CustomComponent.__init__ or subclasses
  • Sensitive fields (e.g. api_key, password, etc.) cannot be exposed via unintended attribute overrides

If there’s a risk, consider isolating trace metadata in a dedicated private mapping (for example, self._trace_metadata) or prefixing attribute names to avoid shadowing.
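The suggested mitigation could look like the following hedged sketch. Names such as set_trace_value and _trace_metadata are illustrative choices, not part of the PR:

```python
class Component:
    """Illustrative sketch of isolating trace metadata in a private mapping."""

    def __init__(self):
        # Dedicated store: values reach traces only via set_trace_value, so an
        # arbitrary instance attribute (e.g. self.api_key) can never leak in
        # through attribute-name shadowing.
        self._trace_metadata: dict = {}

    def set_trace_value(self, name, value):
        self._trace_metadata[name] = value

    def get_trace_as_metadata(self):
        return dict(self._trace_metadata)  # defensive copy

c = Component()
c.api_key = "secret"  # plain attribute, never traced
c.set_trace_value("model", "gpt-4o")
print(c.get_trace_as_metadata())  # {'model': 'gpt-4o'}
```

The trade-off is an explicit write path for every traced value, in exchange for a guarantee that sensitive attributes cannot be exposed by name collision.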

src/backend/tests/unit/services/tracing/test_tracing_service.py (3)

142-145: LGTM! Test patch properly added.

The patch for _get_traceloop_tracer follows the established pattern for mocking tracers.


176-176: LGTM! Test assertion for traceloop tracer.

The test correctly verifies that the traceloop tracer is initialized and registered.


306-307: Good dynamic test adjustment.

The test now dynamically calculates the expected number of callbacks based on actual tracers, making it more maintainable.
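The patching pattern described above might be sketched like this. FakeTracingService is a hypothetical stand-in; the real tests patch Langflow's actual _get_traceloop_tracer in test_tracing_service.py:

```python
from unittest.mock import MagicMock, patch

class FakeTracingService:
    """Tiny stand-in for TracingService, for illustration only."""

    def _get_traceloop_tracer(self):
        raise RuntimeError("would import traceloop-sdk in the real service")

    def start_tracers(self):
        # Register the tracer under the "traceloop" key, as the PR's service.py does.
        self.tracers = {"traceloop": self._get_traceloop_tracer()()}

service = FakeTracingService()
mock_cls = MagicMock(name="TraceloopTracer")
# Patch the factory so the test never touches the real SDK.
with patch.object(FakeTracingService, "_get_traceloop_tracer", return_value=mock_cls):
    service.start_tracers()
print("traceloop" in service.tracers)  # True
```

Patching the factory rather than the tracer class itself keeps the test isolated from the traceloop-sdk import, matching the isolation guidance in the testing rules quoted above.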

src/backend/base/langflow/services/tracing/service.py (2)

56-60: LGTM! Factory function follows established pattern.

The factory function for Traceloop tracer follows the same pattern as other tracers.


249-249: LGTM! Tracer initialization properly integrated.

The Traceloop tracer is correctly initialized alongside other tracers in the expected sequence.

Comment on lines +203 to +235
def get_langchain_callback(self) -> BaseCallbackHandler | None:
"""Get the LangChain callback handler.

Returns:
The LangChain callback handler.
"""
if not self.ready:
return None

@coderabbitai bot commented Aug 12, 2025


⚠️ Potential issue

Incomplete LangChain callback implementation.

The get_langchain_callback method returns None when ready, which seems incorrect. This appears to be a placeholder implementation that needs to be completed.

The method should return a proper LangChain callback handler when the tracer is ready. Currently, it returns None in all cases which will prevent LangChain integration from working properly.

Would you like me to help implement a proper LangChain callback handler for Traceloop integration?

@2getsandesh (Contributor Author) replied:

yes

@coderabbitai bot replied:

🧩 Analysis chain

I'll help you implement a proper LangChain callback handler for the Traceloop integration. Let me first research the Traceloop SDK's LangChain callback capabilities and examine how other tracers in the codebase handle this.


🏁 Script executed:

#!/bin/bash
# Check how other tracers implement get_langchain_callback method
rg -A 10 "def get_langchain_callback" --type py

# Look for any existing LangChain callback implementations in the codebase
rg -A 5 "BaseCallbackHandler" --type py

# Check what's imported from traceloop.sdk
rg "from traceloop" --type py -A 3 -B 1

Length of output: 16183


🌐 Web query:

Traceloop SDK LangChain callback handler implementation get_langchain_callback

💡 Result:

Below is a practical, ready-to-run Python implementation you can drop into your codebase. It implements a LangChain BaseCallbackHandler that creates OpenTelemetry spans (so they are exported via the Traceloop SDK once you call Traceloop.init()). The TraceloopTracer.get_langchain_callback() method simply returns an instance of that handler.

Notes before the code

  • This uses the OpenTelemetry API (tracer obtained via opentelemetry.trace.get_tracer). Traceloop.init() configures the global tracer provider / exporter, so the spans created here will be exported by Traceloop. (traceloop.com, docs.truefoundry.com)
  • LangChain callback handlers implement BaseCallbackHandler and receive events such as on_chain_start/on_chain_end/on_llm_start/on_llm_end/etc. The example implements the most-common events and shows the pattern to extend it. (python.langchain.com)
  • The implementation supports both the older langchain import locations and the newer langchain_core where applicable (it tries imports in a compatibility-friendly way).

Code (python)

# traceloop_langchain_callback.py
from __future__ import annotations
from typing import Any, Dict, Optional, Tuple

# LangChain callback handler import compatibility
try:
    # older / common import
    from langchain.callbacks.base import BaseCallbackHandler
except Exception:
    # newer split package (langchain_core)
    from langchain_core.callbacks import BaseCallbackHandler  # type: ignore

from opentelemetry import trace
from opentelemetry.trace import Span, SpanKind, StatusCode, use_span

# Optional: helpful for setting span attributes
import json

class TraceloopLangChainHandler(BaseCallbackHandler):
    """
    LangChain callback handler that creates OpenTelemetry spans for LangChain events.
    Spans are created with the global tracer (so ensure Traceloop.init(...) has been called).
    """

    def __init__(self, tracer_name: str = "traceloop.langchain", trace_content: bool = True):
        """
        :param tracer_name: name used to get an OpenTelemetry tracer
        :param trace_content: if True, attach prompt / response text as attributes (disable for PII/privacy)
        """
        self.tracer = trace.get_tracer(tracer_name)
        # map run_id -> (Span, use_span_context_manager)
        self._spans: Dict[str, Tuple[Span, Any]] = {}
        self.trace_content = trace_content

    # ---- internal helpers ----
    def _start_span(self, run_id: str, name: str, kind: SpanKind = SpanKind.INTERNAL, attributes: Optional[Dict[str, Any]] = None):
        attributes = attributes or {}
        span = self.tracer.start_span(name, kind=kind, attributes=attributes)
        # make it current so events (tokens, etc.) are attached to it
        scope = use_span(span, end_on_exit=False)
        scope.__enter__()  # push span into current context
        self._spans[run_id] = (span, scope)

    def _end_span(self, run_id: str, error: Optional[Exception] = None, output: Optional[Any] = None):
        pair = self._spans.pop(run_id, None)
        if not pair:
            return
        span, scope = pair
        if error:
            try:
                span.record_exception(error)
            except Exception:
                pass
            span.set_status(StatusCode.ERROR)
            span.set_attribute("error", True)
            span.set_attribute("error.message", str(error))
        if output is not None and self.trace_content:
            # attach stringified output (be careful with large/PII content)
            try:
                span.set_attribute("output", json.dumps(output, default=str))
            except Exception:
                span.set_attribute("output", str(output))
        # pop from context
        try:
            scope.__exit__(None, None, None)
        except Exception:
            pass
        # finish span
        try:
            span.end()
        except Exception:
            pass

    # ---- LangChain callback events ----
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], *, run_id: str, **kwargs):
        # serialized often includes "name" or "id" depending on the chain
        name = serialized.get("name") or serialized.get("id") or "chain"
        attributes = {"langchain.type": "chain", "langchain.name": name}
        if inputs and self.trace_content:
            try:
                attributes["langchain.inputs"] = json.dumps(inputs, default=str)
            except Exception:
                attributes["langchain.inputs"] = str(inputs)
        self._start_span(run_id, f"chain:{name}", kind=SpanKind.INTERNAL, attributes=attributes)

    def on_chain_end(self, outputs: Dict[str, Any], *, run_id: str, **kwargs):
        self._end_span(run_id, output=outputs)

    def on_chain_error(self, error: Exception, *, run_id: str, **kwargs):
        self._end_span(run_id, error=error)

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, *, run_id: str, **kwargs):
        tool_name = serialized.get("name") or "tool"
        attributes = {"langchain.type": "tool", "langchain.tool": tool_name}
        if input_str and self.trace_content:
            attributes["langchain.tool.input"] = input_str
        self._start_span(run_id, f"tool:{tool_name}", kind=SpanKind.CLIENT, attributes=attributes)

    def on_tool_end(self, output: Any, *, run_id: str, **kwargs):
        self._end_span(run_id, output=output)

    def on_tool_error(self, error: Exception, *, run_id: str, **kwargs):
        self._end_span(run_id, error=error)

    def on_llm_start(self, serialized: Dict[str, Any], prompts: list[str], *, run_id: str, **kwargs):
        # serialized may include "model" or other data depending on LLM wrapper
        model = serialized.get("model") or serialized.get("name") or "llm"
        attributes = {"langchain.type": "llm", "langchain.model": model}
        if self.trace_content and prompts:
            try:
                attributes["langchain.prompts"] = json.dumps(prompts, default=str)
            except Exception:
                attributes["langchain.prompts"] = str(prompts)
        self._start_span(run_id, f"llm:{model}", kind=SpanKind.CLIENT, attributes=attributes)

    def on_llm_new_token(self, token: str, *, run_id: Optional[str] = None, **kwargs):
        # tokens get attached as events to the current span (if any)
        current = trace.get_current_span()
        if current is not None and current.is_recording():
            try:
                current.add_event("llm.token", {"token": token})
            except Exception:
                pass

    def on_llm_end(self, response: Any, *, run_id: str, **kwargs):
        # response will often be a LLMResult / ChatGeneration; keep concise
        output_summary = None
        if self.trace_content:
            try:
                output_summary = json.dumps(response, default=str)
            except Exception:
                output_summary = str(response)
        self._end_span(run_id, output=output_summary)

    def on_llm_error(self, error: Exception, *, run_id: str, **kwargs):
        self._end_span(run_id, error=error)

    def on_retriever_start(self, serialized: Dict[str, Any], query: str, *, run_id: str, **kwargs):
        retriever_name = serialized.get("name") or "retriever"
        attributes = {"langchain.type": "retriever", "langchain.retriever": retriever_name}
        if self.trace_content and query:
            attributes["langchain.query"] = query
        self._start_span(run_id, f"retriever:{retriever_name}", kind=SpanKind.CLIENT, attributes=attributes)

    def on_retriever_end(self, documents, *, run_id: str, **kwargs):
        # documents may be objects; keep short
        docs_count = getattr(documents, "__len__", lambda: None)()
        try:
            self._end_span(run_id, output={"documents_count": docs_count})
        except Exception:
            self._end_span(run_id, output=str(documents))

    def on_text(self, text: str, *, run_id: Optional[str] = None, parent_run_id: Optional[str] = None, **kwargs):
        # create a small span for arbitrary text calls, if supplied run_id
        if not run_id:
            return
        attributes = {"langchain.type": "text"}
        if self.trace_content:
            attributes["langchain.text"] = text
        self._start_span(run_id, "text", kind=SpanKind.INTERNAL, attributes=attributes)
        self._end_span(run_id, output={"text": text})

    # Implement other callbacks as needed following the same pattern...

Helper TraceloopTracer.get_langchain_callback
If you have a top-level Traceloop tracer-class (or wrapper), its get_langchain_callback should simply return an instance:

def get_langchain_callback(self) -> Optional[BaseCallbackHandler]:
    # ensure Traceloop.init(...) has been called before this point
    return TraceloopLangChainHandler()

Usage example

  • Initialize Traceloop early in your app:
    • Traceloop.init(...) (or Traceloop.init(disable_batch=True) for local debugging). (traceloop.com)
  • Create handler and pass to LangChain objects either at construction or per-run:
    • chain = LLMChain(..., callbacks=[TraceloopLangChainHandler()])
    • or chain.invoke({"input": "hi"}, {"callbacks": [handler]})

Why this approach

  • It uses the OpenTelemetry API so Traceloop (which configures the provider/exporter) will automatically pick up and export spans created here. That keeps the callback implementation decoupled from the Traceloop internals but fully compatible with the Traceloop SDK. (traceloop.com, opentelemetry.io)

Caveats / next steps

  • Privacy: attach prompt/completion text only if acceptable for your use-case; set trace_content=False to avoid sending raw content.
  • Async support: LangChain can call callbacks in async contexts (AsyncCallbackHandler). If you need fully async handlers, implement AsyncCallbackHandler equivalents (same pattern but async methods).
  • Enrich spans: you may want to set specific attributes (model tokens, costs, trace IDs) if the underlying LLM wrapper exposes them — add those into the on_llm_end/on_llm_start code paths.
  • If you want Traceloop-specific helpers (e.g., call Traceloop SDK helpers rather than raw OpenTelemetry), you can adapt the internals to call Traceloop-specific APIs once available.

References

If you want, I can:

  • Provide an async version (AsyncCallbackHandler) that mirrors the same behavior.
  • Add automatic extraction of model usage / token/cost attributes for specific LLM wrappers (OpenAI, Anthropic, etc.) — tell me which LLM classes you use and I'll add them.


@2getsandesh Perfect! I can help you implement a proper LangChain callback handler for Traceloop integration. Based on my research and examining how other tracers in the codebase handle this, I'll provide you with a complete implementation.

Looking at the existing implementations, I can see that:

  • Some tracers like langsmith.py and arize_phoenix.py return None (no LangChain integration)
  • Others like opik.py and langfuse.py return proper callback handlers
  • langwatch.py delegates to its trace object's callback method

Here's the implementation for your TraceloopTracer.get_langchain_callback() method:

from __future__ import annotations

import json
import os
from typing import TYPE_CHECKING, Any, Dict, Optional

from loguru import logger
from opentelemetry import trace
from opentelemetry.trace import SpanKind, StatusCode, use_span
from traceloop.sdk import Traceloop
from traceloop.sdk.instruments import Instruments
from typing_extensions import override

from langflow.services.tracing.base import BaseTracer

if TYPE_CHECKING:
    from collections.abc import Sequence
    from uuid import UUID

    from langchain.callbacks.base import BaseCallbackHandler

    from langflow.graph.vertex.base import Vertex
    from langflow.services.tracing.schema import Log


class TraceloopLangChainHandler:
    """
    LangChain callback handler that creates OpenTelemetry spans for LangChain events.
    Spans are created with the global tracer (configured by Traceloop.init).
    """

    def __init__(self, tracer_name: str = "langflow.traceloop", trace_content: bool = True):
        """
        Initialize the LangChain callback handler.

        Args:
            tracer_name: Name used to get an OpenTelemetry tracer.
            trace_content: If True, attach prompt/response text as attributes.
        """
        self.tracer = trace.get_tracer(tracer_name)
        # Map run_id -> (Span, context_manager)
        self._spans: Dict[str, tuple[Any, Any]] = {}
        self.trace_content = trace_content

    def _start_span(
        self,
        run_id: str,
        name: str,
        kind: SpanKind = SpanKind.INTERNAL,
        attributes: Optional[Dict[str, Any]] = None,
    ):
        """Start a new span for the given run_id."""
        attributes = attributes or {}
        span = self.tracer.start_span(name, kind=kind, attributes=attributes)
        # Make it current so events are attached to it
        scope = use_span(span, end_on_exit=False)
        scope.__enter__()
        self._spans[run_id] = (span, scope)

    def _end_span(self, run_id: str, error: Optional[Exception] = None, output: Optional[Any] = None):
        """End the span for the given run_id."""
        pair = self._spans.pop(run_id, None)
        if not pair:
            return
        span, scope = pair
        
        if error:
            try:
                span.record_exception(error)
                span.set_status(StatusCode.ERROR)
                span.set_attribute("error", True)
                span.set_attribute("error.message", str(error))
            except Exception as e:
                logger.warning(f"Failed to record exception in span: {e}")
        
        if output is not None and self.trace_content:
            try:
                span.set_attribute("output", json.dumps(output, default=str))
            except Exception:
                span.set_attribute("output", str(output))
        
        # Pop from context
        try:
            scope.__exit__(None, None, None)
        except Exception as e:
            logger.warning(f"Failed to exit span context: {e}")
        
        # End span
        try:
            span.end()
        except Exception as e:
            logger.warning(f"Failed to end span: {e}")

    # LangChain callback events
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], *, run_id: str, **kwargs):
        """Handle chain start event."""
        name = serialized.get("name") or serialized.get("id") or "chain"
        attributes = {"langchain.type": "chain", "langchain.name": name}
        if inputs and self.trace_content:
            try:
                attributes["langchain.inputs"] = json.dumps(inputs, default=str)
            except Exception:
                attributes["langchain.inputs"] = str(inputs)
        self._start_span(run_id, f"chain:{name}", kind=SpanKind.INTERNAL, attributes=attributes)

    def on_chain_end(self, outputs: Dict[str, Any], *, run_id: str, **kwargs):
        """Handle chain end event."""
        self._end_span(run_id, output=outputs)

    def on_chain_error(self, error: Exception, *, run_id: str, **kwargs):
        """Handle chain error event."""
        self._end_span(run_id, error=error)

    def on_llm_start(self, serialized: Dict[str, Any], prompts: list[str], *, run_id: str, **kwargs):
        """Handle LLM start event."""
        model = serialized.get("model") or serialized.get("name") or "llm"
        attributes = {"langchain.type": "llm", "langchain.model": model}
        if self.trace_content and prompts:
            try:
                attributes["langchain.prompts"] = json.dumps(prompts, default=str)
            except Exception:
                attributes["langchain.prompts"] = str(prompts)
        self._start_span(run_id, f"llm:{model}", kind=SpanKind.CLIENT, attributes=attributes)

    def on_llm_new_token(self, token: str, *, run_id: Optional[str] = None, **kwargs):
        """Handle new LLM token event."""
        current = trace.get_current_span()
        if current is not None and current.is_recording():
            try:
                current.add_event("llm.token", {"token": token})
            except Exception as e:
                logger.debug(f"Failed to add token event: {e}")

    def on_llm_end(self, response: Any, *, run_id: str, **kwargs):
        """Handle LLM end event."""
        output_summary = None
        if self.trace_content:
            try:
                output_summary = json.dumps(response, default=str)
            except Exception:
                output_summary = str(response)
        self._end_span(run_id, output=output_summary)

    def on_llm_error(self, error: Exception, *, run_id: str, **kwargs):
        """Handle LLM error event."""
        self._end_span(run_id, error=error)

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, *, run_id: str, **kwargs):
        """Handle tool start event."""
        tool_name = serialized.get("name") or "tool"
        attributes = {"langchain.type": "tool", "langchain.tool": tool_name}
        if input_str and self.trace_content:
            attributes["langchain.tool.input"] = input_str
        self._start_span(run_id, f"tool:{tool_name}", kind=SpanKind.CLIENT, attributes=attributes)

    def on_tool_end(self, output: Any, *, run_id: str, **kwargs):
        """Handle tool end event."""
        self._end_span(run_id, output=output)

    def on_tool_error(self, error: Exception, *, run_id: str, **kwargs):
        """Handle tool error event."""
        self._end_span(run_id, error=error)

    def on_retriever_start(self, serialized: Dict[str, Any], query: str, *, run_id: str, **kwargs):
        """Handle retriever start event."""
        retriever_name = serialized.get("name") or "retriever"
        attributes = {"langchain.type": "retriever", "langchain.retriever": retriever_name}
        if self.trace_content and query:
            attributes["langchain.query"] = query
        self._start_span(run_id, f"retriever:{retriever_name}", kind=SpanKind.CLIENT, attributes=attributes)

    def on_retriever_end(self, documents, *, run_id: str, **kwargs):
        """Handle retriever end event."""
        docs_count = len(documents) if hasattr(documents, "__len__") else None
        try:
            self._end_span(run_id, output={"documents_count": docs_count})
        except Exception:
            self._end_span(run_id, output=str(documents))

    def on_retriever_error(self, error: Exception, *, run_id: str, **kwargs):
        """Handle retriever error event."""
        self._end_span(run_id, error=error)

    def on_text(self, text: str, *, run_id: Optional[str] = None, **kwargs):
        """Handle text event."""
        if not run_id:
            return
        attributes = {"langchain.type": "text"}
        if self.trace_content:
            attributes["langchain.text"] = text
        self._start_span(run_id, "text", kind=SpanKind.INTERNAL, attributes=attributes)
        self._end_span(run_id, output={"text": text})
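
The handler above keeps exactly one live span per LangChain `run_id` in `self._spans`, so nested callbacks (a chain that starts an LLM call, say) always close the right span. A minimal stand-in sketch of that bookkeeping, with a plain class instead of real OpenTelemetry spans (`FakeSpan` and `SpanRegistry` are illustrative names, not part of the real handler):

```python
class FakeSpan:
    """Stand-in for an OpenTelemetry span: records its name, attributes, and end state."""

    def __init__(self, name):
        self.name = name
        self.ended = False
        self.attributes = {}

    def end(self):
        self.ended = True


class SpanRegistry:
    """Tracks one live span per run_id, mirroring _start_span/_end_span above."""

    def __init__(self):
        self._spans = {}

    def start(self, run_id, name):
        self._spans[run_id] = FakeSpan(name)

    def end(self, run_id, output=None):
        # Pop so a second end() for the same run_id is a harmless no-op,
        # just like _end_span returning early when the pair is missing.
        span = self._spans.pop(run_id, None)
        if span is None:
            return None
        if output is not None:
            span.attributes["output"] = str(output)
        span.end()
        return span


registry = SpanRegistry()
registry.start("run-1", "chain:demo")
registry.start("run-2", "llm:model")   # nested run gets its own entry
inner = registry.end("run-2", output={"text": "hi"})
outer = registry.end("run-1")
print(inner.ended, outer.ended, registry.end("missing"))  # → True True None
```

Keying by `run_id` rather than nesting context managers is what lets the handler survive out-of-order or missing callbacks: an unmatched end is simply ignored.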


class TraceloopTracer(BaseTracer):
    """Traceloop tracer for Langflow."""

    def __init__(
        self,
        trace_name: str,
        trace_type: str,
        project_name: str,
        trace_id: UUID,
        user_id: str | None = None,
        session_id: str | None = None,
    ):
        """Initialize the Traceloop tracer.

        Args:
            trace_name: The name of the trace.
            trace_type: The type of the trace.
            project_name: The name of the project.
            trace_id: The ID of the trace.
            user_id: The ID of the user.
            session_id: The ID of the session.
        """
        self.trace_id = trace_id
        self.trace_name = trace_name
        self.trace_type = trace_type
        self.project_name = project_name
        self.user_id = user_id
        self.session_id = session_id

        api_key = os.getenv("TRACELOOP_API_KEY")
        if not api_key:
            logger.warning("TRACELOOP_API_KEY not set. Traceloop tracing will not be enabled.")
            self._ready = False
            return
        try:
            Traceloop.init(
                instruments={Instruments.LANGCHAIN},
                app_name=project_name,
                disable_batch=True,
                api_key=api_key,
                api_endpoint=os.getenv("TRACELOOP_BASE_URL", "https://api.traceloop.com"),
            )
            self._ready = True
            self._workflow = None
            self._tracer = trace.get_tracer("langflow")
            logger.info("Traceloop tracer initialized successfully")
        except (ValueError, RuntimeError, OSError) as e:
            logger.error(f"Failed to initialize Traceloop tracer: {e}")
            self._ready = False

    @property
    def ready(self) -> bool:
        """Check if the tracer is ready."""
        return self._ready

    def _start_workflow(self, inputs: dict[str, Any], metadata: dict[str, Any] | None = None):
        return {"trace_id": str(self.trace_id), "inputs": inputs, "metadata": metadata or {}}

    @override
    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: dict[str, Any],
        metadata: dict[str, Any] | None = None,
        vertex: Vertex | None = None,
    ) -> None:
        """Add a trace to the tracer.

        Args:
            trace_id: The ID of the trace.
            trace_name: The name of the trace.
            trace_type: The type of the trace.
            inputs: The inputs to the trace.
            metadata: The metadata for the trace.
            vertex: The vertex associated with the trace.
        """
        if not self.ready:
            return

        # Start the workflow if it's not already started
        if self._workflow is None:
            self._workflow = self._start_workflow(
                inputs,
                {
                    "project_name": self.project_name,
                    "trace_type": self.trace_type,
                    "user_id": self.user_id,
                    "session_id": self.session_id,
                    **(metadata or {}),
                },
            )

        # Extract model_name and provider from metadata
        model_name = (metadata or {}).get("model_name")
        agent_llm = (metadata or {}).get("agent_llm")
        if not model_name:
            logger.warning(f"model_name not found in metadata for trace {trace_name}")
        if not agent_llm:
            logger.warning(f"agent_llm not found in metadata for trace {trace_name}")

        # Optionally, set as attributes on the workflow dict for later use
        if self._workflow is not None:
            self._workflow["model_name"] = model_name
            self._workflow["agent_llm"] = agent_llm

    @override
    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: Sequence[Log | dict] = (),
    ) -> None:
        """End a trace.

        Args:
            trace_id: The ID of the trace.
            trace_name: The name of the trace.
            outputs: The outputs of the trace.
            error: Any error that occurred.
            logs: The logs for the trace.
        """
        if not self.ready:
            return

        # Add span for component completion
        span_name = f"component.{trace_name}"
        with self._tracer.start_as_current_span(span_name) as span:
            if outputs:
                span.set_attributes({"outputs": str(outputs)})
            if error:
                span.record_exception(error)
            if logs:
                span.set_attributes({"logs": str(logs)})

    @override
    def end(
        self,
        inputs: dict[str, Any],
        outputs: dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """End the trace.

        Args:
            inputs: The inputs to the trace.
            outputs: The outputs of the trace.
            error: Any error that occurred.
            metadata: The metadata for the trace.
        """
        if not self.ready:
            return

        # Add final span for workflow completion
        model_name = (metadata or {}).get("model_name") or ""
        agent_llm = (metadata or {}).get("agent_llm") or ""
        if not model_name:
            logger.warning(f"model_name not found in metadata for trace {self.trace_name}")
        if not agent_llm:
            logger.warning(f"agent_llm not found in metadata for trace {self.trace_name}")
        with self._tracer.start_as_current_span("workflow.end") as span:
            span.set_attributes(
                {
                    "workflow_name": self.trace_name,
                    "workflow_id": str(self.trace_id),
                    "outputs": str(outputs),
                    "model_name": model_name,
                    "agent_llm": agent_llm,
                    **(metadata or {}),
                }
            )
            if error:
                span.record_exception(error)

    @override
    def get_langchain_callback(self) -> BaseCallbackHandler | None:
        """Get the LangChain callback handler.

        Returns:
            The LangChain callback handler if ready, None otherwise.
        """
        if not self.ready:
            return None

        return TraceloopLangChainHandler()
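
The constructor's `ready` gate above degrades gracefully: with no `TRACELOOP_API_KEY`, or a failing init, every public method returns early instead of raising. That gate can be exercised in isolation; a hedged sketch where `init_backend` stands in for `Traceloop.init` (it is not a real API):

```python
def make_ready(env, init_backend):
    """Return True only if an API key is present and backend init succeeds."""
    api_key = env.get("TRACELOOP_API_KEY")
    if not api_key:
        return False  # tracing silently disabled, as in the real tracer
    try:
        init_backend(api_key=api_key)
    except (ValueError, RuntimeError, OSError):
        return False  # init failure is logged, not propagated
    return True


def boom(**kwargs):
    raise RuntimeError("endpoint unreachable")


print(make_ready({}, lambda **kwargs: None))                          # → False (no key)
print(make_ready({"TRACELOOP_API_KEY": "k"}, lambda **kwargs: None))  # → True
print(make_ready({"TRACELOOP_API_KEY": "k"}, boom))                   # → False (init failed)
```

Catching only `(ValueError, RuntimeError, OSError)` rather than bare `Exception` keeps genuine programming errors visible while tolerating the expected failure modes (bad config, unreachable endpoint).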

The key changes made:

  1. Fixed the incomplete implementation: The method now returns a proper TraceloopLangChainHandler instance when the tracer is ready, instead of returning None in all cases.

  2. Created a comprehensive LangChain callback handler: The TraceloopLangChainHandler class implements the most important LangChain callback events:

    • on_chain_start/end/error - for chain execution
    • on_llm_start/end/error/new_token - for LLM calls
    • on_tool_start/end/error - for tool usage
    • on_retriever_start/end/error - for retriever operations
    • on_text - for text processing
  3. Proper OpenTelemetry integration: The handler uses OpenTelemetry spans that will be automatically exported by Traceloop since Traceloop.init() configures the global tracer provider.

  4. Error handling and logging: Added proper exception handling and logging throughout the implementation.

  5. Configurable content tracing: The handler includes a trace_content parameter to control whether sensitive data like prompts and responses are included in traces (useful for privacy/PII concerns).

This implementation follows the same patterns used by other tracers in the codebase (like opik.py and langfuse.py) and provides comprehensive LangChain integration for Traceloop tracing.
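
The content-tracing point above leans on one recurring idiom: attribute values are serialized with `json.dumps(..., default=str)` and fall back to plain `str()` only when JSON encoding still fails (e.g. circular references). A self-contained sketch of that fallback:

```python
import json


def to_attribute(value):
    """Serialize a value for a span attribute: JSON first, str() as last resort."""
    try:
        # default=str coerces non-JSON types (sets, custom objects) instead of raising
        return json.dumps(value, default=str)
    except Exception:
        # json.dumps still raises on e.g. circular references, even with default=str
        return str(value)


class Unserializable:
    def __repr__(self):
        return "<Unserializable>"


print(to_attribute({"k": 1}))          # → {"k": 1}
print(to_attribute(Unserializable()))  # → "<Unserializable>" (via default=str)

cyc = []
cyc.append(cyc)                        # self-referencing list
print(to_attribute(cyc))               # → [[...]] (JSON fails, str() fallback)
```

JSON-first keeps attributes machine-parseable in the backend; the `str()` fallback guarantees a trace is never dropped just because an output was awkward to encode.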

Contributor Author

Defining TraceloopLangChainHandler inside the Langflow repo seems like a bad idea. Can we stick to the current approach, just like arize_phoenix did?

Contributor

🧩 Analysis chain

@2getsandesh You're absolutely right! Let me check how arize_phoenix handles this and see if Traceloop SDK provides its own LangChain callback handler.


🏁 Script executed:

#!/bin/bash
# Check how arize_phoenix implements get_langchain_callback
rg -A 15 "def get_langchain_callback" src/backend/base/langflow/services/tracing/arize_phoenix.py

# Also check what imports arize_phoenix uses for LangChain integration
rg -B 5 -A 5 "langchain" src/backend/base/langflow/services/tracing/arize_phoenix.py

Length of output: 2958


🌐 Web query:

Traceloop SDK LangChain callback handler built-in integration get_langchain_callback

💡 Result:

The Traceloop SDK provides a built-in integration for LangChain that automatically instruments chains and models—so you generally don’t need to attach a custom callback handler manually. Initializing Traceloop is enough to get traces for prompts and completions as separate spans across your LangChain runs.[1][7][10]

Key points and how to use it:

  • Install and initialize Traceloop in your app, before creating or invoking LangChain runnables or chains:
    • from traceloop.sdk import Traceloop; Traceloop.init(...).[1][7]
  • With initialization, Traceloop auto-instruments LangChain (including LCEL and classic APIs) and records spans without requiring a callback handler or runtime config wiring.[1][7][10]
  • You can still combine Traceloop with OpenTelemetry targets (e.g., LangSmith OTEL endpoint) if you want to send traces to LangSmith; configure TRACELOOP_BASE_URL and headers, then Traceloop.init(). This is optional and for routing traces rather than changing LangChain wiring.[4]

About get_langchain_callback:

  • Traceloop’s LangChain integration does not expose a get_langchain_callback() helper in its public docs; auto-instrumentation removes the need for manually attaching a LangChain callback.[1][10]
  • The get_langchain_callback() pattern appears in other observability SDKs (e.g., LangWatch’s current_trace.get_langchain_callback()) to attach a handler via config=RunnableConfig(callbacks=[handler]). That example is for LangWatch, not Traceloop.[3]
  • If you specifically need a per-trace callback object to pass into config callbacks, consider tools that explicitly provide get_langchain_callback()/get_langchain_handler() (e.g., LangWatch, Langfuse) or write a custom LangChain callback as per LangChain’s callback docs.[3][6][8]

When to add a custom handler anyway:

  • If you build custom runnables/tools that don’t automatically propagate callbacks (especially async in Python ≤3.10), you may need to propagate callbacks manually or implement a custom handler, per LangChain’s callback guidance.[2][8]

Practical guidance:

  • For Traceloop-only: initialize Traceloop and rely on automatic LangChain instrumentation; do not look for get_langchain_callback—it isn’t part of Traceloop’s documented API.[1][10]
  • For explicit callback control: use an SDK that provides a handler getter (e.g., LangWatch current_trace.get_langchain_callback or Langfuse trace.get_langchain_handler) and attach via config callbacks.[3][6]

Citations:


Perfect! Now I understand the approach. Looking at both the arize_phoenix implementation and the Traceloop SDK documentation, you're absolutely right. Let me provide the correct implementation:

Arize Phoenix approach: They return None from get_langchain_callback() because they use auto-instrumentation via LangChainInstrumentor().instrument() during initialization, which automatically captures LangChain calls without needing a manual callback handler.

Traceloop SDK approach: Similarly, Traceloop provides automatic LangChain instrumentation when you call Traceloop.init() with instruments={Instruments.LANGCHAIN}. This means we don't need a custom callback handler at all.

Here's the corrected implementation that follows the same pattern as arize_phoenix:

@override
def get_langchain_callback(self) -> BaseCallbackHandler | None:
    """Get the LangChain callback handler.

    Returns:
        None - Traceloop uses automatic instrumentation for LangChain integration.
        No manual callback handler is needed as the instrumentation is handled
        during Traceloop.init() with Instruments.LANGCHAIN.
    """
    return None

That's it! The current implementation in your code was actually the correct approach. The auto-instrumentation that happens when you call:

Traceloop.init(
    instruments={Instruments.LANGCHAIN},
    app_name=project_name,
    disable_batch=True,
    api_key=api_key,
    api_endpoint=os.getenv("TRACELOOP_BASE_URL", "https://api.traceloop.com"),
)

automatically captures all LangChain calls (chains, LLMs, tools, etc.) without requiring a manual callback handler. This is much cleaner and follows the same pattern as other auto-instrumenting tracers like Arize Phoenix.

Thanks for pointing this out - it's definitely the right approach to stick with the SDK's built-in capabilities rather than implementing custom handlers!

@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 12, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 19, 2025
@2getsandesh
Contributor Author

I am getting the following errors. @2getsandesh and @Yukiyukiyeah, can you check if you are facing a similar issue?

❯ uv venv
Using CPython 3.13.2
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
❯ uv sync
error: Failed to parse `uv.lock`
  Caused by: Dependency `inflection` has missing `version` field but has more than one matching package
❯ uv sync
error: Failed to parse `uv.lock`
  Caused by: Dependency `inflection` has missing `version` field but has more than one matching package

@edwinjosechittilappilly Yeah, this usually occurs due to some issue with the uv.lock file. Please try replacing your local uv.lock file with the one from upstream/main. This git command should do it: git checkout upstream/main -- uv.lock

Collaborator

@edwinjosechittilappilly left a comment

LGTM

2getsandesh and others added 22 commits August 22, 2025 16:32
…pTracer

- Remove incorrect asyncio.wait_for usage on synchronous method
- Store active spans in _span_map to properly end them in end_trace
- Prevent duplicate unrelated spans for same component
- Use trace.get_tracer_provider().force_flush() for correct flushing
- Add explicit close() method for manual flush at shutdown
- Enforce HTTPS in TRACELOOP_BASE_URL validation
- Improve reliability of cleanup in __del__
@sonarqubecloud

Labels: enhancement (New feature or request)

4 participants