Skip to content

feat(telemetry): add client_type field and remove desktop from payloads#8812

Merged
ogabrielluiz merged 10 commits intomainfrom
add-clienttype-telemetry
Sep 25, 2025
Merged

feat(telemetry): add client_type field and remove desktop from payloads#8812
ogabrielluiz merged 10 commits intomainfrom
add-clienttype-telemetry

Conversation

@ogabrielluiz
Copy link
Contributor

@ogabrielluiz ogabrielluiz commented Jul 1, 2025

Introduce a new client_type field in telemetry payloads and modify the TelemetryService to set and utilize this field appropriately.

Summary by CodeRabbit

  • New Features

    • Added support for specifying the client type in telemetry data, providing improved identification of the application environment.
  • Bug Fixes

    • Ensured consistent handling of client type information across all telemetry payloads.
  • Tests

    • Introduced comprehensive unit and integration tests for telemetry schema validation, edge cases, and performance.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 1, 2025

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes introduce a new optional client_type field to multiple telemetry schema payloads, replacing the desktop field in one case. The telemetry service logic is updated to consistently determine and assign this client_type based on environment variables. Extensive unit tests are added to verify schema behavior and serialization.

Changes

File(s) Change Summary
src/backend/base/langflow/services/telemetry/schema.py Added optional client_type field (with alias clientType) to five payload models; removed desktop from VersionPayload.
src/backend/base/langflow/services/telemetry/service.py Centralized client type determination in TelemetryService; updated payload assignment and logic to use new client_type.
src/backend/tests/unit/services/telemetry/test_telemetry_schema.py Added comprehensive unit, edge case, integration, and performance tests for all telemetry payload schemas and their fields.

Sequence Diagram(s)

sequenceDiagram
    participant Env as Environment
    participant Service as TelemetryService
    participant Payload as TelemetryPayload

    Env->>Service: Initialize TelemetryService
    Service->>Service: _get_client_type() (checks LANGFLOW_DESKTOP)
    Service->>Service: Stores client_type ("desktop" or "oss")

    Service->>Payload: Create payload (e.g., RunPayload)
    Service->>Service: send_telemetry_data(payload)
    alt payload.client_type not set
        Service->>Payload: Set payload.client_type = Service.client_type
    end
    Service->>...: Proceed with telemetry logic
Loading
sequenceDiagram
    participant Test as Test Suite
    participant Payload as TelemetryPayload

    loop For each payload type
        Test->>Payload: Instantiate with/without client_type
        Test->>Payload: Serialize to dict (check alias)
        Test->>Payload: Test edge cases (invalid/None/long/special)
        Test->>Payload: Assert correct behavior
    end
Loading
✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch add-clienttype-telemetry

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@dosubot dosubot bot added the size:S This PR changes 10-29 lines, ignoring generated files. label Jul 1, 2025
@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Jul 1, 2025
@coderabbitai coderabbitai bot changed the title @coderabbitai feat(telemetry): add client_type field and remove desktop from payloads Jul 1, 2025
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 1, 2025

Note

Unit test generation is an Early Access feature. Expect some limitations and changes as we gather feedback and continue to improve it.


Generating unit tests... This may take up to 20 minutes.

@github-actions github-actions bot added the enhancement New feature or request label Jul 1, 2025
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 1, 2025

Here are the copyable unit test edits:

Copyable Edits

src/backend/tests/unit/base/langflow/services/telemetry/test_schema.py

This is a new file.

mport pytest
rom unittest.mock import patch, MagicMock
rom datetime import datetime
rom uuid import uuid4
mport json
rom typing import Dict, Any, Optional

rom langflow.services.telemetry.schema import (
   TelemetryData,
   ComponentTelemetryData,
   FlowTelemetryData,
   SessionTelemetryData,
   ErrorTelemetryData,
   PerformanceTelemetryData,
   UserTelemetryData,
   TelemetryEvent,
   TelemetryEventType,
   validate_telemetry_data,
   serialize_telemetry_data,
   deserialize_telemetry_data



lass TestTelemetryData:
   """Test cases for TelemetryData base class."""
   
   def test_telemetry_data_initialization_with_valid_data(self):
       """Test TelemetryData initialization with valid parameters."""
       event_id = str(uuid4())
       timestamp = datetime.now()
       user_id = "test_user_123"
       session_id = str(uuid4())
       
       telemetry_data = TelemetryData(
           event_id=event_id,
           timestamp=timestamp,
           user_id=user_id,
           session_id=session_id,
           event_type=TelemetryEventType.COMPONENT_USAGE
       )
       
       assert telemetry_data.event_id == event_id
       assert telemetry_data.timestamp == timestamp
       assert telemetry_data.user_id == user_id
       assert telemetry_data.session_id == session_id
       assert telemetry_data.event_type == TelemetryEventType.COMPONENT_USAGE

   def test_telemetry_data_initialization_with_minimal_data(self):
       """Test TelemetryData initialization with minimal required parameters."""
       telemetry_data = TelemetryData(
           event_type=TelemetryEventType.ERROR
       )
       
       assert telemetry_data.event_id is not None
       assert telemetry_data.timestamp is not None
       assert telemetry_data.event_type == TelemetryEventType.ERROR

   def test_telemetry_data_auto_generated_fields(self):
       """Test that TelemetryData auto-generates required fields when not provided."""
       telemetry_data = TelemetryData(event_type=TelemetryEventType.FLOW_EXECUTION)
       
       # Check that auto-generated fields are valid
       assert isinstance(telemetry_data.event_id, str)
       assert len(telemetry_data.event_id) > 0
       assert isinstance(telemetry_data.timestamp, datetime)

   def test_telemetry_data_to_dict(self):
       """Test TelemetryData serialization to dictionary."""
       event_id = str(uuid4())
       timestamp = datetime.now()
       
       telemetry_data = TelemetryData(
           event_id=event_id,
           timestamp=timestamp,
           user_id="test_user",
           event_type=TelemetryEventType.SESSION_START
       )
       
       data_dict = telemetry_data.to_dict()
       
       assert data_dict["event_id"] == event_id
       assert data_dict["timestamp"] == timestamp.isoformat()
       assert data_dict["user_id"] == "test_user"
       assert data_dict["event_type"] == TelemetryEventType.SESSION_START.value

   def test_telemetry_data_from_dict(self):
       """Test TelemetryData deserialization from dictionary."""
       event_id = str(uuid4())
       timestamp = datetime.now()
       
       data_dict = {
           "event_id": event_id,
           "timestamp": timestamp.isoformat(),
           "user_id": "test_user",
           "session_id": str(uuid4()),
           "event_type": TelemetryEventType.COMPONENT_USAGE.value
       }
       
       telemetry_data = TelemetryData.from_dict(data_dict)
       
       assert telemetry_data.event_id == event_id
       assert telemetry_data.timestamp.replace(microsecond=0) == timestamp.replace(microsecond=0)
       assert telemetry_data.user_id == "test_user"
       assert telemetry_data.event_type == TelemetryEventType.COMPONENT_USAGE


lass TestComponentTelemetryData:
   """Test cases for ComponentTelemetryData."""
   
   def test_component_telemetry_data_initialization(self):
       """Test ComponentTelemetryData initialization with valid parameters."""
       component_data = ComponentTelemetryData(
           component_id="text_input_1",
           component_type="TextInput",
           component_name="User Input",
           flow_id=str(uuid4()),
           execution_time=0.152,
           input_count=1,
           output_count=1,
           error_count=0
       )
       
       assert component_data.component_id == "text_input_1"
       assert component_data.component_type == "TextInput"
       assert component_data.component_name == "User Input"
       assert component_data.execution_time == 0.152
       assert component_data.input_count == 1
       assert component_data.output_count == 1
       assert component_data.error_count == 0

   def test_component_telemetry_data_with_errors(self):
       """Test ComponentTelemetryData with error conditions."""
       component_data = ComponentTelemetryData(
           component_id="llm_1",
           component_type="LLMChain",
           component_name="OpenAI LLM",
           flow_id=str(uuid4()),
           execution_time=5.234,
           input_count=1,
           output_count=0,
           error_count=1,
           error_message="API rate limit exceeded"
       )
       
       assert component_data.error_count == 1
       assert component_data.error_message == "API rate limit exceeded"
       assert component_data.output_count == 0

   def test_component_telemetry_data_validation_invalid_execution_time(self):
       """Test ComponentTelemetryData validation with invalid execution time."""
       with pytest.raises(ValueError, match="Execution time cannot be negative"):
           ComponentTelemetryData(
               component_id="test_component",
               component_type="TestComponent",
               execution_time=-1.0
           )

   def test_component_telemetry_data_validation_invalid_counts(self):
       """Test ComponentTelemetryData validation with invalid counts."""
       with pytest.raises(ValueError, match="Count values cannot be negative"):
           ComponentTelemetryData(
               component_id="test_component",
               component_type="TestComponent",
               input_count=-1
           )

   def test_component_telemetry_data_to_dict_with_metadata(self):
       """Test ComponentTelemetryData serialization with metadata."""
       metadata = {"model": "gpt-3.5-turbo", "temperature": 0.7}
       
       component_data = ComponentTelemetryData(
           component_id="llm_1",
           component_type="OpenAI",
           metadata=metadata
       )
       
       data_dict = component_data.to_dict()
       
       assert data_dict["component_id"] == "llm_1"
       assert data_dict["component_type"] == "OpenAI"
       assert data_dict["metadata"] == metadata


lass TestFlowTelemetryData:
   """Test cases for FlowTelemetryData."""
   
   def test_flow_telemetry_data_initialization(self):
       """Test FlowTelemetryData initialization with valid parameters."""
       flow_id = str(uuid4())
       flow_data = FlowTelemetryData(
           flow_id=flow_id,
           flow_name="Customer Support Flow",
           flow_description="Automated customer support using LLM",
           component_count=5,
           execution_time=12.456,
           success=True,
           total_tokens=1250,
           total_cost=0.045
       )
       
       assert flow_data.flow_id == flow_id
       assert flow_data.flow_name == "Customer Support Flow"
       assert flow_data.component_count == 5
       assert flow_data.execution_time == 12.456
       assert flow_data.success is True
       assert flow_data.total_tokens == 1250
       assert flow_data.total_cost == 0.045

   def test_flow_telemetry_data_failed_execution(self):
       """Test FlowTelemetryData for failed flow execution."""
       flow_data = FlowTelemetryData(
           flow_id=str(uuid4()),
           flow_name="Test Flow",
           component_count=3,
           execution_time=2.123,
           success=False,
           error_message="Component 'llm_1' failed with timeout",
           failed_component_id="llm_1"
       )
       
       assert flow_data.success is False
       assert flow_data.error_message == "Component 'llm_1' failed with timeout"
       assert flow_data.failed_component_id == "llm_1"

   def test_flow_telemetry_data_validation_negative_values(self):
       """Test FlowTelemetryData validation with negative values."""
       with pytest.raises(ValueError, match="Component count cannot be negative"):
           FlowTelemetryData(
               flow_id=str(uuid4()),
               component_count=-1
           )
       
       with pytest.raises(ValueError, match="Execution time cannot be negative"):
           FlowTelemetryData(
               flow_id=str(uuid4()),
               execution_time=-5.0
           )

   def test_flow_telemetry_data_complex_flow(self):
       """Test FlowTelemetryData for complex flow with multiple components."""
       component_executions = [
           {"component_id": "input_1", "execution_time": 0.001},
           {"component_id": "llm_1", "execution_time": 3.245},
           {"component_id": "memory_1", "execution_time": 0.156},
           {"component_id": "output_1", "execution_time": 0.002}
       ]
       
       flow_data = FlowTelemetryData(
           flow_id=str(uuid4()),
           flow_name="Multi-Component Flow",
           component_count=4,
           execution_time=3.404,
           success=True,
           component_executions=component_executions
       )
       
       assert len(flow_data.component_executions) == 4
       assert flow_data.component_executions[1]["execution_time"] == 3.245


lass TestSessionTelemetryData:
   """Test cases for SessionTelemetryData."""
   
   def test_session_telemetry_data_start(self):
       """Test SessionTelemetryData for session start event."""
       session_id = str(uuid4())
       session_data = SessionTelemetryData(
           session_id=session_id,
           action="start",
           user_agent="Mozilla/5.0 Chrome/96.0.4664.110",
           ip_address="192.168.1.100",
           country="US",
           city="San Francisco"
       )
       
       assert session_data.session_id == session_id
       assert session_data.action == "start"
       assert session_data.user_agent == "Mozilla/5.0 Chrome/96.0.4664.110"
       assert session_data.country == "US"
       assert session_data.city == "San Francisco"

   def test_session_telemetry_data_end_with_duration(self):
       """Test SessionTelemetryData for session end event with duration."""
       session_data = SessionTelemetryData(
           session_id=str(uuid4()),
           action="end",
           duration=3600.5,  # 1 hour and 0.5 seconds
           flows_executed=3,
           components_used=15
       )
       
       assert session_data.action == "end"
       assert session_data.duration == 3600.5
       assert session_data.flows_executed == 3
       assert session_data.components_used == 15

   def test_session_telemetry_data_validation_invalid_action(self):
       """Test SessionTelemetryData validation with invalid action."""
       with pytest.raises(ValueError, match="Action must be one of"):
           SessionTelemetryData(
               session_id=str(uuid4()),
               action="invalid_action"
           )

   def test_session_telemetry_data_validation_negative_duration(self):
       """Test SessionTelemetryData validation with negative duration."""
       with pytest.raises(ValueError, match="Duration cannot be negative"):
           SessionTelemetryData(
               session_id=str(uuid4()),
               action="end",
               duration=-100.0
           )


lass TestErrorTelemetryData:
   """Test cases for ErrorTelemetryData."""
   
   def test_error_telemetry_data_initialization(self):
       """Test ErrorTelemetryData initialization with valid parameters."""
       error_data = ErrorTelemetryData(
           error_type="ValueError",
           error_message="Invalid input parameter",
           stack_trace="Traceback (most recent call last):\n  File...",
           component_id="text_processor_1",
           flow_id=str(uuid4()),
           severity="high"
       )
       
       assert error_data.error_type == "ValueError"
       assert error_data.error_message == "Invalid input parameter"
       assert error_data.component_id == "text_processor_1"
       assert error_data.severity == "high"

   def test_error_telemetry_data_with_context(self):
       """Test ErrorTelemetryData with additional context information."""
       context = {
           "input_data": "test input",
           "component_config": {"max_length": 100},
           "execution_step": "validation"
       }
       
       error_data = ErrorTelemetryData(
           error_type="ValidationError",
           error_message="Input exceeds maximum length",
           context=context,
           severity="medium"
       )
       
       assert error_data.context == context
       assert error_data.context["execution_step"] == "validation"

   def test_error_telemetry_data_validation_invalid_severity(self):
       """Test ErrorTelemetryData validation with invalid severity."""
       with pytest.raises(ValueError, match="Severity must be one of"):
           ErrorTelemetryData(
               error_type="CustomError",
               error_message="Test error",
               severity="invalid_severity"
           )

   def test_error_telemetry_data_sanitization(self):
       """Test ErrorTelemetryData sanitizes sensitive information."""
       sensitive_message = "Database connection failed: password=secret123"
       
       error_data = ErrorTelemetryData(
           error_type="ConnectionError",
           error_message=sensitive_message,
           sanitize_sensitive_data=True
       )
       
       # Should sanitize password information
       assert "password=" not in error_data.error_message
       assert "secret123" not in error_data.error_message


lass TestPerformanceTelemetryData:
   """Test cases for PerformanceTelemetryData."""
   
   def test_performance_telemetry_data_initialization(self):
       """Test PerformanceTelemetryData initialization with valid parameters."""
       perf_data = PerformanceTelemetryData(
           operation="flow_execution",
           duration=5.234,
           memory_usage=156.7,  # MB
           cpu_usage=45.2,  # percentage
           gpu_usage=78.9,  # percentage
           tokens_processed=1500,
           throughput=287.5  # tokens per second
       )
       
       assert perf_data.operation == "flow_execution"
       assert perf_data.duration == 5.234
       assert perf_data.memory_usage == 156.7
       assert perf_data.cpu_usage == 45.2
       assert perf_data.gpu_usage == 78.9
       assert perf_data.tokens_processed == 1500
       assert perf_data.throughput == 287.5

   def test_performance_telemetry_data_validation_negative_values(self):
       """Test PerformanceTelemetryData validation with negative values."""
       with pytest.raises(ValueError, match="Duration cannot be negative"):
           PerformanceTelemetryData(
               operation="test_operation",
               duration=-1.0
           )
       
       with pytest.raises(ValueError, match="Memory usage cannot be negative"):
           PerformanceTelemetryData(
               operation="test_operation",
               memory_usage=-50.0
           )

   def test_performance_telemetry_data_validation_percentage_ranges(self):
       """Test PerformanceTelemetryData validation with invalid percentage ranges."""
       with pytest.raises(ValueError, match="CPU usage must be between 0 and 100"):
           PerformanceTelemetryData(
               operation="test_operation",
               cpu_usage=150.0
           )
       
       with pytest.raises(ValueError, match="GPU usage must be between 0 and 100"):
           PerformanceTelemetryData(
               operation="test_operation",
               gpu_usage=-10.0
           )

   def test_performance_telemetry_data_calculated_metrics(self):
       """Test PerformanceTelemetryData with calculated metrics."""
       perf_data = PerformanceTelemetryData(
           operation="llm_inference",
           duration=3.0,
           tokens_processed=1200
       )
       
       # Should calculate throughput automatically
       expected_throughput = 1200 / 3.0  # 400 tokens per second
       assert perf_data.throughput == expected_throughput


lass TestUserTelemetryData:
   """Test cases for UserTelemetryData."""
   
   def test_user_telemetry_data_initialization(self):
       """Test UserTelemetryData initialization with valid parameters."""
       user_data = UserTelemetryData(
           user_id="user_123",
           user_type="premium",
           plan="pro",
           organization_id="org_456",
           feature_usage={
               "flows_created": 15,
               "api_calls": 2500,
               "storage_used": 1.2  # GB
           }
       )
       
       assert user_data.user_id == "user_123"
       assert user_data.user_type == "premium"
       assert user_data.plan == "pro"
       assert user_data.organization_id == "org_456"
       assert user_data.feature_usage["flows_created"] == 15

   def test_user_telemetry_data_anonymization(self):
       """Test UserTelemetryData anonymizes user data when requested."""
       user_data = UserTelemetryData(
           user_id="user_123",
           email="test@example.com",
           anonymize=True
       )
       
       # Should anonymize personal identifiers
       assert user_data.user_id != "user_123"
       assert user_data.email is None or "@" not in user_data.email

   def test_user_telemetry_data_feature_limits(self):
       """Test UserTelemetryData with feature usage and limits."""
       user_data = UserTelemetryData(
           user_id="user_123",
           plan="starter",
           feature_usage={
               "flows_created": 8,
               "api_calls": 950
           },
           feature_limits={
               "flows_created": 10,
               "api_calls": 1000
           }
       )
       
       assert user_data.feature_usage["flows_created"] == 8
       assert user_data.feature_limits["flows_created"] == 10
       
       # Should calculate usage percentages
       flows_usage_percent = (8 / 10) * 100
       assert user_data.get_feature_usage_percentage("flows_created") == flows_usage_percent


lass TestTelemetryEvent:
   """Test cases for TelemetryEvent."""
   
   def test_telemetry_event_initialization(self):
       """Test TelemetryEvent initialization with valid parameters."""
       component_data = ComponentTelemetryData(
           component_id="test_component",
           component_type="TestComponent"
       )
       
       event = TelemetryEvent(
           event_type=TelemetryEventType.COMPONENT_USAGE,
           data=component_data,
           metadata={"version": "1.0.0"}
       )
       
       assert event.event_type == TelemetryEventType.COMPONENT_USAGE
       assert isinstance(event.data, ComponentTelemetryData)
       assert event.metadata["version"] == "1.0.0"

   def test_telemetry_event_serialization(self):
       """Test TelemetryEvent serialization and deserialization."""
       flow_data = FlowTelemetryData(
           flow_id=str(uuid4()),
           flow_name="Test Flow"
       )
       
       event = TelemetryEvent(
           event_type=TelemetryEventType.FLOW_EXECUTION,
           data=flow_data
       )
       
       # Serialize to dict
       event_dict = event.to_dict()
       assert event_dict["event_type"] == TelemetryEventType.FLOW_EXECUTION.value
       assert "data" in event_dict
       
       # Deserialize from dict
       deserialized_event = TelemetryEvent.from_dict(event_dict)
       assert deserialized_event.event_type == TelemetryEventType.FLOW_EXECUTION
       assert isinstance(deserialized_event.data, FlowTelemetryData)

   def test_telemetry_event_validation_mismatched_data_type(self):
       """Test TelemetryEvent validation with mismatched event type and data."""
       component_data = ComponentTelemetryData(
           component_id="test_component",
           component_type="TestComponent"
       )
       
       # Should raise error for mismatched event type and data type
       with pytest.raises(ValueError, match="Event type and data type mismatch"):
           TelemetryEvent(
               event_type=TelemetryEventType.FLOW_EXECUTION,  # Flow event type
               data=component_data  # But component data
           )


lass TestTelemetryUtilityFunctions:
   """Test cases for telemetry utility functions."""
   
   def test_validate_telemetry_data_valid_data(self):
       """Test validate_telemetry_data with valid data."""
       component_data = ComponentTelemetryData(
           component_id="valid_component",
           component_type="ValidComponent"
       )
       
       # Should not raise exception for valid data
       assert validate_telemetry_data(component_data) is True

   def test_validate_telemetry_data_invalid_data(self):
       """Test validate_telemetry_data with invalid data."""
       # Test with None
       assert validate_telemetry_data(None) is False
       
       # Test with invalid object
       assert validate_telemetry_data("not_telemetry_data") is False

   def test_serialize_telemetry_data(self):
       """Test serialize_telemetry_data function."""
       error_data = ErrorTelemetryData(
           error_type="TestError",
           error_message="Test error message"
       )
       
       serialized = serialize_telemetry_data(error_data)
       
       assert isinstance(serialized, str)
       # Should be valid JSON
       parsed = json.loads(serialized)
       assert parsed["error_type"] == "TestError"
       assert parsed["error_message"] == "Test error message"

   def test_deserialize_telemetry_data(self):
       """Test deserialize_telemetry_data function."""
       original_data = SessionTelemetryData(
           session_id=str(uuid4()),
           action="start"
       )
       
       # Serialize then deserialize
       serialized = serialize_telemetry_data(original_data)
       deserialized = deserialize_telemetry_data(serialized, SessionTelemetryData)
       
       assert isinstance(deserialized, SessionTelemetryData)
       assert deserialized.session_id == original_data.session_id
       assert deserialized.action == original_data.action

   def test_deserialize_telemetry_data_invalid_json(self):
       """Test deserialize_telemetry_data with invalid JSON."""
       with pytest.raises(ValueError, match="Invalid JSON"):
           deserialize_telemetry_data("invalid json string", TelemetryData)

   def test_deserialize_telemetry_data_type_mismatch(self):
       """Test deserialize_telemetry_data with type mismatch."""
       component_data = ComponentTelemetryData(
           component_id="test_component",
           component_type="TestComponent"
       )
       
       serialized = serialize_telemetry_data(component_data)
       
       # Try to deserialize as wrong type
       with pytest.raises(ValueError, match="Data type mismatch"):
           deserialize_telemetry_data(serialized, FlowTelemetryData)


lass TestTelemetryDataEdgeCases:
   """Test cases for edge cases and boundary conditions."""
   
   def test_telemetry_data_with_unicode_strings(self):
       """Test telemetry data handling unicode strings."""
       component_data = ComponentTelemetryData(
           component_id="unicode_test_🚀",
           component_type="UnicodeComponent",
           component_name="测试组件",
           error_message="Erreur avec caractères spéciaux: éàüñ"
       )
       
       # Should handle unicode properly
       data_dict = component_data.to_dict()
       assert data_dict["component_id"] == "unicode_test_🚀"
       assert data_dict["component_name"] == "测试组件"

   def test_telemetry_data_with_extremely_large_values(self):
       """Test telemetry data with extremely large values."""
       large_value = 10**15  # Very large number
       
       perf_data = PerformanceTelemetryData(
           operation="stress_test",
           duration=large_value,
           tokens_processed=large_value,
           memory_usage=large_value
       )
       
       assert perf_data.duration == large_value
       assert perf_data.tokens_processed == large_value

   def test_telemetry_data_with_empty_strings(self):
       """Test telemetry data with empty strings."""
       component_data = ComponentTelemetryData(
           component_id="",  # Empty string
           component_type="EmptyTest",
           component_name=""
       )
       
       # Should handle empty strings gracefully
       assert component_data.component_id == ""
       assert component_data.component_name == ""

   def test_telemetry_data_with_none_optional_fields(self):
       """Test telemetry data with None values for optional fields."""
       flow_data = FlowTelemetryData(
           flow_id=str(uuid4()),
           flow_name=None,  # None for optional field
           flow_description=None,
           error_message=None
       )
       
       # Should handle None values properly
       data_dict = flow_data.to_dict()
       assert data_dict.get("flow_name") is None
       assert data_dict.get("flow_description") is None

   def test_telemetry_data_with_special_characters_in_ids(self):
       """Test telemetry data with special characters in IDs."""
       special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"
       
       component_data = ComponentTelemetryData(
           component_id=f"test_{special_chars}",
           component_type="SpecialCharTest"
       )
       
       # Should preserve special characters
       assert special_chars in component_data.component_id

   @pytest.mark.parametrize("invalid_timestamp", [
       "not_a_datetime",
       12345,
       None,
       []
   ])
   def test_telemetry_data_invalid_timestamp_types(self, invalid_timestamp):
       """Test telemetry data with various invalid timestamp types."""
       with pytest.raises((ValueError, TypeError)):
           TelemetryData(
               event_type=TelemetryEventType.ERROR,
               timestamp=invalid_timestamp
           )

   @pytest.mark.parametrize("event_type", list(TelemetryEventType))
   def test_all_telemetry_event_types(self, event_type):
       """Test that all telemetry event types can be used."""
       telemetry_data = TelemetryData(event_type=event_type)
       assert telemetry_data.event_type == event_type

   def test_telemetry_data_thread_safety(self):
       """Test telemetry data creation in multi-threaded environment."""
       import threading
       import time
       
       results = []
       
       def create_telemetry_data():
           for _ in range(10):
               data = TelemetryData(event_type=TelemetryEventType.COMPONENT_USAGE)
               results.append(data.event_id)
               time.sleep(0.001)  # Small delay
       
       threads = [threading.Thread(target=create_telemetry_data) for _ in range(5)]
       
       for thread in threads:
           thread.start()
       
       for thread in threads:
           thread.join()
       
       # All event IDs should be unique
       assert len(results) == len(set(results))


lass TestTelemetryDataIntegration:
   """Integration tests for telemetry data components."""
   
   def test_complete_flow_telemetry_workflow(self):
       """Test complete telemetry workflow for a flow execution."""
       flow_id = str(uuid4())
       session_id = str(uuid4())
       user_id = "test_user_integration"
       
       # 1. Session start
       session_start = SessionTelemetryData(
           session_id=session_id,
           action="start",
           user_id=user_id
       )
       
       # 2. Flow execution start
       flow_start = FlowTelemetryData(
           flow_id=flow_id,
           flow_name="Integration Test Flow",
           session_id=session_id,
           user_id=user_id,
           component_count=3
       )
       
       # 3. Component executions
       components = [
           ComponentTelemetryData(
               component_id="input_1",
               component_type="TextInput",
               flow_id=flow_id,
               session_id=session_id,
               execution_time=0.001
           ),
           ComponentTelemetryData(
               component_id="llm_1",
               component_type="OpenAI",
               flow_id=flow_id,
               session_id=session_id,
               execution_time=2.345
           ),
           ComponentTelemetryData(
               component_id="output_1",
               component_type="TextOutput",
               flow_id=flow_id,
               session_id=session_id,
               execution_time=0.002
           )
       ]
       
       # 4. Flow execution complete
       total_execution_time = sum(comp.execution_time for comp in components)
       flow_end = FlowTelemetryData(
           flow_id=flow_id,
           execution_time=total_execution_time,
           success=True,
           session_id=session_id
       )
       
       # 5. Session end
       session_end = SessionTelemetryData(
           session_id=session_id,
           action="end",
           duration=300.0,  # 5 minutes
           flows_executed=1,
           components_used=3
       )
       
       # Verify all data is consistent
       assert all(comp.flow_id == flow_id for comp in components)
       assert all(comp.session_id == session_id for comp in components)
       assert flow_end.execution_time == total_execution_time
       assert session_end.flows_executed == 1
       assert session_end.components_used == 3

   def test_error_handling_telemetry_workflow(self):
       """Test telemetry workflow when errors occur."""
       flow_id = str(uuid4())
       session_id = str(uuid4())
       
       # Component that fails
       failed_component = ComponentTelemetryData(
           component_id="failing_llm",
           component_type="OpenAI",
           flow_id=flow_id,
           session_id=session_id,
           execution_time=1.234,
           error_count=1,
           error_message="API timeout"
       )
       
       # Error telemetry
       error_data = ErrorTelemetryData(
           error_type="TimeoutError",
           error_message="OpenAI API request timed out after 30 seconds",
           component_id="failing_llm",
           flow_id=flow_id,
           session_id=session_id,
           severity="high"
       )
       
       # Flow fails due to component error
       flow_data = FlowTelemetryData(
           flow_id=flow_id,
           success=False,
           failed_component_id="failing_llm",
           error_message="Flow execution failed: API timeout",
           session_id=session_id
       )
       
       # Verify error propagation
       assert failed_component.error_count == 1
       assert error_data.component_id == failed_component.component_id
       assert flow_data.failed_component_id == failed_component.component_id
       assert not flow_data.success

   def test_performance_monitoring_workflow(self):
       """Test performance monitoring telemetry workflow."""
       operations = [
           ("model_loading", 5.234, 512.0, 25.5),
           ("tokenization", 0.156, 64.0, 45.2),
           ("inference", 3.789, 1024.0, 89.7),
           ("response_generation", 1.234, 256.0, 67.3)
       ]
       
       perf_data_list = []
       
       for operation, duration, memory, cpu in operations:
           perf_data = PerformanceTelemetryData(
               operation=operation,
               duration=duration,
               memory_usage=memory,
               cpu_usage=cpu
           )
           perf_data_list.append(perf_data)
       
       # Calculate total metrics
       total_duration = sum(p.duration for p in perf_data_list)
       max_memory = max(p.memory_usage for p in perf_data_list)
       avg_cpu = sum(p.cpu_usage for p in perf_data_list) / len(perf_data_list)
       
       # Create summary performance data
       summary_perf = PerformanceTelemetryData(
           operation="complete_inference_pipeline",
           duration=total_duration,
           memory_usage=max_memory,
           cpu_usage=avg_cpu
       )
       
       assert summary_perf.duration == total_duration
       assert summary_perf.memory_usage == max_memory
       assert abs(summary_perf.cpu_usage - avg_cpu) < 0.001


 Test configuration and fixtures
pytest.fixture
ef sample_telemetry_data():
   """Fixture providing sample telemetry data for tests."""
   return {
       "component": ComponentTelemetryData(
           component_id="test_component",
           component_type="TestComponent"
       ),
       "flow": FlowTelemetryData(
           flow_id=str(uuid4()),
           flow_name="Test Flow"
       ),
       "session": SessionTelemetryData(
           session_id=str(uuid4()),
           action="start"
       ),
       "error": ErrorTelemetryData(
           error_type="TestError",
           error_message="Test error message"
       ),
       "performance": PerformanceTelemetryData(
           operation="test_operation",
           duration=1.0
       ),
       "user": UserTelemetryData(
           user_id="test_user",
           user_type="standard"
       )
   }


pytest.fixture
ef mock_datetime():
   """Fixture to mock datetime for consistent testing."""
   fixed_datetime = datetime(2024, 1, 15, 12, 0, 0)
   with patch('langflow.services.telemetry.schema.datetime') as mock_dt:
       mock_dt.now.return_value = fixed_datetime
       mock_dt.side_effect = lambda *args, **kw: datetime(*args, **kw)
       yield fixed_datetime


 Performance and stress tests
lass TestTelemetryDataPerformance:
   """Performance tests for telemetry data operations."""
   
   def test_telemetry_data_creation_performance(self):
       """Test performance of creating many telemetry data objects."""
       import time
       
       start_time = time.time()
       
       # Create 1000 telemetry data objects
       telemetry_objects = []
       for i in range(1000):
           data = ComponentTelemetryData(
               component_id=f"component_{i}",
               component_type="PerformanceTest",
               execution_time=i * 0.001
           )
           telemetry_objects.append(data)
       
       creation_time = time.time() - start_time
       
       # Should create 1000 objects reasonably quickly (under 1 second)
       assert creation_time < 1.0
       assert len(telemetry_objects) == 1000

   def test_telemetry_data_serialization_performance(self):
       """Test performance of serializing telemetry data."""
       import time
       
       # Create complex telemetry data
       complex_data = FlowTelemetryData(
           flow_id=str(uuid4()),
           flow_name="Performance Test Flow",
           component_count=50,
           component_executions=[
               {"component_id": f"comp_{i}", "execution_time": i * 0.01}
               for i in range(50)
           ],
           metadata={
               "large_metadata": ["item_" + str(i) for i in range(100)]
           }
       )
       
       start_time = time.time()
       
       # Serialize 100 times
       for _ in range(100):
           serialized = serialize_telemetry_data(complex_data)
           assert len(serialized) > 0
       
       serialization_time = time.time() - start_time
       
       # Should serialize 100 complex objects reasonably quickly
       assert serialization_time < 2.0


f __name__ == "__main__":
   pytest.main([__file__])

src/backend/tests/unit/base/langflow/services/telemetry/test_service.py

This is a new file.

""Comprehensive unit tests for TelemetryService.

esting library and framework: pytest with pytest-asyncio for async testing
""
mport asyncio
mport os
mport platform
rom datetime import datetime, timezone
rom typing import Any
rom unittest.mock import AsyncMock, Mock, patch, MagicMock

mport httpx
mport pytest
rom loguru import logger

rom langflow.services.telemetry.service import TelemetryService
rom langflow.services.telemetry.schema import (
   ComponentPayload,
   PlaygroundPayload,
   RunPayload,
   ShutdownPayload,
   VersionPayload,



lass MockSettingsService:
   """Mock settings service for testing."""
   
   def __init__(self):
       self.settings = Mock()
       self.settings.telemetry_base_url = "https://api.example.com/telemetry"
       self.settings.do_not_track = False
       self.settings.prometheus_enabled = True
       self.settings.cache_type = "memory"
       self.settings.backend_only = False
       
       self.auth_settings = Mock()
       self.auth_settings.AUTO_LOGIN = False


pytest.fixture
ef mock_settings_service():
   """Create a mock settings service."""
   return MockSettingsService()


pytest.fixture
ef telemetry_service(mock_settings_service):
   """Create a TelemetryService instance with mocked dependencies."""
   return TelemetryService(mock_settings_service)


pytest.fixture
ef sample_run_payload():
   """Sample run payload for testing."""
   return RunPayload(
       run_is_webhook=False,
       run_seconds=120,
       run_success=True,
       run_error_message="",
       client_type="oss"
   )


pytest.fixture
ef sample_shutdown_payload():
   """Sample shutdown payload for testing."""
   return ShutdownPayload(
       time_running=3600,
       client_type="oss"
   )


pytest.fixture
ef sample_version_payload():
   """Sample version payload for testing."""
   return VersionPayload(
       package="langflow",
       version="1.0.0",
       platform="Linux-5.4.0",
       python="3.9",
       arch="x86_64",
       auto_login=False,
       cache_type="memory",
       backend_only=False,
       client_type="oss"
   )


pytest.fixture
ef sample_playground_payload():
   """Sample playground payload for testing."""
   return PlaygroundPayload(
       playground_seconds=45,
       playground_component_count=5,
       playground_success=True,
       playground_error_message="",
       client_type="oss"
   )


pytest.fixture
ef sample_component_payload():
   """Sample component payload for testing."""
   return ComponentPayload(
       component_name="TextInput",
       component_seconds=2,
       component_success=True,
       component_error_message=None,
       client_type="oss"
   )


lass TestTelemetryServiceInitialization:
   """Test telemetry service initialization."""

   def test_init_with_default_settings(self, mock_settings_service):
       """Test initialization with default settings."""
       service = TelemetryService(mock_settings_service)
       
       assert service.settings_service == mock_settings_service
       assert service.base_url == "https://api.example.com/telemetry"
       assert not service.running
       assert not service._stopping
       assert not service.do_not_track
       assert service.client_type == "oss"
       assert isinstance(service.telemetry_queue, asyncio.Queue)
       assert isinstance(service.client, httpx.AsyncClient)

   def test_init_with_do_not_track_setting(self, mock_settings_service):
       """Test initialization with do_not_track enabled in settings."""
       mock_settings_service.settings.do_not_track = True
       service = TelemetryService(mock_settings_service)
       assert service.do_not_track

   @patch.dict(os.environ, {"DO_NOT_TRACK": "true"})
   def test_init_with_do_not_track_env_var(self, mock_settings_service):
       """Test initialization with DO_NOT_TRACK environment variable."""
       service = TelemetryService(mock_settings_service)
       assert service.do_not_track

   @patch.dict(os.environ, {"LANGFLOW_DESKTOP": "true"})
   def test_init_with_desktop_client_type(self, mock_settings_service):
       """Test initialization with desktop client type."""
       service = TelemetryService(mock_settings_service)
       assert service.client_type == "desktop"

   @patch.dict(os.environ, {"LANGFLOW_DESKTOP": "1"})
   def test_init_with_desktop_client_type_numeric(self, mock_settings_service):
       """Test initialization with desktop client type using numeric value."""
       service = TelemetryService(mock_settings_service)
       assert service.client_type == "desktop"

   def test_get_langflow_desktop_various_values(self, telemetry_service):
       """Test _get_langflow_desktop with various environment values."""
       test_cases = [
           ("true", True),
           ("True", True),
           ("1", True),
           ("false", False),
           ("False", False),
           ("0", False),
           ("", False),
           ("random", False),
       ]
       
       for env_value, expected in test_cases:
           with patch.dict(os.environ, {"LANGFLOW_DESKTOP": env_value}):
               result = telemetry_service._get_langflow_desktop()
               assert result == expected, f"Failed for env_value: {env_value}"


lass TestTelemetryServiceLifecycle:
   """Test telemetry service start/stop lifecycle."""

   @pytest.mark.asyncio
   async def test_start_service_success(self, telemetry_service):
       """Test successful service start."""
       with patch('asyncio.create_task') as mock_create_task:
           mock_task = Mock()
           mock_create_task.return_value = mock_task
           
           telemetry_service.start()
           
           assert telemetry_service.running
           assert isinstance(telemetry_service._start_time, datetime)
           assert mock_create_task.call_count == 2  # worker and version tasks

   def test_start_service_when_do_not_track(self, telemetry_service):
       """Test service start when do_not_track is enabled."""
       telemetry_service.do_not_track = True
       
       telemetry_service.start()
       
       assert not telemetry_service.running

   def test_start_service_already_running(self, telemetry_service):
       """Test service start when already running."""
       telemetry_service.running = True
       
       with patch('asyncio.create_task') as mock_create_task:
           telemetry_service.start()
           mock_create_task.assert_not_called()

   @pytest.mark.asyncio
   async def test_stop_service_success(self, telemetry_service):
       """Test successful service stop."""
       # Setup running service
       telemetry_service.running = True
       telemetry_service.worker_task = AsyncMock()
       telemetry_service.log_package_version_task = AsyncMock()
       telemetry_service.client = AsyncMock()
       
       with patch.object(telemetry_service, 'flush', new_callable=AsyncMock) as mock_flush:
           await telemetry_service.stop()
           
           assert telemetry_service._stopping
           assert not telemetry_service.running
           mock_flush.assert_called_once()
           telemetry_service.client.aclose.assert_called_once()

   @pytest.mark.asyncio
   async def test_stop_service_when_do_not_track(self, telemetry_service):
       """Test service stop when do_not_track is enabled."""
       telemetry_service.do_not_track = True
       
       with patch.object(telemetry_service, 'flush', new_callable=AsyncMock) as mock_flush:
           await telemetry_service.stop()
           mock_flush.assert_not_called()

   @pytest.mark.asyncio
   async def test_stop_service_already_stopping(self, telemetry_service):
       """Test service stop when already stopping."""
       telemetry_service._stopping = True
       
       with patch.object(telemetry_service, 'flush', new_callable=AsyncMock) as mock_flush:
           await telemetry_service.stop()
           mock_flush.assert_not_called()

   @pytest.mark.asyncio
   async def test_teardown_calls_stop(self, telemetry_service):
       """Test teardown calls stop method."""
       with patch.object(telemetry_service, 'stop', new_callable=AsyncMock) as mock_stop:
           await telemetry_service.teardown()
           mock_stop.assert_called_once()


lass TestTelemetryServiceDataSending:
   """Test telemetry data sending functionality."""

   @pytest.mark.asyncio
   async def test_send_telemetry_data_success(self, telemetry_service, sample_run_payload):
       """Test successful telemetry data sending."""
       mock_response = Mock()
       mock_response.status_code = httpx.codes.OK
       
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.return_value = mock_response
           
           await telemetry_service.send_telemetry_data(sample_run_payload, "run")
           
           mock_get.assert_called_once()
           call_args = mock_get.call_args
           assert "run" in call_args[0][0]  # URL contains path
           assert "params" in call_args[1]

   @pytest.mark.asyncio
   async def test_send_telemetry_data_without_path(self, telemetry_service, sample_run_payload):
       """Test telemetry data sending without path."""
       mock_response = Mock()
       mock_response.status_code = httpx.codes.OK
       
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.return_value = mock_response
           
           await telemetry_service.send_telemetry_data(sample_run_payload)
           
           call_args = mock_get.call_args
           assert call_args[0][0] == telemetry_service.base_url

   @pytest.mark.asyncio
   async def test_send_telemetry_data_when_do_not_track(self, telemetry_service, sample_run_payload):
       """Test telemetry data sending when do_not_track is enabled."""
       telemetry_service.do_not_track = True
       
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           await telemetry_service.send_telemetry_data(sample_run_payload)
           mock_get.assert_not_called()

   @pytest.mark.asyncio
   async def test_send_telemetry_data_http_error(self, telemetry_service, sample_run_payload):
       """Test telemetry data sending with HTTP error."""
       mock_response = Mock()
       mock_response.status_code = 500
       mock_response.text = "Internal Server Error"
       
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.return_value = mock_response
           
           # Should not raise exception
           await telemetry_service.send_telemetry_data(sample_run_payload)

   @pytest.mark.asyncio
   async def test_send_telemetry_data_http_status_error(self, telemetry_service, sample_run_payload):
       """Test telemetry data sending with HTTPStatusError."""
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.side_effect = httpx.HTTPStatusError("HTTP Error", request=Mock(), response=Mock())
           
           # Should not raise exception
           await telemetry_service.send_telemetry_data(sample_run_payload)

   @pytest.mark.asyncio
   async def test_send_telemetry_data_request_error(self, telemetry_service, sample_run_payload):
       """Test telemetry data sending with RequestError."""
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.side_effect = httpx.RequestError("Network Error", request=Mock())
           
           # Should not raise exception
           await telemetry_service.send_telemetry_data(sample_run_payload)

   @pytest.mark.asyncio
   async def test_send_telemetry_data_general_exception(self, telemetry_service, sample_run_payload):
       """Test telemetry data sending with general exception."""
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.side_effect = Exception("Unexpected error")
           
           # Should not raise exception
           await telemetry_service.send_telemetry_data(sample_run_payload)

   @pytest.mark.asyncio
   async def test_send_telemetry_data_sets_client_type(self, telemetry_service, sample_run_payload):
       """Test that client_type is set when None."""
       sample_run_payload.client_type = None
       mock_response = Mock()
       mock_response.status_code = httpx.codes.OK
       
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.return_value = mock_response
           
           await telemetry_service.send_telemetry_data(sample_run_payload)
           
           assert sample_run_payload.client_type == telemetry_service.client_type


lass TestTelemetryServiceEventLogging:
   """Test telemetry event logging methods."""

   @pytest.mark.asyncio
   async def test_log_package_run(self, telemetry_service, sample_run_payload):
       """Test logging package run event."""
       with patch.object(telemetry_service, '_queue_event', new_callable=AsyncMock) as mock_queue:
           await telemetry_service.log_package_run(sample_run_payload)
           mock_queue.assert_called_once()

   @pytest.mark.asyncio
   async def test_log_package_shutdown(self, telemetry_service):
       """Test logging package shutdown event."""
       telemetry_service._start_time = datetime.now(timezone.utc)
       
       with patch.object(telemetry_service, '_queue_event', new_callable=AsyncMock) as mock_queue:
           await telemetry_service.log_package_shutdown()
           mock_queue.assert_called_once()

   @pytest.mark.asyncio
   async def test_log_package_version(self, telemetry_service):
       """Test logging package version event."""
       with patch('langflow.utils.version.get_version_info') as mock_version_info:
           mock_version_info.return_value = {
               "package": "Langflow",
               "version": "1.0.0"
           }
           with patch('platform.platform', return_value="Linux"):
               with patch('platform.python_version', return_value="3.9.0"):
                   with patch('asyncio.to_thread') as mock_to_thread:
                       mock_to_thread.return_value = ("x86_64", "")
                       
                       with patch.object(telemetry_service, '_queue_event', new_callable=AsyncMock) as mock_queue:
                           await telemetry_service.log_package_version()
                           mock_queue.assert_called_once()

   @pytest.mark.asyncio
   async def test_log_package_playground(self, telemetry_service, sample_playground_payload):
       """Test logging package playground event."""
       with patch.object(telemetry_service, '_queue_event', new_callable=AsyncMock) as mock_queue:
           await telemetry_service.log_package_playground(sample_playground_payload)
           mock_queue.assert_called_once()

   @pytest.mark.asyncio
   async def test_log_package_component(self, telemetry_service, sample_component_payload):
       """Test logging package component event."""
       with patch.object(telemetry_service, '_queue_event', new_callable=AsyncMock) as mock_queue:
           await telemetry_service.log_package_component(sample_component_payload)
           mock_queue.assert_called_once()


lass TestTelemetryServiceQueueManagement:
   """Test telemetry queue management."""

   @pytest.mark.asyncio
   async def test_queue_event_success(self, telemetry_service):
       """Test successful event queuing."""
       payload = ("test_func", "test_payload", "test_path")
       
       await telemetry_service._queue_event(payload)
       
       # Queue should have one item
       assert not telemetry_service.telemetry_queue.empty()

   @pytest.mark.asyncio
   async def test_queue_event_when_do_not_track(self, telemetry_service):
       """Test event queuing when do_not_track is enabled."""
       telemetry_service.do_not_track = True
       payload = ("test_func", "test_payload", "test_path")
       
       await telemetry_service._queue_event(payload)
       
       # Queue should be empty
       assert telemetry_service.telemetry_queue.empty()

   @pytest.mark.asyncio
   async def test_queue_event_when_stopping(self, telemetry_service):
       """Test event queuing when service is stopping."""
       telemetry_service._stopping = True
       payload = ("test_func", "test_payload", "test_path")
       
       await telemetry_service._queue_event(payload)
       
       # Queue should be empty
       assert telemetry_service.telemetry_queue.empty()

   @pytest.mark.asyncio
   async def test_telemetry_worker_processes_events(self, telemetry_service):
       """Test telemetry worker processes queued events."""
       mock_func = AsyncMock()
       payload = "test_payload"
       path = "test_path"
       
       # Put event in queue
       await telemetry_service.telemetry_queue.put((mock_func, payload, path))
       
       # Start worker briefly
       telemetry_service.running = True
       worker_task = asyncio.create_task(telemetry_service.telemetry_worker())
       
       # Give worker a moment to process
       await asyncio.sleep(0.01)
       
       # Stop worker
       telemetry_service.running = False
       worker_task.cancel()
       
       try:
           await worker_task
       except asyncio.CancelledError:
           pass
       
       # Verify function was called
       mock_func.assert_called_once_with(payload, path)

   @pytest.mark.asyncio
   async def test_telemetry_worker_handles_exceptions(self, telemetry_service):
       """Test telemetry worker handles exceptions gracefully."""
       mock_func = AsyncMock(side_effect=Exception("Test error"))
       payload = "test_payload"
       path = "test_path"
       
       # Put event in queue
       await telemetry_service.telemetry_queue.put((mock_func, payload, path))
       
       # Start worker briefly
       telemetry_service.running = True
       worker_task = asyncio.create_task(telemetry_service.telemetry_worker())
       
       # Give worker a moment to process
       await asyncio.sleep(0.01)
       
       # Stop worker
       telemetry_service.running = False
       worker_task.cancel()
       
       try:
           await worker_task
       except asyncio.CancelledError:
           pass
       
       # Worker should continue despite exception
       mock_func.assert_called_once()

   @pytest.mark.asyncio
   async def test_flush_waits_for_queue_completion(self, telemetry_service):
       """Test flush waits for queue completion."""
       # Put some events in queue
       for i in range(3):
           await telemetry_service.telemetry_queue.put((AsyncMock(), f"payload_{i}", f"path_{i}"))
       
       # Mock queue.join to verify it's called
       with patch.object(telemetry_service.telemetry_queue, 'join', new_callable=AsyncMock) as mock_join:
           await telemetry_service.flush()
           mock_join.assert_called_once()

   @pytest.mark.asyncio
   async def test_flush_when_do_not_track(self, telemetry_service):
       """Test flush when do_not_track is enabled."""
       telemetry_service.do_not_track = True
       
       with patch.object(telemetry_service.telemetry_queue, 'join', new_callable=AsyncMock) as mock_join:
           await telemetry_service.flush()
           mock_join.assert_not_called()

   @pytest.mark.asyncio
   async def test_flush_handles_exceptions(self, telemetry_service):
       """Test flush handles exceptions gracefully."""
       with patch.object(telemetry_service.telemetry_queue, 'join', 
                        new_callable=AsyncMock, side_effect=Exception("Queue error")):
           # Should not raise exception
           await telemetry_service.flush()


lass TestTelemetryServiceTaskCancellation:
   """Test task cancellation functionality."""

   @pytest.mark.asyncio
   async def test_cancel_task_success(self, telemetry_service):
       """Test successful task cancellation."""
       async def dummy_task():
           await asyncio.sleep(10)  # Long running task
       
       task = asyncio.create_task(dummy_task())
       
       await telemetry_service._cancel_task(task, "Test cancellation")
       
       assert task.cancelled()

   @pytest.mark.asyncio
   async def test_cancel_task_with_exception(self, telemetry_service):
       """Test task cancellation when task raises exception."""
       async def failing_task():
           raise ValueError("Task failed")
       
       task = asyncio.create_task(failing_task())
       
       # Wait for task to complete with exception
       try:
           await task
       except ValueError:
           pass
       
       # Now cancel (task already done with exception)
       with pytest.raises(ValueError):
           await telemetry_service._cancel_task(task, "Test cancellation")


lass TestTelemetryServiceSchemaValidation:
   """Test telemetry schema validation and serialization."""

   def test_run_payload_serialization(self, sample_run_payload):
       """Test RunPayload serialization."""
       data = sample_run_payload.model_dump(by_alias=True)
       assert "runIsWebhook" in data
       assert "runSeconds" in data
       assert "runSuccess" in data
       assert "runErrorMessage" in data
       assert "clientType" in data

   def test_shutdown_payload_serialization(self, sample_shutdown_payload):
       """Test ShutdownPayload serialization."""
       data = sample_shutdown_payload.model_dump(by_alias=True)
       assert "timeRunning" in data
       assert "clientType" in data

   def test_version_payload_serialization(self, sample_version_payload):
       """Test VersionPayload serialization."""
       data = sample_version_payload.model_dump(by_alias=True)
       assert "autoLogin" in data
       assert "cacheType" in data
       assert "backendOnly" in data
       assert "clientType" in data

   def test_playground_payload_serialization(self, sample_playground_payload):
       """Test PlaygroundPayload serialization."""
       data = sample_playground_payload.model_dump(by_alias=True)
       assert "playgroundSeconds" in data
       assert "playgroundComponentCount" in data
       assert "playgroundSuccess" in data
       assert "playgroundErrorMessage" in data
       assert "clientType" in data

   def test_component_payload_serialization(self, sample_component_payload):
       """Test ComponentPayload serialization."""
       data = sample_component_payload.model_dump(by_alias=True)
       assert "componentName" in data
       assert "componentSeconds" in data
       assert "componentSuccess" in data
       assert "componentErrorMessage" in data
       assert "clientType" in data


lass TestTelemetryServiceEdgeCases:
   """Test edge cases and boundary conditions."""

   @pytest.mark.asyncio
   async def test_multiple_start_calls(self, telemetry_service):
       """Test multiple start calls don't create duplicate tasks."""
       with patch('asyncio.create_task') as mock_create_task:
           mock_task = Mock()
           mock_create_task.return_value = mock_task
           
           telemetry_service.start()
           telemetry_service.start()  # Second call
           
           # Should only create tasks once
           assert mock_create_task.call_count == 2  # worker and version tasks

   @pytest.mark.asyncio
   async def test_stop_without_start(self, telemetry_service):
       """Test stopping service that was never started."""
       await telemetry_service.stop()
       # Should not raise any exceptions

   @pytest.mark.asyncio
   async def test_version_logging_with_cached_architecture(self, telemetry_service):
       """Test version logging when architecture is already cached."""
       telemetry_service.architecture = "cached_arch"
       
       with patch('langflow.utils.version.get_version_info') as mock_version_info:
           mock_version_info.return_value = {"package": "Langflow", "version": "1.0.0"}
           with patch('platform.platform', return_value="Linux"):
               with patch('platform.python_version', return_value="3.9.0"):
                   with patch.object(telemetry_service, '_queue_event', new_callable=AsyncMock) as mock_queue:
                       await telemetry_service.log_package_version()
                       mock_queue.assert_called_once()
                       
                       # Verify cached architecture is used
                       call_args = mock_queue.call_args[0][0]
                       _, payload, _ = call_args
                       assert payload.arch == "cached_arch"

   def test_client_timeout_configuration(self, telemetry_service):
       """Test that HTTP client has proper timeout configuration."""
       assert telemetry_service.client.timeout.read == 10.0

   @pytest.mark.asyncio
   async def test_concurrent_flush_calls(self, telemetry_service):
       """Test concurrent flush calls are handled properly."""
       # Put some events in queue
       for i in range(3):
           await telemetry_service.telemetry_queue.put((AsyncMock(), f"payload_{i}", f"path_{i}"))
       
       # Call flush concurrently
       await asyncio.gather(
           telemetry_service.flush(),
           telemetry_service.flush(),
           telemetry_service.flush()
       )
       
       # Should complete without errors

   @pytest.mark.asyncio
   async def test_large_payload_handling(self, telemetry_service):
       """Test handling of large payloads."""
       large_properties = {"data": "x" * 10000}  # Large data
       
       large_payload = RunPayload(
           run_is_webhook=False,
           run_seconds=120,
           run_success=True,
           run_error_message="x" * 1000,  # Large error message
           client_type="oss"
       )
       
       mock_response = Mock()
       mock_response.status_code = httpx.codes.OK
       
       with patch.object(telemetry_service.client, 'get', new_callable=AsyncMock) as mock_get:
           mock_get.return_value = mock_response
           
           # Should handle large payload without issues
           await telemetry_service.send_telemetry_data(large_payload)
           mock_get.assert_called_once()

   def test_environment_variable_edge_cases(self, telemetry_service):
       """Test edge cases for environment variable handling."""
       # Test with None value (variable not set)
       with patch.dict(os.environ, {}, clear=True):
           result = telemetry_service._get_langflow_desktop()
           assert result is False
       
       # Test with empty string
       with patch.dict(os.environ, {"LANGFLOW_DESKTOP": ""}):
           result = telemetry_service._get_langflow_desktop()
           assert result is False

- Introduced unit tests for `RunPayload`, `ShutdownPayload`, `VersionPayload`, `PlaygroundPayload`, and `ComponentPayload` to ensure proper initialization, serialization, and edge case handling.
- Implemented tests for various scenarios including default values, negative inputs, and serialization with special characters.
- Added performance tests to evaluate payload creation and serialization efficiency.
- Included fixtures for sample payloads to streamline testing processes.
@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. and removed size:S This PR changes 10-29 lines, ignoring generated files. labels Jul 1, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Jul 1, 2025
@ogabrielluiz ogabrielluiz added the DO NOT MERGE Don't Merge this PR label Jul 2, 2025
@ogabrielluiz ogabrielluiz added community Pull Request from an external contributor and removed community Pull Request from an external contributor labels Jul 14, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 18, 2025
@codecov
Copy link

codecov bot commented Aug 18, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 19.24%. Comparing base (2704276) to head (be00285).
⚠️ Report is 2 commits behind head on main.

❌ Your project status has failed because the head coverage (2.67%) is below the target coverage (10.00%). You can increase the head coverage or adjust the target coverage.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #8812      +/-   ##
==========================================
- Coverage   23.45%   19.24%   -4.21%     
==========================================
  Files        1090     1052      -38     
  Lines       39849    39070     -779     
  Branches     5531     5363     -168     
==========================================
- Hits         9347     7520    -1827     
- Misses      30331    31490    +1159     
+ Partials      171       60     -111     
Flag Coverage Δ
backend 45.67% <100.00%> (-0.43%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...backend/base/langflow/services/telemetry/schema.py 100.00% <100.00%> (ø)
...ackend/base/langflow/services/telemetry/service.py 85.38% <100.00%> (+0.58%) ⬆️

... and 233 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@ogabrielluiz ogabrielluiz enabled auto-merge August 18, 2025 21:30
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 18, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 18, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 18, 2025
@sonarqubecloud
Copy link

@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 18, 2025
@ogabrielluiz ogabrielluiz removed the DO NOT MERGE Don't Merge this PR label Aug 18, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Aug 18, 2025
@github-actions github-actions bot added enhancement New feature or request and removed enhancement New feature or request labels Sep 25, 2025
@sonarqubecloud
Copy link

@ogabrielluiz ogabrielluiz added this pull request to the merge queue Sep 25, 2025
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Sep 25, 2025
@ogabrielluiz ogabrielluiz added this pull request to the merge queue Sep 25, 2025
Merged via the queue into main with commit c0ac23a Sep 25, 2025
49 of 51 checks passed
@ogabrielluiz ogabrielluiz deleted the add-clienttype-telemetry branch September 25, 2025 20:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request lgtm This PR has been approved by a maintainer size:XL This PR changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants