Module agents.metrics

Sub-modules

agents.metrics.analytics
agents.metrics.cascading_metrics_collector
agents.metrics.integration
agents.metrics.logs
agents.metrics.models
agents.metrics.realtime_metrics_collector
agents.metrics.telemetry
agents.metrics.traces_flow

Functions

def auto_initialize_telemetry_and_logs(room_id: str,
peer_id: str,
room_attributes: Dict[str, Any] = None,
session_id: str = None,
sdk_metadata: Dict[str, Any] = None)
Expand source code
def auto_initialize_telemetry_and_logs(room_id: str, peer_id: str,
                                      room_attributes: Optional[Dict[str, Any]] = None,
                                      session_id: Optional[str] = None,
                                      sdk_metadata: Optional[Dict[str, Any]] = None):
    """
    Auto-initialize telemetry and logs from room attributes.

    Args:
        room_id: Room identifier (used as meeting_id for logs)
        peer_id: Peer identifier
        room_attributes: Attribute dict; keys used here are 'observability'
            (JWT string), 'traces' and 'logs' (config dicts with an
            'enabled' flag). If falsy, nothing is initialized.
        session_id: Session identifier forwarded to log initialization
        sdk_metadata: Extra SDK metadata forwarded to both subsystems
    """
    if not room_attributes:
        return

    observability_jwt = room_attributes.get('observability', '')

    # `or {}` also guards against an explicit None stored under the key,
    # which a plain .get(key, {}) default would not catch.
    traces_config = room_attributes.get('traces') or {}
    if traces_config.get('enabled'):
        # Placeholder for extra trace attributes; currently empty.
        metadata = {
        }

        initialize_telemetry(
            room_id=room_id,
            peer_id=peer_id,
            sdk_name="agents",
            observability_jwt=observability_jwt,
            traces_config=traces_config,
            metadata=metadata,
            sdk_metadata=sdk_metadata
        )

    logs_config = room_attributes.get('logs') or {}
    if logs_config.get('enabled'):
        initialize_logs(
            meeting_id=room_id,
            peer_id=peer_id,
            jwt_key=observability_jwt,
            log_config=logs_config,
            session_id=session_id,
            sdk_metadata=sdk_metadata,
        )

Auto-initialize telemetry and logs from room attributes

def complete_span(span: opentelemetry.trace.span.Span | None,
status_code,
message: str = '',
end_time: float | None = None)
Expand source code
def complete_span(span: Optional[Span], status_code, message: str = "", end_time: Optional[float] = None):
    """
    Complete a trace span (convenience method).

    Delegates to the active telemetry backend; silently does nothing when
    telemetry is not initialized or no span was provided.

    Args:
        span: Span to complete
        status_code: Status code
        message: Status message
        end_time: End time in seconds since epoch (optional)
    """
    telemetry = get_telemetry()
    # Guard clause instead of a nested conditional: bail out unless we
    # have both a telemetry backend and a span to finish.
    if not telemetry or not span:
        return
    telemetry.complete_span(span, status_code, message, end_time)

Complete a trace span (convenience method)

Args

span
Span to complete
status_code
Status code
message
Status message
end_time
End time in seconds since epoch (optional)
def create_log(message: str, log_level: str = 'INFO', attributes: Dict[str, Any] = None)
Expand source code
def create_log(message: str, log_level: str = "INFO", attributes: Optional[Dict[str, Any]] = None):
    """
    Create a log entry (convenience method).

    Delegates to the active logs backend; silently does nothing when the
    logs subsystem has not been initialized.

    Args:
        message: Log message
        log_level: Log level (e.g. "INFO", "ERROR")
        attributes: Additional attributes attached to the entry (optional)
    """
    logs = get_logs()
    if logs:
        logs.create_log(message, log_level, attributes)

Create a log entry (convenience method)

Args

message
Log message
log_level
Log level
attributes
Additional attributes
def create_span(span_name: str,
attributes: Dict[str, Any] = None,
parent_span: opentelemetry.trace.span.Span | None = None,
start_time: float | None = None)
Expand source code
def create_span(span_name: str, attributes: Optional[Dict[str, Any]] = None,
                parent_span: Optional[Span] = None, start_time: Optional[float] = None):
    """
    Create a trace span (convenience method).

    Delegates to the active telemetry backend; returns None when telemetry
    has not been initialized.

    Args:
        span_name: Name of the span
        attributes: Span attributes (optional)
        parent_span: Parent span (optional)
        start_time: Start time in seconds since epoch (optional)

    Returns:
        Span object or None
    """
    telemetry = get_telemetry()
    if telemetry:
        return telemetry.trace(span_name, attributes, parent_span, start_time)
    return None

Create a trace span (convenience method)

Args

span_name
Name of the span
attributes
Span attributes
parent_span
Parent span (optional)
start_time
Start time in seconds since epoch (optional)

Returns

Span object or None

Classes

class CascadingMetricsCollector
Expand source code
class CascadingMetricsCollector:
    """Collects and tracks performance metrics for AI agent turns.

    Builds one CascadingTurnData per user/agent exchange, records component
    latencies (STT, LLM, TTS, EOU) plus a timeline of speech events, and
    forwards each completed turn to the analytics client and, when set, a
    TracesFlowManager. All intra-turn timestamps use time.perf_counter(),
    so they are only comparable to each other within this process.
    """

    def __init__(self):
        self.data = CascadingMetricsData()
        self.analytics_client = AnalyticsClient()
        self.traces_flow_manager: Optional[TracesFlowManager] = None
        self.active_spans: Dict[str, Span] = {}
        # Earliest user-speech start carried over from a turn that was
        # discarded because it had no latency metrics at all.
        self.pending_user_start_time: Optional[float] = None

    def set_traces_flow_manager(self, manager: TracesFlowManager):
        """Set the TracesFlowManager instance"""
        self.traces_flow_manager = manager

    def _generate_interaction_id(self) -> str:
        """Generate a hash-based turn ID"""
        timestamp = str(time.time())
        session_id = self.data.session_id or "default"
        interaction_count = str(self.data.total_turns)

        # md5 is used purely as a cheap, short ID hash — not for security.
        hash_input = f"{timestamp}_{session_id}_{interaction_count}"
        return hashlib.md5(hash_input.encode()).hexdigest()[:16]

    def _round_latency(self, latency: float) -> float:
        """Convert latency from seconds to milliseconds and round to 4 decimal places"""
        return round(latency * 1000, 4)

    def _transform_to_camel_case(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform snake_case field names to camelCase for analytics.

        Keys not present in the mapping are passed through unchanged.
        Timeline events (under the 'timeline' key) get their own mapping.
        """
        field_mapping = {
            'user_speech_start_time': 'userSpeechStartTime',
            'user_speech_end_time': 'userSpeechEndTime',
            'stt_latency': 'sttLatency',
            'llm_latency': 'llmLatency',
            'tts_latency': 'ttsLatency',
            'eou_latency': 'eouLatency',
            'e2e_latency': 'e2eLatency',
            'function_tools_called': 'functionToolsCalled',
            'system_instructions': 'systemInstructions',
            'errors': 'errors',
            'function_tool_timestamps': 'functionToolTimestamps',
            'stt_start_time': 'sttStartTime',
            'stt_end_time': 'sttEndTime',
            'tts_start_time': 'ttsStartTime',
            'tts_end_time': 'ttsEndTime',
            'llm_start_time': 'llmStartTime',
            'llm_end_time': 'llmEndTime',
            'eou_start_time': 'eouStartTime',
            'eou_end_time': 'eouEndTime',
            'llm_provider_class': 'llmProviderClass',
            'llm_model_name': 'llmModelName',
            'stt_provider_class': 'sttProviderClass',
            'stt_model_name': 'sttModelName',
            'tts_provider_class': 'ttsProviderClass',
            'tts_model_name': 'ttsModelName',
            'vad_provider_class': 'vadProviderClass',
            'vad_model_name': 'vadModelName',
            'eou_provider_class': 'eouProviderClass',
            'eou_model_name': 'eouModelName',
            'handoff_occurred': 'handOffOccurred'
        }

        timeline_field_mapping = {
            'event_type': 'eventType',
            'start_time': 'startTime',
            'end_time': 'endTime',
            'duration_ms': 'durationInMs'
        }

        transformed_data = {}
        for key, value in interaction_data.items():
            camel_key = field_mapping.get(key, key)

            if key == 'timeline' and isinstance(value, list):
                transformed_timeline = []
                for event in value:
                    transformed_event = {}
                    for event_key, event_value in event.items():
                        camel_event_key = timeline_field_mapping.get(event_key, event_key)
                        transformed_event[camel_event_key] = event_value
                    transformed_timeline.append(transformed_event)
                transformed_data[camel_key] = transformed_timeline
            else:
                transformed_data[camel_key] = value

        return transformed_data

    def _remove_negatives(self, obj: Any) -> Any:
        """Recursively clamp any numeric value < 0 to 0 in dicts/lists.

        Mutates the container in place and returns it.
        """
        if isinstance(obj, dict):
            for k, v in obj.items():
                if isinstance(v, (int, float)):
                    if v < 0:
                        obj[k] = 0
                elif isinstance(v, (dict, list)):
                    obj[k] = self._remove_negatives(v)
            return obj
        if isinstance(obj, list):
            for i, v in enumerate(obj):
                if isinstance(v, (int, float)):
                    if v < 0:
                        obj[i] = 0
                elif isinstance(v, (dict, list)):
                    obj[i] = self._remove_negatives(v)
            return obj
        return obj

    def _start_timeline_event(self, event_type: str, start_time: float) -> None:
        """Start a timeline event on the current turn (no-op without a turn)."""
        if self.data.current_turn:
            event = TimelineEvent(
                event_type=event_type,
                start_time=start_time
            )
            self.data.current_turn.timeline.append(event)

    def _end_timeline_event(self, event_type: str, end_time: float) -> None:
        """End the most recent open timeline event of the given type and
        record its duration in milliseconds."""
        if self.data.current_turn:
            for event in reversed(self.data.current_turn.timeline):
                if event.event_type == event_type and event.end_time is None:
                    event.end_time = end_time
                    event.duration_ms = self._round_latency(end_time - event.start_time)
                    break

    def _update_timeline_event_text(self, event_type: str, text: str) -> None:
        """Attach text to the most recent timeline event of the given type
        that does not yet have text."""
        if self.data.current_turn:
            for event in reversed(self.data.current_turn.timeline):
                if event.event_type == event_type and not event.text:
                    event.text = text
                    break

    def _calculate_e2e_metrics(self, turn: CascadingTurnData) -> None:
        """Calculate E2E latency as the sum of present component latencies.

        NOTE(review): truthiness is used, so a component latency of exactly
        0 (or 0.0) is treated the same as absent — confirm this is intended.
        """
        e2e_components = []
        if turn.stt_latency:
            e2e_components.append(turn.stt_latency)
        if turn.eou_latency:
            e2e_components.append(turn.eou_latency)
        if turn.llm_latency:
            e2e_components.append(turn.llm_latency)
        if turn.tts_latency:
            e2e_components.append(turn.tts_latency)

        if e2e_components:
            turn.e2e_latency = round(sum(e2e_components), 4)

    def _validate_interaction_has_required_latencies(self, turn: CascadingTurnData) -> bool:
        """
        Validate that the turn has at least one of the required latency metrics.
        Returns True if at least one latency is present, False if ALL are absent/None.
        """
        # Simplified: the previous version also built a list of present
        # latency names that was never used (dead code).
        return any(
            latency is not None
            for latency in (turn.stt_latency, turn.tts_latency,
                            turn.llm_latency, turn.eou_latency)
        )

    def set_session_id(self, session_id: str):
        """Set the session ID for metrics tracking"""
        self.data.session_id = session_id
        self.analytics_client.set_session_id(session_id)

    def set_system_instructions(self, instructions: str):
        """Set the system instructions for this session"""
        self.data.system_instructions = instructions

    def set_provider_info(self, llm_provider: str = "", llm_model: str = "",
                         stt_provider: str = "", stt_model: str = "",
                         tts_provider: str = "", tts_model: str = "",
                         vad_provider: str = "", vad_model: str = "",
                         eou_provider: str = "", eou_model: str = ""):
        """Set the provider class and model information for this session"""
        self.data.llm_provider_class = llm_provider
        self.data.llm_model_name = llm_model
        self.data.stt_provider_class = stt_provider
        self.data.stt_model_name = stt_model
        self.data.tts_provider_class = tts_provider
        self.data.tts_model_name = tts_model
        self.data.vad_provider_class = vad_provider
        self.data.vad_model_name = vad_model
        self.data.eou_provider_class = eou_provider
        self.data.eou_model_name = eou_model

    def start_new_interaction(self, user_transcript: str = "") -> None:
        """Start tracking a new user-agent turn.

        Any in-progress turn is completed first. System instructions are
        only attached to the very first turn of the session.
        """
        if self.data.current_turn:
            self.complete_current_turn()

        self.data.total_turns += 1

        self.data.current_turn = CascadingTurnData(
            system_instructions=self.data.system_instructions if self.data.total_turns == 1 else "",
            # Provider and model info should be included in every turn
            llm_provider_class=self.data.llm_provider_class,
            llm_model_name=self.data.llm_model_name,
            stt_provider_class=self.data.stt_provider_class,
            stt_model_name=self.data.stt_model_name,
            tts_provider_class=self.data.tts_provider_class,
            tts_model_name=self.data.tts_model_name,
            vad_provider_class=self.data.vad_provider_class,
            vad_model_name=self.data.vad_model_name,
            eou_provider_class=self.data.eou_provider_class,
            eou_model_name=self.data.eou_model_name
        )

        # Carry over the user-speech start cached from a previously
        # discarded turn. NOTE(review): pending_user_start_time is only
        # cleared when the turn completes successfully — confirm it should
        # not be cleared here after being applied.
        if self.pending_user_start_time is not None:
            self.data.current_turn.user_speech_start_time = self.pending_user_start_time
            self._start_timeline_event("user_speech", self.pending_user_start_time)

        if self.data.is_user_speaking and self.data.user_input_start_time:
            if self.data.current_turn.user_speech_start_time is None:
                self.data.current_turn.user_speech_start_time = self.data.user_input_start_time
                if not any(ev.event_type == "user_speech" for ev in self.data.current_turn.timeline):
                    self._start_timeline_event("user_speech", self.data.user_input_start_time)

        if user_transcript:
            self.set_user_transcript(user_transcript)

    def complete_current_turn(self) -> None:
        """Complete and store the current turn.

        Computes the E2E latency, discards the turn (caching its earliest
        user-speech start) when no component latency is present, otherwise
        transforms it to camelCase, strips internal fields, clamps negative
        numbers to zero, and sends the payload to analytics.
        """
        if self.data.current_turn:
            self._calculate_e2e_metrics(self.data.current_turn)

            if not self._validate_interaction_has_required_latencies(self.data.current_turn):
                # Turn has no metrics at all: remember the earliest user
                # speech start so the next turn can inherit it.
                if self.data.current_turn.user_speech_start_time is not None:
                    if (self.pending_user_start_time is None or
                        self.data.current_turn.user_speech_start_time < self.pending_user_start_time):
                        self.pending_user_start_time = self.data.current_turn.user_speech_start_time
                        logger.info(f"[metrics] Caching earliest user start: {self.pending_user_start_time}")
                self.data.current_turn = None
                return

            if self.traces_flow_manager:
                self.traces_flow_manager.create_cascading_turn_trace(self.data.current_turn)

            self.data.turns.append(self.data.current_turn)
            interaction_data = asdict(self.data.current_turn)
            interaction_data['timeline'] = [asdict(event) for event in self.data.current_turn.timeline]
            transformed_data = self._transform_to_camel_case(interaction_data)

            # Internal bookkeeping fields never sent to analytics.
            # NOTE(review): 'interactionId' and 'timestamp' are not produced
            # by field_mapping — verify these key names match the dataclass.
            always_remove_fields = [
                'errors',
                'functionToolTimestamps',
                'sttStartTime', 'sttEndTime',
                'ttsStartTime', 'ttsEndTime',
                'llmStartTime', 'llmEndTime',
                'eouStartTime', 'eouEndTime',
                'is_a2a_enabled',
                "interactionId",
                "timestamp"
            ]

            # Only report handoff when A2A is enabled for this turn.
            if not self.data.current_turn.is_a2a_enabled:
                always_remove_fields.append("handOffOccurred")

            for field in always_remove_fields:
                if field in transformed_data:
                    del transformed_data[field]

            # Session-level constants are only sent with the first turn.
            if len(self.data.turns) > 1:
                provider_fields = [
                    'systemInstructions',
                    'llmProviderClass', 'llmModelName',
                    'sttProviderClass', 'sttModelName',
                    'ttsProviderClass', 'ttsModelName'
                ]
                for field in provider_fields:
                    if field in transformed_data:
                        del transformed_data[field]

            transformed_data = self._remove_negatives(transformed_data)

            interaction_payload = {
                "data": [transformed_data]
            }

            self.analytics_client.send_interaction_analytics_safe(interaction_payload)
            self.data.current_turn = None
            self.pending_user_start_time = None

    def on_interrupted(self):
        """Called when the user interrupts the agent"""
        if self.data.is_agent_speaking:
            self.data.total_interruptions += 1
            if self.data.current_turn:
                self.data.current_turn.interrupted = True
                logger.info(f"User interrupted the agent. Total interruptions: {self.data.total_interruptions}")

    def on_user_speech_start(self):
        """Called when user starts speaking"""
        # Ignore repeated start events while already speaking.
        if self.data.is_user_speaking:
            return

        if not self.data.current_turn:
            self.start_new_interaction()

        self.data.is_user_speaking = True
        self.data.user_input_start_time = time.perf_counter()

        if self.data.current_turn:
            if self.data.current_turn.user_speech_start_time is None:
                self.data.current_turn.user_speech_start_time = self.data.user_input_start_time

            if not any(event.event_type == "user_speech" for event in self.data.current_turn.timeline):
                self._start_timeline_event("user_speech", self.data.user_input_start_time)

    def on_user_speech_end(self):
        """Called when user stops speaking"""
        self.data.is_user_speaking = False
        self.data.user_speech_end_time = time.perf_counter()

        if self.data.current_turn:
            self.data.current_turn.user_speech_end_time = self.data.user_speech_end_time
            self._end_timeline_event("user_speech", self.data.user_speech_end_time)

    def on_agent_speech_start(self):
        """Called when agent starts speaking (actual audio output)"""
        self.data.is_agent_speaking = True
        self.data.agent_speech_start_time = time.perf_counter()

        if self.data.current_turn:
            # Avoid opening a second agent_speech event while one is open.
            if not any(event.event_type == "agent_speech" and event.end_time is None for event in self.data.current_turn.timeline):
                self._start_timeline_event("agent_speech", self.data.agent_speech_start_time)

    def on_agent_speech_end(self):
        """Called when agent stops speaking"""
        self.data.is_agent_speaking = False
        agent_speech_end_time = time.perf_counter()

        if self.data.current_turn:
            self._end_timeline_event("agent_speech", agent_speech_end_time)

        # TTS latency is time-to-first-byte (first byte - start), while
        # tts_end_time records when audio playback actually finished.
        if self.data.tts_start_time and self.data.tts_first_byte_time:
            total_tts_latency = self.data.tts_first_byte_time - self.data.tts_start_time
            if self.data.current_turn:
                self.data.current_turn.tts_end_time = agent_speech_end_time
                self.data.current_turn.tts_latency = self._round_latency(total_tts_latency)
            self.data.tts_start_time = None
            self.data.tts_first_byte_time = None
        elif self.data.tts_start_time:
            # If we have start time but no first byte time, just reset
            self.data.tts_start_time = None
            self.data.tts_first_byte_time = None

    def on_stt_start(self):
        """Called when STT processing starts"""
        self.data.stt_start_time = time.perf_counter()
        if self.data.current_turn:
            self.data.current_turn.stt_start_time = self.data.stt_start_time

    def on_stt_complete(self):
        """Called when STT processing completes"""
        if self.data.stt_start_time:
            stt_end_time = time.perf_counter()
            stt_latency = stt_end_time - self.data.stt_start_time
            if self.data.current_turn:
                self.data.current_turn.stt_end_time = stt_end_time
                self.data.current_turn.stt_latency = self._round_latency(stt_latency)
                logger.info(f"stt latency: {self.data.current_turn.stt_latency}ms")
            self.data.stt_start_time = None

    def on_llm_start(self):
        """Called when LLM processing starts"""
        self.data.llm_start_time = time.perf_counter()

        if self.data.current_turn:
            self.data.current_turn.llm_start_time = self.data.llm_start_time

    def on_llm_complete(self):
        """Called when LLM processing completes"""
        if self.data.llm_start_time:
            llm_end_time = time.perf_counter()
            llm_latency = llm_end_time - self.data.llm_start_time
            if self.data.current_turn:
                self.data.current_turn.llm_end_time = llm_end_time
                self.data.current_turn.llm_latency = self._round_latency(llm_latency)
                logger.info(f"llm latency: {self.data.current_turn.llm_latency}ms")
            self.data.llm_start_time = None

    def on_tts_start(self):
        """Called when TTS processing starts"""
        self.data.tts_start_time = time.perf_counter()
        self.data.tts_first_byte_time = None
        if self.data.current_turn:
            self.data.current_turn.tts_start_time = self.data.tts_start_time

    def on_tts_first_byte(self):
        """Called when TTS produces first audio byte - this is our TTS latency"""
        if self.data.tts_start_time:
            now = time.perf_counter()
            # ttfb stores the absolute perf_counter time of the first byte;
            # the difference is taken later against the tts span start.
            if self.data.current_turn:
                self.data.current_turn.ttfb = now
                logger.info(f"tts ttfb: {(self.data.current_turn.ttfb - self.data.tts_start_time) * 1000}ms")
            self.data.tts_first_byte_time = now

    def on_eou_start(self):
        """Called when EOU (End of Utterance) processing starts"""
        self.data.eou_start_time = time.perf_counter()
        if self.data.current_turn:
            self.data.current_turn.eou_start_time = self.data.eou_start_time

    def on_eou_complete(self):
        """Called when EOU processing completes"""
        if self.data.eou_start_time:
            eou_end_time = time.perf_counter()
            eou_latency = eou_end_time - self.data.eou_start_time
            if self.data.current_turn:
                self.data.current_turn.eou_end_time = eou_end_time
                self.data.current_turn.eou_latency = self._round_latency(eou_latency)
                logger.info(f"eou latency: {self.data.current_turn.eou_latency}ms")
            self.data.eou_start_time = None

    def set_user_transcript(self, transcript: str):
        """Set the user transcript for the current turn and update timeline"""
        if self.data.current_turn:
            logger.info(f"user input speech: {transcript}")
            user_speech_events = [event for event in self.data.current_turn.timeline
                                if event.event_type == "user_speech"]

            if user_speech_events:
                most_recent_event = user_speech_events[-1]
                most_recent_event.text = transcript
            else:
                # No user_speech event yet: open one now so the transcript
                # has somewhere to live.
                current_time = time.perf_counter()
                self._start_timeline_event("user_speech", current_time)
                if self.data.current_turn.timeline:
                    self.data.current_turn.timeline[-1].text = transcript

    def set_agent_response(self, response: str):
        """Set the agent response for the current turn and update timeline"""
        if self.data.current_turn:
            logger.info(f"agent output speech: {response}")
            if not any(event.event_type == "agent_speech" for event in self.data.current_turn.timeline):
                current_time = time.perf_counter()
                self._start_timeline_event("agent_speech", current_time)

            self._update_timeline_event_text("agent_speech", response)

    def add_function_tool_call(self, tool_name: str):
        """Track when a function tool is called in the current turn"""
        if self.data.current_turn:
            self.data.current_turn.function_tools_called.append(tool_name)
            tool_timestamp = {
                "tool_name": tool_name,
                # Monotonic timestamp for ordering; readable_time is local
                # wall-clock HH:MM:SS for human inspection only.
                "timestamp": time.perf_counter(),
                "readable_time": time.strftime("%H:%M:%S", time.localtime())
            }
            self.data.current_turn.function_tool_timestamps.append(tool_timestamp)

    def add_error(self, source: str, message: str):
        """Add an error to the current turn"""
        if self.data.current_turn:
            self.data.current_turn.errors.append({
                "source": source,
                "message": message,
                # Wall-clock epoch seconds, unlike the perf_counter values
                # used for latency timestamps.
                "timestamp": time.time()
            })

    def set_a2a_handoff(self):
        """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
        if self.data.current_turn:
            self.data.current_turn.is_a2a_enabled = True
            self.data.current_turn.handoff_occurred = True

Collects and tracks performance metrics for AI agent turns

Methods

def add_error(self, source: str, message: str)
Expand source code
def add_error(self, source: str, message: str):
    """Add an error to the current turn"""
    # No-op when no turn is currently being tracked.
    if self.data.current_turn:
        self.data.current_turn.errors.append({
            "source": source,
            "message": message,
            # Wall-clock epoch seconds (time.time), unlike the
            # perf_counter timestamps used for latency fields.
            "timestamp": time.time()
        })

Add an error to the current turn

def add_function_tool_call(self, tool_name: str)
Expand source code
def add_function_tool_call(self, tool_name: str):
    """Track when a function tool is called in the current turn"""
    # No-op when no turn is currently being tracked.
    if self.data.current_turn:
        self.data.current_turn.function_tools_called.append(tool_name)
        tool_timestamp = {
            "tool_name": tool_name,
            # Monotonic timestamp for ordering; readable_time is local
            # wall-clock HH:MM:SS for human inspection only.
            "timestamp": time.perf_counter(),
            "readable_time": time.strftime("%H:%M:%S", time.localtime())
        }
        self.data.current_turn.function_tool_timestamps.append(tool_timestamp)

Track when a function tool is called in the current turn

def complete_current_turn(self) ‑> None
Expand source code
def complete_current_turn(self) -> None:
    """Complete and store the current turn.

    Computes the E2E latency, discards the turn (caching its earliest
    user-speech start) when no component latency is present, otherwise
    transforms it to camelCase, strips internal fields, clamps negative
    numbers to zero, and sends the payload to analytics.
    """
    if self.data.current_turn:
        self._calculate_e2e_metrics(self.data.current_turn)

        if not self._validate_interaction_has_required_latencies(self.data.current_turn):
            # Turn has no metrics at all: remember the earliest user
            # speech start so a subsequent turn can inherit it.
            if self.data.current_turn.user_speech_start_time is not None:
                if (self.pending_user_start_time is None or
                    self.data.current_turn.user_speech_start_time < self.pending_user_start_time):
                    self.pending_user_start_time = self.data.current_turn.user_speech_start_time
                    logger.info(f"[metrics] Caching earliest user start: {self.pending_user_start_time}")
            self.data.current_turn = None
            return

        if self.traces_flow_manager:
            self.traces_flow_manager.create_cascading_turn_trace(self.data.current_turn)

        self.data.turns.append(self.data.current_turn)
        interaction_data = asdict(self.data.current_turn)
        interaction_data['timeline'] = [asdict(event) for event in self.data.current_turn.timeline]
        transformed_data = self._transform_to_camel_case(interaction_data)
        # transformed_data = self._intify_latencies_and_timestamps(transformed_data)

        # Internal bookkeeping fields never sent to analytics.
        # NOTE(review): 'interactionId' and 'timestamp' are not produced by
        # the camelCase mapping — verify these key names match the dataclass.
        always_remove_fields = [
            'errors',
            'functionToolTimestamps',
            'sttStartTime', 'sttEndTime',
            'ttsStartTime', 'ttsEndTime',
            'llmStartTime', 'llmEndTime',
            'eouStartTime', 'eouEndTime',
            'is_a2a_enabled',
            "interactionId",
            "timestamp"
        ]

        # Only report handoff when A2A is enabled for this turn.
        if not self.data.current_turn.is_a2a_enabled: 
            always_remove_fields.append("handOffOccurred")

        for field in always_remove_fields:
            if field in transformed_data:
                del transformed_data[field]

        # Session-level constants are only sent with the first turn.
        if len(self.data.turns) > 1: 
            provider_fields = [
                'systemInstructions',
                'llmProviderClass', 'llmModelName',
                'sttProviderClass', 'sttModelName',
                'ttsProviderClass', 'ttsModelName'
            ]
            for field in provider_fields:
                if field in transformed_data:
                    del transformed_data[field]

        transformed_data = self._remove_negatives(transformed_data)

        interaction_payload = {
            "data": [transformed_data]               
        }
        
        self.analytics_client.send_interaction_analytics_safe(interaction_payload) 
        self.data.current_turn = None
        self.pending_user_start_time = None

Complete and store the current turn

def on_agent_speech_end(self)
Expand source code
def on_agent_speech_end(self):
    """Called when agent stops speaking"""
    self.data.is_agent_speaking = False
    agent_speech_end_time = time.perf_counter()
    
    if self.data.current_turn:
        self._end_timeline_event("agent_speech", agent_speech_end_time)
            
    # TTS latency is time-to-first-byte (first byte - start), while
    # tts_end_time records when audio playback actually finished.
    if self.data.tts_start_time and self.data.tts_first_byte_time:
        total_tts_latency = self.data.tts_first_byte_time - self.data.tts_start_time
        if self.data.current_turn:
            self.data.current_turn.tts_end_time = agent_speech_end_time
            self.data.current_turn.tts_latency = self._round_latency(total_tts_latency)
        self.data.tts_start_time = None
        self.data.tts_first_byte_time = None
    elif self.data.tts_start_time:
        # If we have start time but no first byte time, just reset
        self.data.tts_start_time = None
        self.data.tts_first_byte_time = None

Called when agent stops speaking

def on_agent_speech_start(self)
Expand source code
def on_agent_speech_start(self):
    """Called when agent starts speaking (actual audio output)"""
    self.data.is_agent_speaking = True
    self.data.agent_speech_start_time = time.perf_counter()
    
    if self.data.current_turn:
        # Avoid opening a second agent_speech event while one is still open.
        if not any(event.event_type == "agent_speech" and event.end_time is None for event in self.data.current_turn.timeline):
            self._start_timeline_event("agent_speech", self.data.agent_speech_start_time)

Called when agent starts speaking (actual audio output)

def on_eou_complete(self)
Expand source code
def on_eou_complete(self):
    """Called when EOU processing completes"""
    # Ignore completion events without a matching start.
    if self.data.eou_start_time:
        eou_end_time = time.perf_counter()
        eou_latency = eou_end_time - self.data.eou_start_time
        if self.data.current_turn:
            self.data.current_turn.eou_end_time = eou_end_time
            # Latency is stored in milliseconds, rounded to 4 decimals.
            self.data.current_turn.eou_latency = self._round_latency(eou_latency)
            # self._end_timeline_event("eou_processing", eou_end_time)
            logger.info(f"eou latency: {self.data.current_turn.eou_latency}ms")
        self.data.eou_start_time = None

Called when EOU processing completes

def on_eou_start(self)
Expand source code
def on_eou_start(self):
    """Stamp the start of EOU (End of Utterance) processing.

    The same timestamp is mirrored onto the current turn when one exists.
    """
    ts = time.perf_counter()
    self.data.eou_start_time = ts
    turn = self.data.current_turn
    if turn:
        turn.eou_start_time = ts

Called when EOU (End of Utterance) processing starts

def on_interrupted(self)
Expand source code
def on_interrupted(self):
    """Count a user barge-in, but only while the agent is audibly speaking."""
    if not self.data.is_agent_speaking:
        return
    self.data.total_interruptions += 1
    turn = self.data.current_turn
    if turn:
        turn.interrupted = True
        logger.info(f"User interrupted the agent. Total interruptions: {self.data.total_interruptions}")

Called when the user interrupts the agent

def on_llm_complete(self)
Expand source code
def on_llm_complete(self):
    """Record the end of LLM processing and compute its latency.

    No-op when no LLM start marker exists; the marker is cleared after use.
    """
    if not self.data.llm_start_time:
        return
    end_ts = time.perf_counter()
    turn = self.data.current_turn
    if turn:
        turn.llm_end_time = end_ts
        turn.llm_latency = self._round_latency(end_ts - self.data.llm_start_time)
        logger.info(f"llm latency: {turn.llm_latency}ms")
    self.data.llm_start_time = None

Called when LLM processing completes

def on_llm_start(self)
Expand source code
def on_llm_start(self):
    """Stamp the start of LLM processing, mirrored onto the current turn."""
    ts = time.perf_counter()
    self.data.llm_start_time = ts
    turn = self.data.current_turn
    if turn:
        turn.llm_start_time = ts

Called when LLM processing starts

def on_stt_complete(self)
Expand source code
def on_stt_complete(self):
    """Record the end of STT processing and compute its latency.

    No-op when no STT start marker exists; the marker is cleared after use.
    """
    if not self.data.stt_start_time:
        return
    end_ts = time.perf_counter()
    turn = self.data.current_turn
    if turn:
        turn.stt_end_time = end_ts
        turn.stt_latency = self._round_latency(end_ts - self.data.stt_start_time)
        logger.info(f"stt latency: {turn.stt_latency}ms")
    self.data.stt_start_time = None

Called when STT processing completes

def on_stt_start(self)
Expand source code
def on_stt_start(self):
    """Stamp the start of STT processing, mirrored onto the current turn."""
    ts = time.perf_counter()
    self.data.stt_start_time = ts
    turn = self.data.current_turn
    if turn:
        turn.stt_start_time = ts

Called when STT processing starts

def on_tts_first_byte(self)
Expand source code
def on_tts_first_byte(self):
    """Record the first TTS audio byte for the current synthesis.

    Note: ``ttfb`` stores the absolute perf_counter timestamp rather than
    a delta; consumers subtract the TTS span start themselves.
    """
    if not self.data.tts_start_time:
        return
    first_byte_ts = time.perf_counter()
    turn = self.data.current_turn
    if turn:
        turn.ttfb = first_byte_ts
        logger.info(f"tts ttfb: {(turn.ttfb - self.data.tts_start_time) * 1000}ms")
    self.data.tts_first_byte_time = first_byte_ts

Called when TTS produces first audio byte - this is our TTS latency

def on_tts_start(self)
Expand source code
def on_tts_start(self):
    """Stamp the start of TTS processing and reset the first-byte marker."""
    ts = time.perf_counter()
    self.data.tts_start_time = ts
    self.data.tts_first_byte_time = None
    turn = self.data.current_turn
    if turn:
        turn.tts_start_time = ts

Called when TTS processing starts

def on_user_speech_end(self)
Expand source code
def on_user_speech_end(self):
    """Mark the end of user speech and close the open timeline event."""
    end_ts = time.perf_counter()
    self.data.is_user_speaking = False
    self.data.user_speech_end_time = end_ts

    turn = self.data.current_turn
    if turn:
        turn.user_speech_end_time = end_ts
        self._end_timeline_event("user_speech", end_ts)

Called when user stops speaking

def on_user_speech_start(self)
Expand source code
def on_user_speech_start(self):
    """Called when user starts speaking"""
    # Debounce: repeated start events within one utterance are ignored.
    if self.data.is_user_speaking:
        return

    # A user utterance opens a new turn when none is in progress.
    if not self.data.current_turn:
        self.start_new_interaction()

    self.data.is_user_speaking = True
    self.data.user_input_start_time = time.perf_counter()

    if self.data.current_turn:
        # Only the first speech start of the turn is recorded.
        if self.data.current_turn.user_speech_start_time is None:
            self.data.current_turn.user_speech_start_time = self.data.user_input_start_time

        # Avoid duplicate "user_speech" entries — start_new_interaction may
        # already have opened one from a pending start time.
        if not any(event.event_type == "user_speech" for event in self.data.current_turn.timeline):
            self._start_timeline_event("user_speech", self.data.user_input_start_time)

Called when user starts speaking

def set_a2a_handoff(self)
Expand source code
def set_a2a_handoff(self):
    """Flag the current turn as an A2A turn in which a handoff occurred."""
    turn = self.data.current_turn
    if not turn:
        return
    turn.is_a2a_enabled = True
    turn.handoff_occurred = True

Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.

def set_agent_response(self, response: str)
Expand source code
def set_agent_response(self, response: str):
    """Set the agent response for the current turn and update timeline"""
    if self.data.current_turn:
        logger.info(f"agent output speech: {response}")
        # If no agent_speech event exists yet (speech start presumably has
        # not fired), open one now so the text has somewhere to attach.
        if not any(event.event_type == "agent_speech" for event in self.data.current_turn.timeline):
            current_time = time.perf_counter()
            self._start_timeline_event("agent_speech", current_time)
        
        self._update_timeline_event_text("agent_speech", response)

Set the agent response for the current turn and update timeline

def set_provider_info(self,
llm_provider: str = '',
llm_model: str = '',
stt_provider: str = '',
stt_model: str = '',
tts_provider: str = '',
tts_model: str = '',
vad_provider: str = '',
vad_model: str = '',
eou_provider: str = '',
eou_model: str = '')
Expand source code
def set_provider_info(self, llm_provider: str = "", llm_model: str = "", 
                     stt_provider: str = "", stt_model: str = "",
                     tts_provider: str = "", tts_model: str = "",
                     vad_provider: str = "", vad_model: str = "",
                     eou_provider: str = "", eou_model: str = ""):
    """Record provider class names and model names for this session.

    Every argument defaults to "", so callers may supply any subset;
    unspecified entries are overwritten with the empty string.
    """
    d = self.data
    d.llm_provider_class, d.llm_model_name = llm_provider, llm_model
    d.stt_provider_class, d.stt_model_name = stt_provider, stt_model
    d.tts_provider_class, d.tts_model_name = tts_provider, tts_model
    d.vad_provider_class, d.vad_model_name = vad_provider, vad_model
    d.eou_provider_class, d.eou_model_name = eou_provider, eou_model

Set the provider class and model information for this session

def set_session_id(self, session_id: str)
Expand source code
def set_session_id(self, session_id: str):
    """Store the session ID locally and forward it to the analytics client."""
    self.data.session_id = session_id
    self.analytics_client.set_session_id(session_id)

Set the session ID for metrics tracking

def set_system_instructions(self, instructions: str)
Expand source code
def set_system_instructions(self, instructions: str):
    """Record the session-level system instructions."""
    self.data.system_instructions = instructions

Set the system instructions for this session

def set_traces_flow_manager(self,
manager: TracesFlowManager)
Expand source code
def set_traces_flow_manager(self, manager: TracesFlowManager):
    """Attach the TracesFlowManager used for trace emission."""
    self.traces_flow_manager = manager

Set the TracesFlowManager instance

def set_user_transcript(self, transcript: str)
Expand source code
def set_user_transcript(self, transcript: str):
    """Set the user transcript for the current turn and update timeline"""
    if self.data.current_turn:
        logger.info(f"user input speech: {transcript}")
        user_speech_events = [event for event in self.data.current_turn.timeline 
                            if event.event_type == "user_speech"]
        
        if user_speech_events:
            # Overwrite the text of the most recent user_speech event.
            most_recent_event = user_speech_events[-1]
            most_recent_event.text = transcript
        else:
            # No user_speech event yet — presumably the transcript arrived
            # before a speech-start event. Open one and attach the text.
            current_time = time.perf_counter()
            self._start_timeline_event("user_speech", current_time)
            if self.data.current_turn.timeline:
                self.data.current_turn.timeline[-1].text = transcript

Set the user transcript for the current turn and update timeline

def start_new_interaction(self, user_transcript: str = '') ‑> None
Expand source code
def start_new_interaction(self, user_transcript: str = "") -> None:
    """Start tracking a new user-agent turn"""
    # Flush the previous turn's metrics before opening a new one.
    if self.data.current_turn:
        self.complete_current_turn()
    
    self.data.total_turns += 1
    
    self.data.current_turn = CascadingTurnData(
        # System instructions are attached only to the first turn.
        system_instructions=self.data.system_instructions if self.data.total_turns == 1 else "",
        # Provider and model info should be included in every turn
        llm_provider_class=self.data.llm_provider_class,
        llm_model_name=self.data.llm_model_name,
        stt_provider_class=self.data.stt_provider_class,
        stt_model_name=self.data.stt_model_name,
        tts_provider_class=self.data.tts_provider_class,
        tts_model_name=self.data.tts_model_name,
        vad_provider_class=self.data.vad_provider_class,
        vad_model_name=self.data.vad_model_name,
        eou_provider_class=self.data.eou_provider_class,
        eou_model_name=self.data.eou_model_name
    )
    
    # A user speech start observed before this turn existed is applied now.
    if self.pending_user_start_time is not None:
        self.data.current_turn.user_speech_start_time = self.pending_user_start_time
        self._start_timeline_event("user_speech", self.pending_user_start_time)

    # If the user is mid-utterance, backfill the start time and open a
    # user_speech timeline event unless the pending path already did.
    if self.data.is_user_speaking and self.data.user_input_start_time:
        if self.data.current_turn.user_speech_start_time is None:
            self.data.current_turn.user_speech_start_time = self.data.user_input_start_time
            if not any(ev.event_type == "user_speech" for ev in self.data.current_turn.timeline):
                self._start_timeline_event("user_speech", self.data.user_input_start_time)

    if user_transcript:
        self.set_user_transcript(user_transcript)

Start tracking a new user-agent turn

class CascadingMetricsData (session_id: str | None = None,
session_start_time: float = <factory>,
system_instructions: str = '',
total_interruptions: int = 0,
total_turns: int = 0,
turns: List[CascadingTurnData] = <factory>,
current_turn: CascadingTurnData | None = None,
user_speech_end_time: float | None = None,
agent_speech_start_time: float | None = None,
stt_start_time: float | None = None,
llm_start_time: float | None = None,
tts_start_time: float | None = None,
eou_start_time: float | None = None,
user_input_start_time: float | None = None,
is_agent_speaking: bool = False,
is_user_speaking: bool = False,
tts_first_byte_time: float | None = None,
llm_provider_class: str = '',
llm_model_name: str = '',
stt_provider_class: str = '',
stt_model_name: str = '',
tts_provider_class: str = '',
tts_model_name: str = '',
vad_provider_class: str = '',
vad_model_name: str = '',
eou_provider_class: str = '',
eou_model_name: str = '')
Expand source code
@dataclass
class CascadingMetricsData:
    """Data structure to hold all metrics for a session"""
    # --- Session identity and aggregates ---
    session_id: Optional[str] = None
    session_start_time: float = field(default_factory=time.time)  # wall-clock epoch seconds
    system_instructions: str = ""
    total_interruptions: int = 0
    total_turns: int = 0
    turns: List[CascadingTurnData] = field(default_factory=list)  # completed turns
    current_turn: Optional[CascadingTurnData] = None  # turn in progress, if any
    # --- Transient stage markers (time.perf_counter values; None when idle) ---
    user_speech_end_time: Optional[float] = None
    agent_speech_start_time: Optional[float] = None
    stt_start_time: Optional[float] = None
    llm_start_time: Optional[float] = None
    tts_start_time: Optional[float] = None
    eou_start_time: Optional[float] = None
    user_input_start_time: Optional[float] = None
    # --- Live speaking-state flags ---
    is_agent_speaking: bool = False
    is_user_speaking: bool = False
    tts_first_byte_time: Optional[float] = None
    
    # --- Provider/model identification, copied into each new turn ---
    llm_provider_class: str = ""
    llm_model_name: str = ""
    stt_provider_class: str = ""
    stt_model_name: str = ""
    tts_provider_class: str = ""
    tts_model_name: str = ""
    vad_provider_class: str = ""
    vad_model_name: str = ""
    eou_provider_class: str = ""
    eou_model_name: str = ""

Data structure to hold all metrics for a session

Instance variables

var agent_speech_start_time : float | None
var current_turn : CascadingTurnData | None
var eou_model_name : str
var eou_provider_class : str
var eou_start_time : float | None
var is_agent_speaking : bool
var is_user_speaking : bool
var llm_model_name : str
var llm_provider_class : str
var llm_start_time : float | None
var session_id : str | None
var session_start_time : float
var stt_model_name : str
var stt_provider_class : str
var stt_start_time : float | None
var system_instructions : str
var total_interruptions : int
var total_turns : int
var tts_first_byte_time : float | None
var tts_model_name : str
var tts_provider_class : str
var tts_start_time : float | None
var turns : List[CascadingTurnData]
var user_input_start_time : float | None
var user_speech_end_time : float | None
var vad_model_name : str
var vad_provider_class : str
class CascadingTurnData (user_speech_start_time: float | None = None,
user_speech_end_time: float | None = None,
stt_start_time: float | None = None,
stt_end_time: float | None = None,
stt_latency: float | None = None,
llm_start_time: float | None = None,
llm_end_time: float | None = None,
llm_latency: float | None = None,
tts_start_time: float | None = None,
tts_end_time: float | None = None,
tts_latency: float | None = None,
ttfb: float | None = None,
eou_start_time: float | None = None,
eou_end_time: float | None = None,
eou_latency: float | None = None,
function_tool_timestamps: List[Dict[str, Any]] = <factory>,
e2e_latency: float | None = None,
interrupted: bool = False,
timestamp: float = <factory>,
function_tools_called: List[str] = <factory>,
system_instructions: str = '',
llm_provider_class: str = '',
llm_model_name: str = '',
stt_provider_class: str = '',
stt_model_name: str = '',
tts_provider_class: str = '',
tts_model_name: str = '',
vad_provider_class: str = '',
vad_model_name: str = '',
eou_provider_class: str = '',
eou_model_name: str = '',
timeline: List[TimelineEvent] = <factory>,
errors: List[Dict[str, Any]] = <factory>,
is_a2a_enabled: bool = False,
handoff_occurred: bool = False)
Expand source code
@dataclass
class CascadingTurnData:
    """Data structure for a single user-agent turn"""
    # User speech boundaries (time.perf_counter timestamps)
    user_speech_start_time: Optional[float] = None
    user_speech_end_time: Optional[float] = None
    
    # STT stage: start/end timestamps plus rounded latency (logged as ms)
    stt_start_time: Optional[float] = None
    stt_end_time: Optional[float] = None
    stt_latency: Optional[float] = None
    
    # LLM stage
    llm_start_time: Optional[float] = None
    llm_end_time: Optional[float] = None
    llm_latency: Optional[float] = None
    
    # TTS stage; ttfb is the ABSOLUTE perf_counter timestamp of the first
    # audio byte, not a delta (see on_tts_first_byte).
    tts_start_time: Optional[float] = None
    tts_end_time: Optional[float] = None
    tts_latency: Optional[float] = None 
    ttfb: Optional[float] = None
    
    # EOU (End of Utterance) detection stage
    eou_start_time: Optional[float] = None
    eou_end_time: Optional[float] = None
    eou_latency: Optional[float] = None
    
    # Per-tool timing records for function tools invoked during the turn
    function_tool_timestamps: List[Dict[str, Any]] = field(default_factory=list)
    
    e2e_latency: Optional[float] = None  # end-to-end latency for the turn
    interrupted: bool = False  # user barged in while the agent was speaking
    timestamp: float = field(default_factory=time.time)  # wall-clock creation time
    function_tools_called: List[str] = field(default_factory=list)
    system_instructions: str = ""  # populated only on the session's first turn
    
    # Provider/model identification (copied from session-level data)
    llm_provider_class: str = ""
    llm_model_name: str = ""
    stt_provider_class: str = ""
    stt_model_name: str = ""
    tts_provider_class: str = ""
    tts_model_name: str = ""
    vad_provider_class: str = ""
    vad_model_name: str = ""
    eou_provider_class: str = ""
    eou_model_name: str = ""
    
    # Ordered speech/processing events and any errors captured this turn
    timeline: List[TimelineEvent] = field(default_factory=list)
    errors: List[Dict[str, Any]] = field(default_factory=list)
    # A2A (agent-to-agent) flags, set together by set_a2a_handoff()
    is_a2a_enabled: bool = False
    handoff_occurred: bool = False  

Data structure for a single user-agent turn

Instance variables

var e2e_latency : float | None
var eou_end_time : float | None
var eou_latency : float | None
var eou_model_name : str
var eou_provider_class : str
var eou_start_time : float | None
var errors : List[Dict[str, Any]]
var function_tool_timestamps : List[Dict[str, Any]]
var function_tools_called : List[str]
var handoff_occurred : bool
var interrupted : bool
var is_a2a_enabled : bool
var llm_end_time : float | None
var llm_latency : float | None
var llm_model_name : str
var llm_provider_class : str
var llm_start_time : float | None
var stt_end_time : float | None
var stt_latency : float | None
var stt_model_name : str
var stt_provider_class : str
var stt_start_time : float | None
var system_instructions : str
var timeline : List[TimelineEvent]
var timestamp : float
var ttfb : float | None
var tts_end_time : float | None
var tts_latency : float | None
var tts_model_name : str
var tts_provider_class : str
var tts_start_time : float | None
var user_speech_end_time : float | None
var user_speech_start_time : float | None
var vad_model_name : str
var vad_provider_class : str
class RealtimeMetricsCollector
Expand source code
class RealtimeMetricsCollector:
    """Collects per-turn metrics for realtime (speech-to-speech) pipelines.

    Tracks user/agent speech boundaries, timeline events, tool calls and
    model errors for the active turn, and ships each completed turn to the
    analytics client (and, when configured, the traces flow manager).
    """

    # Class-level snapshot of agent/provider info, filled by start_session()
    # and expanded into every new RealtimeTurnData.
    _agent_info: Dict[str, Any] = {
        "provider_class_name": None,
        "provider_model_name": None,
        "system_instructions": None,
        "function_tools": [],
        "mcp_tools": []
    }

    def __init__(self) -> None:
        self.turns: List[RealtimeTurnData] = []  # turns recorded so far
        self.current_turn: Optional[RealtimeTurnData] = None
        self.lock = asyncio.Lock()  # serializes turn creation
        # Debounce handle for delayed finalization after agent speech ends.
        self.agent_speech_end_timer: Optional[asyncio.TimerHandle] = None
        self.analytics_client = AnalyticsClient()
        self.traces_flow_manager: Optional[TracesFlowManager] = None

    def set_session_id(self, session_id: str):
        """Set the session ID for metrics tracking"""
        self.analytics_client.set_session_id(session_id)

    def set_traces_flow_manager(self, manager: TracesFlowManager):
        """Set the TracesFlowManager instance for realtime tracing"""
        self.traces_flow_manager = manager
        
    def _transform_to_camel_case(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Converts snake_case to camelCase for analytics reporting."""
        # NOTE(review): despite the Dict annotation this also recurses into
        # lists and passes non-dict/non-list values through unchanged.
        
        def to_camel_case(snake_str: str) -> str:
            parts = snake_str.split('_')
            return parts[0] + ''.join(x.title() for x in parts[1:])

        if isinstance(data, list):
            return [self._transform_to_camel_case(item) for item in data]
        if isinstance(data, dict):
            return {to_camel_case(k): self._transform_to_camel_case(v) for k, v in data.items()}
        return data

    async def start_session(self, agent: Agent, pipeline: Pipeline) -> None:
        """Snapshot agent/pipeline info and open session-level traces."""
        RealtimeMetricsCollector._agent_info = {
            "provider_class_name": pipeline.model.__class__.__name__,
            "provider_model_name": getattr(pipeline.model, "model", None),
            "system_instructions": agent.instructions,
            # Function tools exclude MCP-managed tools when an MCP manager exists.
            "function_tools": [
                getattr(tool, "name", tool.__name__ if callable(tool) else str(tool))
                for tool in (
                    [tool for tool in agent.tools if tool not in agent.mcp_manager.tools]
                    if agent.mcp_manager else agent.tools
                )
            ] if agent.tools else [],
            "mcp_tools": [
                tool._tool_info.name
                for tool in agent.mcp_manager.tools
            ] if agent.mcp_manager else [],
        }
        self.turns = []
        self.current_turn = None
        from . import cascading_metrics_collector as cascading_metrics_collector

        # Reuse the cascading collector's traces manager if one is configured.
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            self.set_traces_flow_manager(traces_flow_manager)

            config_attributes = {
                **RealtimeMetricsCollector._agent_info,
                "pipeline": pipeline.__class__.__name__,
                "llm_provider": pipeline.model.__class__.__name__,
            }

            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({})

    async def _start_new_interaction(self) -> None:
        """Create a fresh turn seeded with the session's agent info."""
        async with self.lock:
            self.current_turn = RealtimeTurnData(
                **RealtimeMetricsCollector._agent_info
            )
            # NOTE(review): the turn is appended here AND again at the end of
            # _finalize_interaction_and_send, so finalized turns can appear
            # twice in self.turns — confirm which append is intended.
            self.turns.append(self.current_turn)

    async def set_user_speech_start(self) -> None:
        # A new user utterance finalizes any in-flight turn first.
        if self.current_turn:
            self._finalize_interaction_and_send()
        
        await self._start_new_interaction()
        if self.current_turn and self.current_turn.user_speech_start_time is None:
            self.current_turn.user_speech_start_time = time.perf_counter()
            await self.start_timeline_event("user_speech")

    async def set_user_speech_end(self) -> None:
        if self.current_turn and self.current_turn.user_speech_end_time is None:
            self.current_turn.user_speech_end_time = time.perf_counter()
            await self.end_timeline_event("user_speech")

    async def set_agent_speech_start(self) -> None:
        if not self.current_turn:
            await self._start_new_interaction()
        elif self.current_turn.user_speech_start_time is not None and self.current_turn.user_speech_end_time is None:
            # The agent starting to speak implicitly ends the user's speech.
            self.current_turn.user_speech_end_time = time.perf_counter()
            await self.end_timeline_event("user_speech")

        if self.current_turn and self.current_turn.agent_speech_start_time is None:
            self.current_turn.agent_speech_start_time = time.perf_counter()
            await self.start_timeline_event("agent_speech")
            # Cancel a pending end-of-speech debounce: the agent resumed.
            if self.agent_speech_end_timer:
                self.agent_speech_end_timer.cancel()

    async def set_agent_speech_end(self, timeout: float = 1.0) -> None:
        if self.current_turn:
            if self.agent_speech_end_timer:
                self.agent_speech_end_timer.cancel()
            
            # Debounce: finalize only if the agent stays silent for `timeout`
            # seconds; a new speech start cancels this timer.
            loop = asyncio.get_event_loop()
            self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send)
            await self.end_timeline_event("agent_speech")

    async def set_a2a_handoff(self) -> None:
        """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
        if self.current_turn:
            self.current_turn.is_a2a_enabled = True
            self.current_turn.handoff_occurred = True

    def _finalize_agent_speech(self) -> None:
        """Stamp the agent speech end time at most once per turn."""
        if not self.current_turn or self.current_turn.agent_speech_end_time is not None:
            return
        
        # Only turns where someone actually spoke count as valid.
        is_valid_interaction = self.current_turn.user_speech_start_time is not None or \
                               self.current_turn.agent_speech_start_time is not None

        if not is_valid_interaction:
            return

        self.current_turn.agent_speech_end_time = time.perf_counter()
        self.agent_speech_end_timer = None

    def _finalize_interaction_and_send(self) -> None:
        """Close the current turn, compute latencies, and send analytics."""
        if not self.current_turn:
            return
        
        self._finalize_agent_speech()

        if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time:
            self.current_turn.user_speech_end_time = time.perf_counter()

        # Close any timeline events that never received an end marker.
        current_time = time.perf_counter()
        for event in self.current_turn.timeline:
            if event.end_time is None:
                event.end_time = current_time
                event.duration_ms = round((current_time - event.start_time) * 1000, 4)

        is_valid_interaction = self.current_turn.user_speech_start_time is not None or \
                               self.current_turn.agent_speech_start_time is not None

        if not is_valid_interaction:
            # Nothing observable happened this turn; drop it silently.
            self.current_turn = None
            return

        self.current_turn.compute_latencies()
        # Lazily adopt the cascading collector's traces manager if unset.
        if not hasattr(self, 'traces_flow_manager') or self.traces_flow_manager is None:
            try:
                from . import cascading_metrics_collector as cascading_metrics_collector
                self.traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            except Exception:
                pass
            
        if self.traces_flow_manager:
            self.traces_flow_manager.create_realtime_turn_trace(self.current_turn)
        interaction_data = asdict(self.current_turn)
        
        # Strip fields that the analytics payload should not carry.
        fields_to_remove = ["realtime_model_errors","is_a2a_enabled","session_id","agent_speech_duration","agent_speech_start_time","agent_speech_end_time","thinking_time","function_tools","mcp_tools"]
        
        # Static agent info is only reported with the first turn.
        if len(self.turns) > 1:
            fields_to_remove.extend([
                "provider_class_name", "provider_model_name", "system_instructions","function_tools", "mcp_tools"])
       
        if not self.current_turn.is_a2a_enabled: 
            fields_to_remove.extend(["handoff_occurred"])
        
        # NOTE(review): loop variable `field` shadows dataclasses.field here.
        for field in fields_to_remove:
            if field in interaction_data:
                del interaction_data[field]

        transformed_data = self._transform_to_camel_case(interaction_data)
        self.analytics_client.send_interaction_analytics_safe({
            "data": [transformed_data]
        })
        # NOTE(review): this turn was already appended by
        # _start_new_interaction — appending again duplicates it in turns.
        self.turns.append(self.current_turn)
        self.current_turn = None

    async def add_timeline_event(self, event: TimelineEvent) -> None:
        """Append a pre-built timeline event to the current turn, if any."""
        if self.current_turn:
            self.current_turn.timeline.append(event)

    async def start_timeline_event(self, event_type: str) -> None:
        """Start a timeline event with a precise start time"""
        if self.current_turn:
            event = TimelineEvent(
                event_type=event_type,
                start_time=time.perf_counter()
            )
            self.current_turn.timeline.append(event)

    async def end_timeline_event(self, event_type: str) -> None:
        """End a timeline event and calculate duration"""
        if self.current_turn:
            end_time = time.perf_counter()
            # Walk backwards so the most recently opened matching event closes.
            for event in reversed(self.current_turn.timeline):
                if event.event_type == event_type and event.end_time is None:
                    event.end_time = end_time
                    event.duration_ms = round((end_time - event.start_time) * 1000, 4)
                    break

    async def update_timeline_event_text(self, event_type: str, text: str) -> None:
        """Update timeline event with text content"""
        if self.current_turn:
            # Attach to the latest matching event that has no text yet.
            for event in reversed(self.current_turn.timeline):
                if event.event_type == event_type and not event.text:
                    event.text = text
                    break

    async def add_tool_call(self, tool_name: str) -> None:
        """Record a function-tool invocation once per turn (dedupes by name)."""
        if self.current_turn and tool_name not in self.current_turn.function_tools_called:
            self.current_turn.function_tools_called.append(tool_name)
            logger.info(f"function tool called: {tool_name}")

    async def set_user_transcript(self, text: str) -> None:
        """Set the user transcript for the current turn and update timeline"""
        if self.current_turn:
            # Backfill speech start/end — presumably for transcripts that
            # arrive without corresponding speech events; confirm upstream.
            if self.current_turn.user_speech_start_time is None:
                self.current_turn.user_speech_start_time = time.perf_counter()
                await self.start_timeline_event("user_speech")
            if self.current_turn.user_speech_end_time is None:
                self.current_turn.user_speech_end_time = time.perf_counter()
                await self.end_timeline_event("user_speech")
            logger.info(f"user input speech: {text}")
            await self.update_timeline_event_text("user_speech", text)

    async def set_agent_response(self, text: str) -> None:
        """Set the agent response for the current turn and update timeline"""
        if self.current_turn:
            if self.current_turn.agent_speech_start_time is None:
                self.current_turn.agent_speech_start_time = time.perf_counter()
                await self.start_timeline_event("agent_speech")
                logger.info(f"agent output speech: {text}")
            await self.update_timeline_event_text("agent_speech", text)

    def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
        """Set a realtime model-specific error for the current turn"""
        if self.current_turn:
            logger.error(f"realtime model error: {error}")
            self.current_turn.realtime_model_errors.append(error)

    async def set_interrupted(self) -> None:
        """Mark the current turn as interrupted by the user."""
        if self.current_turn:
            self.current_turn.interrupted = True

    def finalize_session(self) -> None:
        # NOTE(review): schedules _start_new_interaction at session teardown —
        # looks like it should finalize the current turn instead; confirm.
        asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop())

Methods

async def add_timeline_event(self,
event: TimelineEvent) ‑> None
Expand source code
async def add_timeline_event(self, event: TimelineEvent) -> None:
    """Append a pre-built timeline event to the current turn, if any."""
    if self.current_turn:
        self.current_turn.timeline.append(event)
async def add_tool_call(self, tool_name: str) ‑> None
Expand source code
async def add_tool_call(self, tool_name: str) -> None:
    """Record a function-tool invocation once per turn (dedupes by name)."""
    if self.current_turn and tool_name not in self.current_turn.function_tools_called:
        self.current_turn.function_tools_called.append(tool_name)
        logger.info(f"function tool called: {tool_name}")
async def end_timeline_event(self, event_type: str) ‑> None
Expand source code
async def end_timeline_event(self, event_type: str) -> None:
    """End a timeline event and calculate duration"""
    if self.current_turn:
        end_time = time.perf_counter()
        # Walk backwards so the most recently opened matching event closes.
        for event in reversed(self.current_turn.timeline):
            if event.event_type == event_type and event.end_time is None:
                event.end_time = end_time
                event.duration_ms = round((end_time - event.start_time) * 1000, 4)
                break

End a timeline event and calculate duration

def finalize_session(self) ‑> None
Expand source code
def finalize_session(self) -> None:
    # NOTE(review): schedules _start_new_interaction at session teardown —
    # looks like it should finalize the current turn instead; confirm.
    asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop())
async def set_a2a_handoff(self) ‑> None
Expand source code
async def set_a2a_handoff(self) -> None:
    """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
    # Both flags travel together: a handoff implies A2A mode for this turn.
    if self.current_turn:
        self.current_turn.is_a2a_enabled = True
        self.current_turn.handoff_occurred = True

Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.

async def set_agent_response(self, text: str) ‑> None
Expand source code
async def set_agent_response(self, text: str) -> None:
    """Set the agent response for the current turn and update timeline"""
    if self.current_turn:
        # If no speech-start event fired yet, backfill one so the
        # response text has a timeline event to attach to.
        if self.current_turn.agent_speech_start_time is None:
            self.current_turn.agent_speech_start_time = time.perf_counter()
            await self.start_timeline_event("agent_speech")
            logger.info(f"agent output speech: {text}")
        await self.update_timeline_event_text("agent_speech", text)

Set the agent response for the current turn and update timeline

async def set_agent_speech_end(self, timeout: float = 1.0) ‑> None
Expand source code
async def set_agent_speech_end(self, timeout: float = 1.0) -> None:
    """Close agent speech and schedule delayed turn finalization.

    The finalization is debounced by `timeout` seconds; a subsequent
    speech start cancels the pending timer.
    """
    if self.current_turn:
        if self.agent_speech_end_timer:
            self.agent_speech_end_timer.cancel()
        
        loop = asyncio.get_event_loop()
        self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send)
        await self.end_timeline_event("agent_speech")
async def set_agent_speech_start(self) ‑> None
Expand source code
async def set_agent_speech_start(self) -> None:
    """Mark the start of agent speech, closing any still-open user speech first."""
    turn = self.current_turn
    if not turn:
        await self._start_new_interaction()
    elif turn.user_speech_start_time is not None and turn.user_speech_end_time is None:
        # User speech was open: stamp its end before the agent takes over.
        turn.user_speech_end_time = time.perf_counter()
        await self.end_timeline_event("user_speech")

    # Re-read: _start_new_interaction() above may have created the turn.
    turn = self.current_turn
    if turn and turn.agent_speech_start_time is None:
        turn.agent_speech_start_time = time.perf_counter()
        await self.start_timeline_event("agent_speech")
        if self.agent_speech_end_timer:
            self.agent_speech_end_timer.cancel()
async def set_interrupted(self) ‑> None
Expand source code
async def set_interrupted(self) -> None:
    """Flag the current turn as interrupted."""
    turn = self.current_turn
    if turn:
        turn.interrupted = True
def set_realtime_model_error(self, error: Dict[str, Any]) ‑> None
Expand source code
def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
    """Set a realtime model-specific error for the current turn"""
    turn = self.current_turn
    if not turn:
        return
    logger.error(f"realtime model error: {error}")
    turn.realtime_model_errors.append(error)

Set a realtime model-specific error for the current turn

def set_session_id(self, session_id: str)
Expand source code
def set_session_id(self, session_id: str):
    """Record the session ID with the analytics client so metrics are tagged to it."""
    client = self.analytics_client
    client.set_session_id(session_id)

Set the session ID for metrics tracking

def set_traces_flow_manager(self, manager: TracesFlowManager)
Expand source code
def set_traces_flow_manager(self, manager: TracesFlowManager):
    """Install the TracesFlowManager instance used for realtime tracing."""
    # Kept on the instance for later trace-emission paths (e.g. start_session).
    self.traces_flow_manager = manager

Set the TracesFlowManager instance for realtime tracing

async def set_user_speech_end(self) ‑> None
Expand source code
async def set_user_speech_end(self) -> None:
    """Record the user speech end timestamp once and close the timeline event."""
    turn = self.current_turn
    if not turn or turn.user_speech_end_time is not None:
        # No turn, or the end was already stamped: nothing to do.
        return
    turn.user_speech_end_time = time.perf_counter()
    await self.end_timeline_event("user_speech")
async def set_user_speech_start(self) ‑> None
Expand source code
async def set_user_speech_start(self) -> None:
    """Begin a fresh turn for user speech, flushing any in-progress turn first."""
    if self.current_turn:
        # A previous turn is still open: ship it before starting anew.
        self._finalize_interaction_and_send()

    await self._start_new_interaction()
    turn = self.current_turn
    if turn and turn.user_speech_start_time is None:
        turn.user_speech_start_time = time.perf_counter()
        await self.start_timeline_event("user_speech")
async def set_user_transcript(self, text: str) ‑> None
Expand source code
async def set_user_transcript(self, text: str) -> None:
    """Set the user transcript for the current turn and update timeline"""
    turn = self.current_turn
    if not turn:
        return
    if turn.user_speech_start_time is None:
        # Transcript arrived without a speech-start signal: open the event now.
        turn.user_speech_start_time = time.perf_counter()
        await self.start_timeline_event("user_speech")
    if turn.user_speech_end_time is None:
        turn.user_speech_end_time = time.perf_counter()
        await self.end_timeline_event("user_speech")
    logger.info(f"user input speech: {text}")
    await self.update_timeline_event_text("user_speech", text)

Set the user transcript for the current turn and update timeline

async def start_session(self, agent: Agent, pipeline: Pipeline) ‑> None
Expand source code
async def start_session(self, agent: Agent, pipeline: Pipeline) -> None:
    """Capture agent/pipeline metadata and open the tracing session.

    Stores provider, model, instruction and tool info on the class-level
    ``_agent_info``, resets per-session turn state, and — when the cascading
    collector has a TracesFlowManager installed — emits the session-config
    and session-start trace events.
    """
    # Stored on the class (not the instance).
    RealtimeMetricsCollector._agent_info = {
        "provider_class_name": pipeline.model.__class__.__name__,
        "provider_model_name": getattr(pipeline.model, "model", None),
        "system_instructions": agent.instructions,
        # Function tools exclude MCP-managed tools when an MCP manager exists;
        # fall back to __name__/str() for tools without a "name" attribute.
        "function_tools": [
            getattr(tool, "name", tool.__name__ if callable(tool) else str(tool))
            for tool in (
                [tool for tool in agent.tools if tool not in agent.mcp_manager.tools]
                if agent.mcp_manager else agent.tools
            )
        ] if agent.tools else [],
        # NOTE(review): reaches into the private _tool_info of MCP tools.
        "mcp_tools": [
            tool._tool_info.name
            for tool in agent.mcp_manager.tools
        ] if agent.mcp_manager else [],
    }
    # Reset per-session turn tracking.
    self.turns = []
    self.current_turn = None
    # Function-scope import — presumably to avoid a circular import; confirm.
    from . import cascading_metrics_collector as cascading_metrics_collector

    traces_flow_manager = cascading_metrics_collector.traces_flow_manager
    if traces_flow_manager:
        self.set_traces_flow_manager(traces_flow_manager)

        config_attributes = {
            **RealtimeMetricsCollector._agent_info,
            "pipeline": pipeline.__class__.__name__,
            "llm_provider": pipeline.model.__class__.__name__,
        }

        await traces_flow_manager.start_agent_session_config(config_attributes)
        await traces_flow_manager.start_agent_session({})
async def start_timeline_event(self, event_type: str) ‑> None
Expand source code
async def start_timeline_event(self, event_type: str) -> None:
    """Start a timeline event with a precise start time"""
    turn = self.current_turn
    if not turn:
        return
    turn.timeline.append(
        TimelineEvent(event_type=event_type, start_time=time.perf_counter())
    )

Start a timeline event with a precise start time

async def update_timeline_event_text(self, event_type: str, text: str) ‑> None
Expand source code
async def update_timeline_event_text(self, event_type: str, text: str) -> None:
    """Update timeline event with text content"""
    if not self.current_turn:
        return
    # Walk newest-to-oldest so the most recent text-less event of this type wins.
    for item in reversed(self.current_turn.timeline):
        if item.event_type == event_type and not item.text:
            item.text = text
            break

Update timeline event with text content

class TimelineEvent (event_type: str,
start_time: float,
end_time: float | None = None,
duration_ms: float | None = None,
text: str = '')
Expand source code
@dataclass
class TimelineEvent:
    """Data structure for a single timeline event"""
    event_type: str  # kind of event, e.g. "user_speech" / "agent_speech"
    start_time: float  # time.perf_counter() timestamp when the event began
    end_time: Optional[float] = None  # perf_counter timestamp at event end; None while still open
    duration_ms: Optional[float] = None  # (end_time - start_time) * 1000, filled when the event ends
    text: str = ""  # transcript / response text attached to this event

Data structure for a single timeline event

Instance variables

var duration_ms : float | None
var end_time : float | None
var event_type : str
var start_time : float
var text : str