Module agents.metrics
Sub-modules
agents.metrics.analytics, agents.metrics.cascading_metrics_collector, agents.metrics.integration, agents.metrics.logs, agents.metrics.models, agents.metrics.realtime_metrics_collector, agents.metrics.telemetry, agents.metrics.traces_flow
Functions
def auto_initialize_telemetry_and_logs(room_id: str,
peer_id: str,
room_attributes: Dict[str, Any] = None,
session_id: str = None,
sdk_metadata: Dict[str, Any] = None)
Expand source code
def auto_initialize_telemetry_and_logs(
    room_id: str,
    peer_id: str,
    room_attributes: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    sdk_metadata: Optional[Dict[str, Any]] = None,
):
    """Auto-initialize telemetry and logs from room attributes.

    Reads the ``observability`` JWT plus the ``traces`` / ``logs`` config
    dicts out of *room_attributes* and initializes each subsystem whose
    config has ``enabled`` set to a truthy value.

    Args:
        room_id: Room (meeting) identifier.
        peer_id: Peer identifier within the room.
        room_attributes: Attribute dict carrying 'observability', 'traces'
            and 'logs' keys. When None or empty, this is a no-op.
        session_id: Optional session identifier forwarded to log init.
        sdk_metadata: Optional SDK metadata forwarded to both subsystems.
    """
    # Nothing to configure without attributes — bail out early.
    if not room_attributes:
        return

    observability_jwt = room_attributes.get('observability', '')

    traces_config = room_attributes.get('traces', {})
    if traces_config.get('enabled'):
        metadata = {}
        initialize_telemetry(
            room_id=room_id,
            peer_id=peer_id,
            sdk_name="agents",
            observability_jwt=observability_jwt,
            traces_config=traces_config,
            metadata=metadata,
            sdk_metadata=sdk_metadata
        )

    logs_config = room_attributes.get('logs', {})
    if logs_config.get('enabled'):
        initialize_logs(
            meeting_id=room_id,
            peer_id=peer_id,
            jwt_key=observability_jwt,
            log_config=logs_config,
            session_id=session_id,
            sdk_metadata=sdk_metadata,
        )
def complete_span(span: opentelemetry.trace.span.Span | None,
status_code,
message: str = '',
end_time: float | None = None)-
Expand source code
def complete_span(span: Optional[Span], status_code, message: str = "", end_time: Optional[float] = None):
    """Complete a trace span (convenience method).

    Args:
        span: Span to complete.
        status_code: Status code.
        message: Status message.
        end_time: End time in seconds since epoch (optional).
    """
    telemetry = get_telemetry()
    # Guard clause: do nothing unless both the telemetry backend and a
    # span are available.
    if not telemetry or not span:
        return
    telemetry.complete_span(span, status_code, message, end_time)
Args
span- Span to complete
status_code- Status code
message- Status message
end_time- End time in seconds since epoch (optional)
def create_log(message: str, log_level: str = 'INFO', attributes: Dict[str, Any] = None)-
Expand source code
def create_log(message: str, log_level: str = "INFO", attributes: Optional[Dict[str, Any]] = None):
    """Create a log entry (convenience method).

    Delegates to the globally-configured logs instance; silently does
    nothing when logging has not been initialized.

    Args:
        message: Log message.
        log_level: Log level.
        attributes: Additional attributes.
    """
    logs = get_logs()
    if logs:
        logs.create_log(message, log_level, attributes)
Args
message- Log message
log_level- Log level
attributes- Additional attributes
def create_span(span_name: str,
attributes: Dict[str, Any] = None,
parent_span: opentelemetry.trace.span.Span | None = None,
start_time: float | None = None)-
Expand source code
def create_span(span_name: str, attributes: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None, start_time: Optional[float] = None):
    """Create a trace span (convenience method).

    Delegates to the globally-configured telemetry instance; returns None
    when telemetry has not been initialized.

    Args:
        span_name: Name of the span.
        attributes: Span attributes.
        parent_span: Parent span (optional).
        start_time: Start time in seconds since epoch (optional).

    Returns:
        Span object or None.
    """
    telemetry = get_telemetry()
    if telemetry:
        return telemetry.trace(span_name, attributes, parent_span, start_time)
    return None
Args
span_name- Name of the span
attributes- Span attributes
parent_span- Parent span (optional)
start_time- Start time in seconds since epoch (optional)
Returns
Span object or None
Classes
class CascadingMetricsCollector-
Expand source code
class CascadingMetricsCollector: """Collects and tracks performance metrics for AI agent turns""" def __init__(self): self.data = CascadingMetricsData() self.analytics_client = AnalyticsClient() self.traces_flow_manager: Optional[TracesFlowManager] = None self.active_spans: Dict[str, Span] = {} self.pending_user_start_time: Optional[float] = None self.playground = False def set_traces_flow_manager(self, manager: TracesFlowManager): """Set the TracesFlowManager instance""" self.traces_flow_manager = manager def _generate_interaction_id(self) -> str: """Generate a hash-based turn ID""" timestamp = str(time.time()) session_id = self.data.session_id or "default" interaction_count = str(self.data.total_turns) hash_input = f"{timestamp}_{session_id}_{interaction_count}" return hashlib.md5(hash_input.encode()).hexdigest()[:16] def _round_latency(self, latency: float) -> float: """Convert latency from seconds to milliseconds and round to 4 decimal places""" return round(latency * 1000, 4) def _transform_to_camel_case(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]: """Transform snake_case field names to camelCase for analytics""" field_mapping = { # Speech Info 'user_speech_start_time': 'userSpeechStartTime', 'user_speech_end_time': 'userSpeechEndTime', 'user_speech_duration': 'userSpeechDuration', 'agent_speech_start_time': 'agentSpeechStartTime', 'agent_speech_end_time': 'agentSpeechEndTime', 'agent_speech_duration': 'agentSpeechDuration', # STT Metrics 'stt_latency': 'sttLatency', 'stt_start_time': 'sttStartTime', 'stt_end_time': 'sttEndTime', 'stt_preflight_latency': 'sttPreflightLatency', 'stt_interim_latency': 'sttInterimLatency', 'stt_duration': 'sttDuration', 'stt_confidence': 'sttConfidence', 'stt_preemptive_generation_occurred': 'sttPreemptiveGenerationOccurred', 'stt_preemptive_generation_enabled': 'sttPreemptiveGenerationEnabled', # For OpenAISTT only 'stt_input_tokens': 'sttInputTokens', 'stt_output_tokens': 'sttOutputTokens', 'stt_total_tokens': 
'sttTotalTokens', # KB Metrics 'kb_retrieval_latency': 'kbRetrievalLatency', 'kb_documents': 'kbDocuments', 'kb_scores': 'kbScores', # LLM Metrics 'llm_input': 'llmInput', 'llm_duration': 'llmDuration', 'llm_start_time': 'llmStartTime', 'llm_end_time': 'llmEndTime', 'llm_ttft': 'ttft', 'prompt_tokens': 'promptTokens', 'completion_tokens': 'completionTokens', 'total_tokens': 'totalTokens', 'prompt_cached_tokens': 'promptCachedTokens', 'tokens_per_second': 'tokensPerSecond', # TTS Metrics 'tts_start_time': 'ttsStartTime', 'tts_end_time': 'ttsEndTime', 'tts_duration': 'ttsDuration', 'tts_characters': 'ttsCharacters', 'ttfb': 'ttfb', "tts_latency": "ttsLatency", # EOU Metrics 'eou_latency': 'eouLatency', 'eou_start_time': 'eouStartTime', 'eou_end_time': 'eouEndTime', # Providers & Metadata 'llm_provider_class': 'llmProviderClass', 'llm_model_name': 'llmModelName', 'stt_provider_class': 'sttProviderClass', 'stt_model_name': 'sttModelName', 'tts_provider_class': 'ttsProviderClass', 'tts_model_name': 'ttsModelName', 'vad_provider_class': 'vadProviderClass', 'vad_model_name': 'vadModelName', 'eou_provider_class': 'eouProviderClass', 'eou_model_name': 'eouModelName', # Logic & Tools 'e2e_latency': 'e2eLatency', 'interrupted': 'interrupted', 'timestamp': 'timestamp', 'function_tools_called': 'functionToolsCalled', 'function_tool_timestamps': 'functionToolTimestamps', 'system_instructions': 'systemInstructions', 'handoff_occurred': 'handOffOccurred', 'is_a2a_enabled': 'isA2aEnabled', 'errors': 'errors', } timeline_field_mapping = { 'event_type': 'eventType', 'start_time': 'startTime', 'end_time': 'endTime', 'duration_ms': 'durationInMs' } transformed_data = {} for key, value in interaction_data.items(): camel_key = field_mapping.get(key, key) if key == 'timeline' and isinstance(value, list): transformed_timeline = [] for event in value: transformed_event = {} for event_key, event_value in event.items(): camel_event_key = timeline_field_mapping.get(event_key, event_key) 
transformed_event[camel_event_key] = event_value transformed_timeline.append(transformed_event) transformed_data[camel_key] = transformed_timeline else: transformed_data[camel_key] = value return transformed_data def _remove_negatives(self, obj: Any) -> Any: """Recursively clamp any numeric value < 0 to 0 in dicts/lists.""" if isinstance(obj, dict): for k, v in obj.items(): if isinstance(v, (int, float)): if v < 0: obj[k] = 0 elif isinstance(v, (dict, list)): obj[k] = self._remove_negatives(v) return obj if isinstance(obj, list): for i, v in enumerate(obj): if isinstance(v, (int, float)): if v < 0: obj[i] = 0 elif isinstance(v, (dict, list)): obj[i] = self._remove_negatives(v) return obj return obj def _start_timeline_event(self, event_type: str, start_time: float) -> None: """Start a timeline event""" if self.data.current_turn: event = TimelineEvent( event_type=event_type, start_time=start_time ) self.data.current_turn.timeline.append(event) def _end_timeline_event(self, event_type: str, end_time: float) -> None: """End a timeline event and calculate duration""" if self.data.current_turn: for event in reversed(self.data.current_turn.timeline): if event.event_type == event_type and event.end_time is None: event.end_time = end_time event.duration_ms = self._round_latency(end_time - event.start_time) break def _update_timeline_event_text(self, event_type: str, text: str) -> None: """Update timeline event with text content""" if self.data.current_turn: for event in reversed(self.data.current_turn.timeline): if event.event_type == event_type and not event.text: event.text = text break def _calculate_e2e_metrics(self, turn: CascadingTurnData) -> None: """Calculate E2E and E2ET latencies based on individual component latencies""" e2e_components = [] if turn.stt_latency: e2e_components.append(turn.stt_latency) if turn.eou_latency: e2e_components.append(turn.eou_latency) if turn.llm_ttft: e2e_components.append(turn.llm_ttft) if turn.tts_latency: 
e2e_components.append(turn.tts_latency) if e2e_components: turn.e2e_latency = round(sum(e2e_components), 4) def _validate_interaction_has_required_latencies(self, turn: CascadingTurnData) -> bool: """ Validate that the turn has at least one of the required latency metrics. Returns True if at least one latency is present, False if ALL are absent/None. """ stt_present = turn.stt_latency is not None tts_present = turn.ttfb is not None llm_present = turn.llm_ttft is not None eou_present = turn.eou_latency is not None if not any([stt_present, tts_present, llm_present, eou_present]): return False return True def set_session_id(self, session_id: str): """Set the session ID for metrics tracking""" self.data.session_id = session_id self.analytics_client.set_session_id(session_id) def set_system_instructions(self, instructions: str): """Set the system instructions for this session""" self.data.system_instructions = instructions def set_provider_info(self, llm_provider: str = "", llm_model: str = "", stt_provider: str = "", stt_model: str = "", tts_provider: str = "", tts_model: str = "", vad_provider: str = "", vad_model: str = "", eou_provider: str = "", eou_model: str = ""): """Set the provider class and model information for this session""" self.data.llm_provider_class = llm_provider self.data.llm_model_name = llm_model self.data.stt_provider_class = stt_provider self.data.stt_model_name = stt_model self.data.tts_provider_class = tts_provider self.data.tts_model_name = tts_model self.data.vad_provider_class = vad_provider self.data.vad_model_name = vad_model self.data.eou_provider_class = eou_provider self.data.eou_model_name = eou_model def update_provider_class(self, component_type: str, provider_class: str): """Update the provider class for a specific component when fallback occurs. This updates both the session-level data and the current turn data if exists. 
Args: component_type: "STT", "LLM", or "TTS" provider_class: The new provider class name (e.g., "GoogleLLM") """ component_map = { "STT": ("stt_provider_class", "stt_provider_class"), "LLM": ("llm_provider_class", "llm_provider_class"), "TTS": ("tts_provider_class", "tts_provider_class"), } if component_type in component_map: session_attr, turn_attr = component_map[component_type] # Update session-level data setattr(self.data, session_attr, provider_class) # Update current turn if exists if self.data.current_turn: setattr(self.data.current_turn, turn_attr, provider_class) logger.info(f"Updated {component_type} provider class to: {provider_class}") def start_new_interaction(self, user_transcript: str = "") -> None: """Start tracking a new user-agent turn""" if self.data.current_turn: self.complete_current_turn() # Reset the lock for the new turn self.data.turn_timestamps_locked = False # Clear all start times to prevent stale timestamps from previous turns self.data.stt_start_time = None self.data.llm_start_time = None self.data.tts_start_time = None self.data.eou_start_time = None self.data.knowledge_base_start_time = None self.data.total_turns += 1 self.data.current_turn = CascadingTurnData( system_instructions=self.data.system_instructions if self.data.total_turns == 1 else "", # Provider and model info should be included in every turn llm_provider_class=self.data.llm_provider_class, llm_model_name=self.data.llm_model_name, stt_provider_class=self.data.stt_provider_class, stt_model_name=self.data.stt_model_name, tts_provider_class=self.data.tts_provider_class, tts_model_name=self.data.tts_model_name, vad_provider_class=self.data.vad_provider_class, vad_model_name=self.data.vad_model_name, vad_min_silence_duration=self.data.vad_min_silence_duration, vad_min_speech_duration=self.data.vad_min_speech_duration, vad_threshold=self.data.vad_threshold, eou_provider_class=self.data.eou_provider_class, eou_model_name=self.data.eou_model_name, 
stt_preemptive_generation_enabled=self.data.stt_preemptive_generation_enabled, min_speech_wait_timeout=self.data.min_speech_wait_timeout, max_speech_wait_timeout=self.data.max_speech_wait_timeout, interrupt_mode=self.data.interrupt_mode, interrupt_min_duration=self.data.interrupt_min_duration, interrupt_min_words=self.data.interrupt_min_words, false_interrupt_pause_duration=self.data.false_interrupt_pause_duration, resume_on_false_interrupt=self.data.resume_on_false_interrupt ) if self.pending_user_start_time is not None: self.data.current_turn.user_speech_start_time = self.pending_user_start_time self._start_timeline_event("user_speech", self.pending_user_start_time) if self.data.is_user_speaking and self.data.user_input_start_time: if self.data.current_turn.user_speech_start_time is None: self.data.current_turn.user_speech_start_time = self.data.user_input_start_time if not any(ev.event_type == "user_speech" for ev in self.data.current_turn.timeline): self._start_timeline_event("user_speech", self.data.user_input_start_time) if user_transcript: self.set_user_transcript(user_transcript) def complete_current_turn(self) -> None: """Complete and store the current turn""" if self.data.current_turn: self._calculate_e2e_metrics(self.data.current_turn) if not self._validate_interaction_has_required_latencies(self.data.current_turn) and self.data.total_turns > 1: if self.data.current_turn.user_speech_start_time is not None: if (self.pending_user_start_time is None or self.data.current_turn.user_speech_start_time < self.pending_user_start_time): self.pending_user_start_time = self.data.current_turn.user_speech_start_time logger.info(f"[metrics] Caching earliest user start: {self.pending_user_start_time}") self.data.current_turn = None return if self.traces_flow_manager: self.traces_flow_manager.create_cascading_turn_trace(self.data.current_turn) self.data.turns.append(self.data.current_turn) if self.playground: 
self.playground_manager.send_cascading_metrics(metrics=self.data.current_turn, full_turn_data=True) interaction_data = asdict(self.data.current_turn) interaction_data['timeline'] = [asdict(event) for event in self.data.current_turn.timeline] transformed_data = self._transform_to_camel_case(interaction_data) # transformed_data = self._intify_latencies_and_timestamps(transformed_data) always_remove_fields = [ 'kb_start_time', 'kb_end_time', 'user_speech', 'stt_preflight_end_time', 'stt_interim_end_time', 'errors', 'functionToolTimestamps', 'sttStartTime', 'sttEndTime', 'ttsStartTime', 'ttsEndTime', 'llmStartTime', 'llmEndTime', 'eouStartTime', 'eouEndTime', 'is_a2a_enabled', "interactionId", "timestamp", ] if not self.data.current_turn.is_a2a_enabled: always_remove_fields.append("handOffOccurred") for field in always_remove_fields: if field in transformed_data: del transformed_data[field] if len(self.data.turns) > 1: provider_fields = [ 'systemInstructions', # 'llmProviderClass', 'llmModelName', # 'sttProviderClass', 'sttModelName', # 'ttsProviderClass', 'ttsModelName', 'eouProviderClass', 'eouModelName' 'vadProviderClass', 'vadModelName' ] for field in provider_fields: if field in transformed_data: del transformed_data[field] transformed_data = self._remove_negatives(transformed_data) interaction_payload = { "data": [transformed_data] } self.analytics_client.send_interaction_analytics_safe(interaction_payload) self.data.current_turn = None self.data.is_interrupted = False self.pending_user_start_time = None def on_interrupted(self): """Called when the user interrupts the agent""" if self.data.is_interrupted: return if self.data.is_agent_speaking: self.data.total_interruptions += 1 self.data.is_interrupted = True if self.data.current_turn: self.data.current_turn.interrupted = True self.data.current_turn.interrupt_start_time = time.perf_counter() logger.info(f"User interrupted the agent. 
Total interruptions: {self.data.total_interruptions}") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"interrupted": self.data.current_turn.interrupted}) def on_interrupt_trigger(self, word_count: Optional[int] = 0, duration: Optional[float] = 0): """Called when the user interrupts the agent""" if self.data.is_interrupted: return if self.data.is_agent_speaking: if self.data.current_turn: self.data.current_turn.interrupt_words = word_count self.data.current_turn.interrupt_duration = duration if self.data.current_turn.interrupt_words: self.data.current_turn.interrupt_reason.append("STT") if self.data.current_turn.interrupt_duration: self.data.current_turn.interrupt_reason.append("VAD") def set_interrupt_config(self, mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"], min_duration: Optional[float] = None, min_words: Optional[int] = None, false_interrupt_pause_duration: Optional[float] = None, resume_on_false_interrupt: Optional[bool] = None): self.data.interrupt_mode = mode self.data.interrupt_min_duration = min_duration self.data.interrupt_min_words = min_words self.data.false_interrupt_pause_duration = false_interrupt_pause_duration self.data.resume_on_false_interrupt = resume_on_false_interrupt def on_false_interrupt_start(self, duration: float): """Called when false interrupt timer starts (potential resume scenario)""" if self.data.current_turn: self.data.current_turn.false_interrupt_start_time = time.perf_counter() logger.info(f"False interrupt started - waiting {duration}s to determine if real interrupt") def on_false_interrupt_resume(self): """Called when TTS resumes after false interrupt timeout (user didn't continue speaking)""" if self.data.current_turn: self.data.current_turn.is_false_interrupt = True self.data.current_turn.false_interrupt_end_time = time.perf_counter() if self.data.current_turn.false_interrupt_start_time: self.data.current_turn.false_interrupt_duration = self._round_latency( 
self.data.current_turn.false_interrupt_end_time - self.data.current_turn.false_interrupt_start_time ) self.data.current_turn.resumed_after_false_interrupt = True # Reset interrupted flag and data since this was NOT a true interrupt self.data.current_turn.interrupted = False self.data.current_turn.interrupt_start_time = None self.data.current_turn.interrupt_end_time = None self.data.current_turn.interrupt_words = None self.data.current_turn.interrupt_duration = None self.data.current_turn.interrupt_reason = [] self.data.is_interrupted = False logger.info(f"False interrupt ended - TTS resumed after {self.data.current_turn.false_interrupt_duration}ms") def on_false_interrupt_escalated(self, word_count: Optional[int] = None): """Called when a false interrupt escalates to a true interrupt (user continued speaking)""" if self.data.current_turn: self.data.current_turn.is_false_interrupt = True self.data.current_turn.false_interrupt_end_time = time.perf_counter() if self.data.current_turn.false_interrupt_start_time: self.data.current_turn.false_interrupt_duration = self._round_latency( self.data.current_turn.false_interrupt_end_time - self.data.current_turn.false_interrupt_start_time ) self.data.current_turn.resumed_after_false_interrupt = False if word_count is not None: self.data.current_turn.false_interrupt_words = word_count logger.info(f"False interrupt escalated to true interrupt after {self.data.current_turn.false_interrupt_duration}ms") def on_user_speech_start(self): """Called when user starts speaking""" if self.data.is_user_speaking: return if not self.data.current_turn: self.start_new_interaction() self.data.is_user_speaking = True self.data.user_input_start_time = time.perf_counter() if self.data.current_turn: if self.data.current_turn.user_speech_start_time is None: self.data.current_turn.user_speech_start_time = self.data.user_input_start_time if not any(event.event_type == "user_speech" for event in self.data.current_turn.timeline): 
self._start_timeline_event("user_speech", self.data.user_input_start_time) def on_user_speech_end(self): """Called when user stops speaking""" self.data.is_user_speaking = False self.data.user_speech_end_time = time.perf_counter() if self.data.current_turn and self.data.current_turn.user_speech_start_time: self.data.current_turn.user_speech_end_time = self.data.user_speech_end_time self.data.current_turn.user_speech_duration = self._round_latency(self.data.current_turn.user_speech_end_time - self.data.current_turn.user_speech_start_time) self._end_timeline_event("user_speech", self.data.user_speech_end_time) logger.info(f"user speech duration: {self.data.current_turn.user_speech_duration}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"user_speech_duration": self.data.current_turn.user_speech_duration}) def on_agent_speech_start(self): """Called when agent starts speaking (actual audio output)""" self.data.is_agent_speaking = True self.data.agent_speech_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.agent_speech_start_time = self.data.agent_speech_start_time if not any(event.event_type == "agent_speech" and event.end_time is None for event in self.data.current_turn.timeline): self._start_timeline_event("agent_speech", self.data.agent_speech_start_time) def on_agent_speech_end(self): """Called when agent stops speaking""" self.data.is_agent_speaking = False agent_speech_end_time = time.perf_counter() if self.data.current_turn: self._end_timeline_event("agent_speech", agent_speech_end_time) self.data.current_turn.agent_speech_end_time = agent_speech_end_time if self.data.tts_start_time and self.data.tts_first_byte_time: total_tts_latency = self.data.tts_first_byte_time - self.data.tts_start_time if self.data.current_turn and self.data.current_turn.agent_speech_start_time: self.data.current_turn.tts_latency = self._round_latency(total_tts_latency) self.data.current_turn.agent_speech_duration = 
self._round_latency(agent_speech_end_time - self.data.current_turn.agent_speech_start_time) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"tts_latency": self.data.current_turn.tts_latency}) self.playground_manager.send_cascading_metrics(metrics={"agent_speech_duration": self.data.current_turn.agent_speech_duration}) self.data.tts_start_time = None self.data.tts_first_byte_time = None elif self.data.tts_start_time: # If we have start time but no first byte time, just reset self.data.tts_start_time = None self.data.tts_first_byte_time = None def on_stt_response(self, duration: float, confidence: float): if not self.data.current_turn: self.start_new_interaction() if self.data.current_turn: self.data.current_turn.stt_duration = duration self.data.current_turn.stt_confidence = confidence logger.info(f"Stt duration {duration}, confidence {confidence}") def on_stt_metrics(self, metrics: Dict[str, Any]): if self.data.current_turn: self.data.current_turn.stt_input_tokens = metrics.get("input_tokens") self.data.current_turn.stt_output_tokens = metrics.get("output_tokens") self.data.current_turn.stt_total_tokens = metrics.get("total_tokens") def on_stt_start(self): """Called when STT processing starts""" # Don't overwrite STT timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return self.data.stt_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.stt_start_time = self.data.stt_start_time def on_stt_complete(self, user_text: str): """Called when STT processing completes""" # Don't overwrite STT timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return if self.data.current_turn and self.data.current_turn.stt_preemptive_generation_enabled and self.data.current_turn.stt_preemptive_generation_occurred: logger.info("STT preemptive generation occurred, skipping stt complete") return if self.data.stt_start_time: stt_end_time = time.perf_counter() stt_latency 
= stt_end_time - self.data.stt_start_time if self.data.current_turn: self.data.current_turn.stt_transcript = user_text self.data.current_turn.stt_end_time = stt_end_time self.data.current_turn.stt_latency = self._round_latency(stt_latency) logger.info(f"stt latency: {self.data.current_turn.stt_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"stt_latency": self.data.current_turn.stt_latency}) self.data.stt_start_time = None def on_knowledge_base_start(self): """Called when knowledge base processing starts""" self.data.knowledge_base_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.kb_start_time = self.data.knowledge_base_start_time def on_knowledge_base_complete(self, documents: List[str], scores: List[float], record_id: List[str] = None): """Called when knowledge base processing completes""" if self.data.knowledge_base_start_time: knowledge_base_end_time = time.perf_counter() kb_retrieval_latency = knowledge_base_end_time - self.data.knowledge_base_start_time if self.data.current_turn: self.data.current_turn.kb_documents = documents self.data.current_turn.kb_scores = scores self.data.current_turn.kb_record_ids = record_id self.data.current_turn.kb_end_time = knowledge_base_end_time self.data.current_turn.kb_retrieval_latency = self._round_latency(kb_retrieval_latency) logger.info(f"knowledge base retrieval latency: {self.data.current_turn.kb_retrieval_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"kb_retrieval_latency": self.data.current_turn.kb_retrieval_latency}) self.data.knowledge_base_start_time = None def on_llm_start(self): """Called when LLM processing starts""" self.data.llm_start_time = time.perf_counter() # Lock timestamps to prevent STT/EOU overwrites from subsequent events self.data.turn_timestamps_locked = True if self.data.current_turn: self.data.current_turn.llm_start_time = self.data.llm_start_time def on_llm_input(self, text: str): 
"""Record the actual text sent to LLM""" if self.data.current_turn: self.data.current_turn.llm_input = text def on_llm_complete(self): """Called when LLM processing completes""" if self.data.llm_start_time: llm_end_time = time.perf_counter() llm_duration = llm_end_time - self.data.llm_start_time if self.data.current_turn: self.data.current_turn.llm_end_time = llm_end_time self.data.current_turn.llm_duration = self._round_latency(llm_duration) logger.info(f"llm duration: {self.data.current_turn.llm_duration}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"llm_duration": self.data.current_turn.llm_duration}) self.data.llm_start_time = None def set_llm_input(self, text: str): """Record the actual text sent to LLM""" if self.data.current_turn: self.data.current_turn.llm_input = text def on_tts_start(self): """Called when TTS processing starts""" self.data.tts_start_time = time.perf_counter() self.data.tts_first_byte_time = None if self.data.current_turn: self.data.current_turn.tts_start_time = self.data.tts_start_time def on_tts_first_byte(self): """Called when TTS produces first audio byte - this is our TTS latency""" if self.data.tts_start_time: now = time.perf_counter() # ttfb = now - self.data.tts_start_time // no need to take the difference as we are using the start time of the tts span if self.data.current_turn: self.data.current_turn.tts_end_time = now self.data.current_turn.ttfb = self._round_latency((self.data.current_turn.tts_end_time - self.data.tts_start_time)) logger.info(f"tts ttfb: {self.data.current_turn.ttfb}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"ttfb": self.data.current_turn.ttfb}) self.data.tts_first_byte_time = now def on_eou_start(self): """Called when EOU (End of Utterance) processing starts""" # Don't overwrite EOU timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return self.data.eou_start_time = time.perf_counter() if 
self.data.current_turn: self.data.current_turn.eou_start_time = self.data.eou_start_time def on_eou_complete(self): """Called when EOU processing completes""" # Don't overwrite EOU timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return if self.data.eou_start_time: eou_end_time = time.perf_counter() eou_latency = eou_end_time - self.data.eou_start_time if self.data.current_turn: self.data.current_turn.eou_end_time = eou_end_time self.data.current_turn.eou_latency = self._round_latency(eou_latency) # self._end_timeline_event("eou_processing", eou_end_time) logger.info(f"eou latency: {self.data.current_turn.eou_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"eou_latency": self.data.current_turn.eou_latency}) self.data.eou_start_time = None def on_wait_for_additional_speech(self, duration: float, eou_probability: float): """Called when waiting for additional speech""" if self.data.current_turn: self.data.current_turn.wait_for_additional_speech_duration = self._round_latency(duration) self.data.current_turn.waited_for_additional_speech = True self.data.current_turn.eou_probability = eou_probability logger.info(f"wait for additional speech duration: {self.data.current_turn.wait_for_additional_speech_duration}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"wait_for_additional_speech_duration": self.data.current_turn.wait_for_additional_speech_duration}) def set_user_transcript(self, transcript: str): """Set the user transcript for the current turn and update timeline""" if self.data.current_turn: self.data.current_turn.user_speech = transcript logger.info(f"user input speech: {transcript}") user_speech_events = [event for event in self.data.current_turn.timeline if event.event_type == "user_speech"] if self.playground: self.playground_manager.send_cascading_metrics(metrics={"user_speech": self.data.current_turn.user_speech}) if user_speech_events: 
most_recent_event = user_speech_events[-1] most_recent_event.text = transcript else: current_time = time.perf_counter() self._start_timeline_event("user_speech", current_time) if self.data.current_turn.timeline: self.data.current_turn.timeline[-1].text = transcript def set_agent_response(self, response: str): """Set the agent response for the current turn and update timeline""" if self.data.current_turn: self.data.current_turn.agent_speech = response logger.info(f"agent output speech: {response}") if not any(event.event_type == "agent_speech" for event in self.data.current_turn.timeline): current_time = time.perf_counter() self._start_timeline_event("agent_speech", current_time) self._update_timeline_event_text("agent_speech", response) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"agent_speech": self.data.current_turn.agent_speech}) def add_function_tool_call(self, tool_name: str): """Track when a function tool is called in the current turn""" if self.data.current_turn: self.data.current_turn.function_tools_called.append(tool_name) tool_timestamp = { "tool_name": tool_name, "timestamp": time.perf_counter(), "readable_time": time.strftime("%H:%M:%S", time.localtime()) } self.data.current_turn.function_tool_timestamps.append(tool_timestamp) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"function_tool_timestamps": self.data.current_turn.function_tool_timestamps}) def add_error(self, source: str, message: str): """Add an error to the current turn""" if self.data.current_turn: self.data.current_turn.errors.append({ "source": source, "message": message, "timestamp": time.time() }) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"errors": self.data.current_turn.errors}) def on_fallback_event(self, event_data: Dict[str, Any]): """Record a fallback event for the current turn""" if not self.data.current_turn: self.start_new_interaction() if self.data.current_turn: fallback_event = 
FallbackEvent( component_type=event_data.get("component_type", ""), temporary_disable_sec=event_data.get("temporary_disable_sec", 0), permanent_disable_after_attempts=event_data.get("permanent_disable_after_attempts", 0), recovery_attempt=event_data.get("recovery_attempt", 0), message=event_data.get("message", ""), start_time=event_data.get("start_time"), end_time=event_data.get("end_time"), duration_ms=event_data.get("duration_ms"), original_provider_label=event_data.get("original_provider_label"), original_connection_start=event_data.get("original_connection_start"), original_connection_end=event_data.get("original_connection_end"), original_connection_duration_ms=event_data.get("original_connection_duration_ms"), new_provider_label=event_data.get("new_provider_label"), new_connection_start=event_data.get("new_connection_start"), new_connection_end=event_data.get("new_connection_end"), new_connection_duration_ms=event_data.get("new_connection_duration_ms"), is_recovery=event_data.get("is_recovery", False), ) self.data.current_turn.fallback_events.append(fallback_event) logger.info(f"Fallback event recorded: {event_data.get('component_type')} - {event_data.get('message')}") def set_a2a_handoff(self): """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.""" if self.data.current_turn: self.data.current_turn.is_a2a_enabled = True self.data.current_turn.handoff_occurred = True if self.playground: self.playground_manager.send_cascading_metrics(metrics={"handoff_occurred": self.data.current_turn.handoff_occurred}) def set_llm_usage(self, usage: Dict[str, int]): """Set token usage and calculate TPS""" if not self.data.current_turn or not usage: return if self.data.current_turn: self.data.current_turn.prompt_tokens = usage.get("prompt_tokens") self.data.current_turn.completion_tokens = usage.get("completion_tokens") self.data.current_turn.total_tokens = usage.get("total_tokens") self.data.current_turn.prompt_cached_tokens = 
usage.get("prompt_cached_tokens") if self.data.current_turn and self.data.current_turn.llm_duration and self.data.current_turn.llm_duration > 0 and self.data.current_turn.completion_tokens > 0: latency_seconds = self.data.current_turn.llm_duration / 1000 self.data.current_turn.tokens_per_second = round(self.data.current_turn.completion_tokens / latency_seconds, 2) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"prompt_tokens": self.data.current_turn.prompt_tokens}) self.playground_manager.send_cascading_metrics(metrics={"completion_tokens": self.data.current_turn.completion_tokens}) self.playground_manager.send_cascading_metrics(metrics={"total_tokens": self.data.current_turn.total_tokens}) self.playground_manager.send_cascading_metrics(metrics={"prompt_cached_tokens": self.data.current_turn.prompt_cached_tokens}) self.playground_manager.send_cascading_metrics(metrics={"tokens_per_second": self.data.current_turn.tokens_per_second}) def add_tts_characters(self, count: int): """Add to the total character count for the current turn""" if self.data.current_turn: if self.data.current_turn.tts_characters: self.data.current_turn.tts_characters += count else: self.data.current_turn.tts_characters = count if self.playground: self.playground_manager.send_cascading_metrics(metrics={"tts_characters": self.data.current_turn.tts_characters}) def on_stt_preflight_end(self): """Called when STT preflight event received""" if self.data.current_turn and self.data.current_turn.stt_start_time: self.data.current_turn.stt_preflight_end_time = time.perf_counter() self.data.current_turn.stt_preflight_latency = self._round_latency(self.data.current_turn.stt_preflight_end_time - self.data.current_turn.stt_start_time) logger.info(f"stt preflight latency: {self.data.current_turn.stt_preflight_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"stt_preflight_latency": self.data.current_turn.stt_preflight_latency}) def 
on_stt_interim_end(self): """Called when STT interim event received""" if self.data.current_turn and self.data.current_turn.stt_start_time: self.data.current_turn.stt_interim_end_time = time.perf_counter() self.data.current_turn.stt_interim_latency = self._round_latency(self.data.current_turn.stt_interim_end_time - self.data.current_turn.stt_start_time) logger.info(f"stt interim latency: {self.data.current_turn.stt_interim_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"stt_interim_latency": self.data.current_turn.stt_interim_latency}) def on_llm_first_token(self): """Called when LLM first token received""" if self.data.current_turn: self.data.current_turn.llm_first_token_time = time.perf_counter() self.data.current_turn.llm_ttft = self._round_latency(self.data.current_turn.llm_first_token_time - self.data.current_turn.llm_start_time) logger.info(f"llm ttft: {self.data.current_turn.llm_ttft}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"llm_ttft": self.data.current_turn.llm_ttft}) def set_preemptive_generation_enabled(self, enabled: bool): if self.data: self.data.stt_preemptive_generation_enabled = enabled def on_stt_preemptive_generation(self, text: str, match: bool): if self.data.current_turn: self.data.current_turn.stt_preflight_transcript = text self.data.current_turn.stt_preemptive_generation_occurred = match def config_vad(self, min_silence_duration: float = None, min_speech_duration: float = None, threshold: float = None): """Configure VAD parameters for metrics tracking""" if self.data: if min_silence_duration is not None: self.data.vad_min_silence_duration = min_silence_duration if min_speech_duration is not None: self.data.vad_min_speech_duration = min_speech_duration if threshold is not None: self.data.vad_threshold = threshold def on_vad_end_of_speech(self): """Called when VAD detects end of speech""" if self.data.current_turn: self.data.current_turn.vad_end_of_speech_time = 
time.perf_counter() logger.info(f"VAD end of speech detected at {self.data.current_turn.vad_end_of_speech_time}") def set_recording_started(self, started: bool): self.data.recording_started = started def set_recording_stopped(self, stopped: bool): self.data.recording_stopped = stopped def set_metrics(self, min_speech_wait_timeout: float, max_speech_wait_timeout: float): if self.data: self.data.min_speech_wait_timeout = min_speech_wait_timeout self.data.max_speech_wait_timeout = max_speech_wait_timeout def set_playground_manager(self, manager: Optional["PlaygroundManager"]): self.playground = True self.playground_manager = manager # Background Audio tracking methods def on_background_audio_start(self, file_path: str = None, looping: bool = False): """Called when background audio starts playing""" now = time.perf_counter() self.data.background_audio_start_time = now # Extract just the filename, not the full path file_name = os.path.basename(file_path) if file_path else None if self.data.current_turn: self.data.current_turn.background_audio_file_path = file_name self.data.current_turn.background_audio_looping = looping self._start_timeline_event("background_audio", now) # Create session-level span for background audio start if self.traces_flow_manager: self.traces_flow_manager.create_background_audio_start_span(file_path=file_name, looping=looping, start_time=now) logger.info(f"Background audio started: file={file_name}, looping={looping}") def on_background_audio_stop(self): """Called when background audio stops""" now = time.perf_counter() self.data.background_audio_end_time = now if self.data.current_turn: self._end_timeline_event("background_audio", now) # Create session-level span for background audio stop if self.traces_flow_manager: self.traces_flow_manager.create_background_audio_stop_span(end_time=now) logger.info(f"Background audio stopped") # Thinking Audio tracking methods def on_thinking_audio_start(self, file_path: str = None, looping: bool = True, 
override_thinking: bool = True): """Called when thinking audio starts playing""" now = time.perf_counter() self.data.thinking_audio_start_time = now # Extract just the filename, not the full path file_name = os.path.basename(file_path) if file_path else None if self.data.current_turn: self.data.current_turn.thinking_audio_file_path = file_name self.data.current_turn.thinking_audio_looping = looping self.data.current_turn.thinking_audio_override_thinking = override_thinking self._start_timeline_event("thinking_audio", now) logger.info(f"Thinking audio started: file={file_name}, looping={looping}, override_thinking={override_thinking}") def on_thinking_audio_stop(self): """Called when thinking audio stops""" now = time.perf_counter() self.data.thinking_audio_end_time = now if self.data.current_turn: self._end_timeline_event("thinking_audio", now) logger.info(f"Thinking audio stopped")Collects and tracks performance metrics for AI agent turns
Methods
def add_error(self, source: str, message: str)-
Expand source code
def add_error(self, source: str, message: str): """Add an error to the current turn""" if self.data.current_turn: self.data.current_turn.errors.append({ "source": source, "message": message, "timestamp": time.time() }) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"errors": self.data.current_turn.errors})Add an error to the current turn
def add_function_tool_call(self, tool_name: str)-
Expand source code
def add_function_tool_call(self, tool_name: str): """Track when a function tool is called in the current turn""" if self.data.current_turn: self.data.current_turn.function_tools_called.append(tool_name) tool_timestamp = { "tool_name": tool_name, "timestamp": time.perf_counter(), "readable_time": time.strftime("%H:%M:%S", time.localtime()) } self.data.current_turn.function_tool_timestamps.append(tool_timestamp) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"function_tool_timestamps": self.data.current_turn.function_tool_timestamps})Track when a function tool is called in the current turn
def add_tts_characters(self, count: int)-
Expand source code
def add_tts_characters(self, count: int): """Add to the total character count for the current turn""" if self.data.current_turn: if self.data.current_turn.tts_characters: self.data.current_turn.tts_characters += count else: self.data.current_turn.tts_characters = count if self.playground: self.playground_manager.send_cascading_metrics(metrics={"tts_characters": self.data.current_turn.tts_characters})Add to the total character count for the current turn
def complete_current_turn(self) ‑> None-
Expand source code
def complete_current_turn(self) -> None: """Complete and store the current turn""" if self.data.current_turn: self._calculate_e2e_metrics(self.data.current_turn) if not self._validate_interaction_has_required_latencies(self.data.current_turn) and self.data.total_turns > 1: if self.data.current_turn.user_speech_start_time is not None: if (self.pending_user_start_time is None or self.data.current_turn.user_speech_start_time < self.pending_user_start_time): self.pending_user_start_time = self.data.current_turn.user_speech_start_time logger.info(f"[metrics] Caching earliest user start: {self.pending_user_start_time}") self.data.current_turn = None return if self.traces_flow_manager: self.traces_flow_manager.create_cascading_turn_trace(self.data.current_turn) self.data.turns.append(self.data.current_turn) if self.playground: self.playground_manager.send_cascading_metrics(metrics=self.data.current_turn, full_turn_data=True) interaction_data = asdict(self.data.current_turn) interaction_data['timeline'] = [asdict(event) for event in self.data.current_turn.timeline] transformed_data = self._transform_to_camel_case(interaction_data) # transformed_data = self._intify_latencies_and_timestamps(transformed_data) always_remove_fields = [ 'kb_start_time', 'kb_end_time', 'user_speech', 'stt_preflight_end_time', 'stt_interim_end_time', 'errors', 'functionToolTimestamps', 'sttStartTime', 'sttEndTime', 'ttsStartTime', 'ttsEndTime', 'llmStartTime', 'llmEndTime', 'eouStartTime', 'eouEndTime', 'is_a2a_enabled', "interactionId", "timestamp", ] if not self.data.current_turn.is_a2a_enabled: always_remove_fields.append("handOffOccurred") for field in always_remove_fields: if field in transformed_data: del transformed_data[field] if len(self.data.turns) > 1: provider_fields = [ 'systemInstructions', # 'llmProviderClass', 'llmModelName', # 'sttProviderClass', 'sttModelName', # 'ttsProviderClass', 'ttsModelName', 'eouProviderClass', 'eouModelName', 'vadProviderClass', 'vadModelName' ] for field
in provider_fields: if field in transformed_data: del transformed_data[field] transformed_data = self._remove_negatives(transformed_data) interaction_payload = { "data": [transformed_data] } self.analytics_client.send_interaction_analytics_safe(interaction_payload) self.data.current_turn = None self.data.is_interrupted = False self.pending_user_start_time = NoneComplete and store the current turn
def config_vad(self,
min_silence_duration: float = None,
min_speech_duration: float = None,
threshold: float = None)-
Expand source code
def config_vad(self, min_silence_duration: float = None, min_speech_duration: float = None, threshold: float = None): """Configure VAD parameters for metrics tracking""" if self.data: if min_silence_duration is not None: self.data.vad_min_silence_duration = min_silence_duration if min_speech_duration is not None: self.data.vad_min_speech_duration = min_speech_duration if threshold is not None: self.data.vad_threshold = thresholdConfigure VAD parameters for metrics tracking
def on_agent_speech_end(self)-
Expand source code
def on_agent_speech_end(self): """Called when agent stops speaking""" self.data.is_agent_speaking = False agent_speech_end_time = time.perf_counter() if self.data.current_turn: self._end_timeline_event("agent_speech", agent_speech_end_time) self.data.current_turn.agent_speech_end_time = agent_speech_end_time if self.data.tts_start_time and self.data.tts_first_byte_time: total_tts_latency = self.data.tts_first_byte_time - self.data.tts_start_time if self.data.current_turn and self.data.current_turn.agent_speech_start_time: self.data.current_turn.tts_latency = self._round_latency(total_tts_latency) self.data.current_turn.agent_speech_duration = self._round_latency(agent_speech_end_time - self.data.current_turn.agent_speech_start_time) if self.playground: self.playground_manager.send_cascading_metrics(metrics={"tts_latency": self.data.current_turn.tts_latency}) self.playground_manager.send_cascading_metrics(metrics={"agent_speech_duration": self.data.current_turn.agent_speech_duration}) self.data.tts_start_time = None self.data.tts_first_byte_time = None elif self.data.tts_start_time: # If we have start time but no first byte time, just reset self.data.tts_start_time = None self.data.tts_first_byte_time = NoneCalled when agent stops speaking
def on_agent_speech_start(self)-
Expand source code
def on_agent_speech_start(self): """Called when agent starts speaking (actual audio output)""" self.data.is_agent_speaking = True self.data.agent_speech_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.agent_speech_start_time = self.data.agent_speech_start_time if not any(event.event_type == "agent_speech" and event.end_time is None for event in self.data.current_turn.timeline): self._start_timeline_event("agent_speech", self.data.agent_speech_start_time)Called when agent starts speaking (actual audio output)
def on_background_audio_start(self, file_path: str = None, looping: bool = False)-
Expand source code
def on_background_audio_start(self, file_path: str = None, looping: bool = False): """Called when background audio starts playing""" now = time.perf_counter() self.data.background_audio_start_time = now # Extract just the filename, not the full path file_name = os.path.basename(file_path) if file_path else None if self.data.current_turn: self.data.current_turn.background_audio_file_path = file_name self.data.current_turn.background_audio_looping = looping self._start_timeline_event("background_audio", now) # Create session-level span for background audio start if self.traces_flow_manager: self.traces_flow_manager.create_background_audio_start_span(file_path=file_name, looping=looping, start_time=now) logger.info(f"Background audio started: file={file_name}, looping={looping}")Called when background audio starts playing
def on_background_audio_stop(self)-
Expand source code
def on_background_audio_stop(self): """Called when background audio stops""" now = time.perf_counter() self.data.background_audio_end_time = now if self.data.current_turn: self._end_timeline_event("background_audio", now) # Create session-level span for background audio stop if self.traces_flow_manager: self.traces_flow_manager.create_background_audio_stop_span(end_time=now) logger.info(f"Background audio stopped")Called when background audio stops
def on_eou_complete(self)-
Expand source code
def on_eou_complete(self): """Called when EOU processing completes""" # Don't overwrite EOU timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return if self.data.eou_start_time: eou_end_time = time.perf_counter() eou_latency = eou_end_time - self.data.eou_start_time if self.data.current_turn: self.data.current_turn.eou_end_time = eou_end_time self.data.current_turn.eou_latency = self._round_latency(eou_latency) # self._end_timeline_event("eou_processing", eou_end_time) logger.info(f"eou latency: {self.data.current_turn.eou_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"eou_latency": self.data.current_turn.eou_latency}) self.data.eou_start_time = NoneCalled when EOU processing completes
def on_eou_start(self)-
Expand source code
def on_eou_start(self): """Called when EOU (End of Utterance) processing starts""" # Don't overwrite EOU timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return self.data.eou_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.eou_start_time = self.data.eou_start_timeCalled when EOU (End of Utterance) processing starts
def on_fallback_event(self, event_data: Dict[str, Any])-
Expand source code
def on_fallback_event(self, event_data: Dict[str, Any]): """Record a fallback event for the current turn""" if not self.data.current_turn: self.start_new_interaction() if self.data.current_turn: fallback_event = FallbackEvent( component_type=event_data.get("component_type", ""), temporary_disable_sec=event_data.get("temporary_disable_sec", 0), permanent_disable_after_attempts=event_data.get("permanent_disable_after_attempts", 0), recovery_attempt=event_data.get("recovery_attempt", 0), message=event_data.get("message", ""), start_time=event_data.get("start_time"), end_time=event_data.get("end_time"), duration_ms=event_data.get("duration_ms"), original_provider_label=event_data.get("original_provider_label"), original_connection_start=event_data.get("original_connection_start"), original_connection_end=event_data.get("original_connection_end"), original_connection_duration_ms=event_data.get("original_connection_duration_ms"), new_provider_label=event_data.get("new_provider_label"), new_connection_start=event_data.get("new_connection_start"), new_connection_end=event_data.get("new_connection_end"), new_connection_duration_ms=event_data.get("new_connection_duration_ms"), is_recovery=event_data.get("is_recovery", False), ) self.data.current_turn.fallback_events.append(fallback_event) logger.info(f"Fallback event recorded: {event_data.get('component_type')} - {event_data.get('message')}")Record a fallback event for the current turn
def on_false_interrupt_escalated(self, word_count: int | None = None)-
Expand source code
def on_false_interrupt_escalated(self, word_count: Optional[int] = None): """Called when a false interrupt escalates to a true interrupt (user continued speaking)""" if self.data.current_turn: self.data.current_turn.is_false_interrupt = True self.data.current_turn.false_interrupt_end_time = time.perf_counter() if self.data.current_turn.false_interrupt_start_time: self.data.current_turn.false_interrupt_duration = self._round_latency( self.data.current_turn.false_interrupt_end_time - self.data.current_turn.false_interrupt_start_time ) self.data.current_turn.resumed_after_false_interrupt = False if word_count is not None: self.data.current_turn.false_interrupt_words = word_count logger.info(f"False interrupt escalated to true interrupt after {self.data.current_turn.false_interrupt_duration}ms")Called when a false interrupt escalates to a true interrupt (user continued speaking)
def on_false_interrupt_resume(self)-
Expand source code
def on_false_interrupt_resume(self): """Called when TTS resumes after false interrupt timeout (user didn't continue speaking)""" if self.data.current_turn: self.data.current_turn.is_false_interrupt = True self.data.current_turn.false_interrupt_end_time = time.perf_counter() if self.data.current_turn.false_interrupt_start_time: self.data.current_turn.false_interrupt_duration = self._round_latency( self.data.current_turn.false_interrupt_end_time - self.data.current_turn.false_interrupt_start_time ) self.data.current_turn.resumed_after_false_interrupt = True # Reset interrupted flag and data since this was NOT a true interrupt self.data.current_turn.interrupted = False self.data.current_turn.interrupt_start_time = None self.data.current_turn.interrupt_end_time = None self.data.current_turn.interrupt_words = None self.data.current_turn.interrupt_duration = None self.data.current_turn.interrupt_reason = [] self.data.is_interrupted = False logger.info(f"False interrupt ended - TTS resumed after {self.data.current_turn.false_interrupt_duration}ms")Called when TTS resumes after false interrupt timeout (user didn't continue speaking)
def on_false_interrupt_start(self, duration: float)-
Expand source code
def on_false_interrupt_start(self, duration: float): """Called when false interrupt timer starts (potential resume scenario)""" if self.data.current_turn: self.data.current_turn.false_interrupt_start_time = time.perf_counter() logger.info(f"False interrupt started - waiting {duration}s to determine if real interrupt")Called when false interrupt timer starts (potential resume scenario)
def on_interrupt_trigger(self, word_count: int | None = 0, duration: float | None = 0)-
Expand source code
def on_interrupt_trigger(self, word_count: Optional[int] = 0, duration: Optional[float] = 0): """Called when the user interrupts the agent""" if self.data.is_interrupted: return if self.data.is_agent_speaking: if self.data.current_turn: self.data.current_turn.interrupt_words = word_count self.data.current_turn.interrupt_duration = duration if self.data.current_turn.interrupt_words: self.data.current_turn.interrupt_reason.append("STT") if self.data.current_turn.interrupt_duration: self.data.current_turn.interrupt_reason.append("VAD")Called when the user interrupts the agent
def on_interrupted(self)-
Expand source code
def on_interrupted(self): """Called when the user interrupts the agent""" if self.data.is_interrupted: return if self.data.is_agent_speaking: self.data.total_interruptions += 1 self.data.is_interrupted = True if self.data.current_turn: self.data.current_turn.interrupted = True self.data.current_turn.interrupt_start_time = time.perf_counter() logger.info(f"User interrupted the agent. Total interruptions: {self.data.total_interruptions}") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"interrupted": self.data.current_turn.interrupted})Called when the user interrupts the agent
def on_knowledge_base_complete(self, documents: List[str], scores: List[float], record_id: List[str] = None)-
Expand source code
def on_knowledge_base_complete(self, documents: List[str], scores: List[float], record_id: List[str] = None): """Called when knowledge base processing completes""" if self.data.knowledge_base_start_time: knowledge_base_end_time = time.perf_counter() kb_retrieval_latency = knowledge_base_end_time - self.data.knowledge_base_start_time if self.data.current_turn: self.data.current_turn.kb_documents = documents self.data.current_turn.kb_scores = scores self.data.current_turn.kb_record_ids = record_id self.data.current_turn.kb_end_time = knowledge_base_end_time self.data.current_turn.kb_retrieval_latency = self._round_latency(kb_retrieval_latency) logger.info(f"knowledge base retrieval latency: {self.data.current_turn.kb_retrieval_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"kb_retrieval_latency": self.data.current_turn.kb_retrieval_latency}) self.data.knowledge_base_start_time = NoneCalled when knowledge base processing completes
def on_knowledge_base_start(self)-
Expand source code
def on_knowledge_base_start(self): """Called when knowledge base processing starts""" self.data.knowledge_base_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.kb_start_time = self.data.knowledge_base_start_timeCalled when knowledge base processing starts
def on_llm_complete(self)-
Expand source code
def on_llm_complete(self): """Called when LLM processing completes""" if self.data.llm_start_time: llm_end_time = time.perf_counter() llm_duration = llm_end_time - self.data.llm_start_time if self.data.current_turn: self.data.current_turn.llm_end_time = llm_end_time self.data.current_turn.llm_duration = self._round_latency(llm_duration) logger.info(f"llm duration: {self.data.current_turn.llm_duration}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"llm_duration": self.data.current_turn.llm_duration}) self.data.llm_start_time = NoneCalled when LLM processing completes
def on_llm_first_token(self)-
Expand source code
def on_llm_first_token(self): """Called when LLM first token received""" if self.data.current_turn: self.data.current_turn.llm_first_token_time = time.perf_counter() self.data.current_turn.llm_ttft = self._round_latency(self.data.current_turn.llm_first_token_time - self.data.current_turn.llm_start_time) logger.info(f"llm ttft: {self.data.current_turn.llm_ttft}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"llm_ttft": self.data.current_turn.llm_ttft})Called when LLM first token received
def on_llm_input(self, text: str)-
Expand source code
def on_llm_input(self, text: str): """Record the actual text sent to LLM""" if self.data.current_turn: self.data.current_turn.llm_input = textRecord the actual text sent to LLM
def on_llm_start(self)-
Expand source code
def on_llm_start(self): """Called when LLM processing starts""" self.data.llm_start_time = time.perf_counter() # Lock timestamps to prevent STT/EOU overwrites from subsequent events self.data.turn_timestamps_locked = True if self.data.current_turn: self.data.current_turn.llm_start_time = self.data.llm_start_timeCalled when LLM processing starts
def on_stt_complete(self, user_text: str)-
Expand source code
def on_stt_complete(self, user_text: str): """Called when STT processing completes""" # Don't overwrite STT timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return if self.data.current_turn and self.data.current_turn.stt_preemptive_generation_enabled and self.data.current_turn.stt_preemptive_generation_occurred: logger.info("STT preemptive generation occurred, skipping stt complete") return if self.data.stt_start_time: stt_end_time = time.perf_counter() stt_latency = stt_end_time - self.data.stt_start_time if self.data.current_turn: self.data.current_turn.stt_transcript = user_text self.data.current_turn.stt_end_time = stt_end_time self.data.current_turn.stt_latency = self._round_latency(stt_latency) logger.info(f"stt latency: {self.data.current_turn.stt_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"stt_latency": self.data.current_turn.stt_latency}) self.data.stt_start_time = NoneCalled when STT processing completes
def on_stt_interim_end(self)-
Expand source code
def on_stt_interim_end(self): """Called when STT interim event received""" if self.data.current_turn and self.data.current_turn.stt_start_time: self.data.current_turn.stt_interim_end_time = time.perf_counter() self.data.current_turn.stt_interim_latency = self._round_latency(self.data.current_turn.stt_interim_end_time - self.data.current_turn.stt_start_time) logger.info(f"stt interim latency: {self.data.current_turn.stt_interim_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"stt_interim_latency": self.data.current_turn.stt_interim_latency})Called when STT interim event received
def on_stt_metrics(self, metrics: Dict[str, Any])-
Expand source code
def on_stt_metrics(self, metrics: Dict[str, Any]): if self.data.current_turn: self.data.current_turn.stt_input_tokens = metrics.get("input_tokens") self.data.current_turn.stt_output_tokens = metrics.get("output_tokens") self.data.current_turn.stt_total_tokens = metrics.get("total_tokens") def on_stt_preemptive_generation(self, text: str, match: bool)-
Expand source code
def on_stt_preemptive_generation(self, text: str, match: bool): if self.data.current_turn: self.data.current_turn.stt_preflight_transcript = text self.data.current_turn.stt_preemptive_generation_occurred = match def on_stt_preflight_end(self)-
Expand source code
def on_stt_preflight_end(self): """Called when STT preflight event received""" if self.data.current_turn and self.data.current_turn.stt_start_time: self.data.current_turn.stt_preflight_end_time = time.perf_counter() self.data.current_turn.stt_preflight_latency = self._round_latency(self.data.current_turn.stt_preflight_end_time - self.data.current_turn.stt_start_time) logger.info(f"stt preflight latency: {self.data.current_turn.stt_preflight_latency}ms") if self.playground: self.playground_manager.send_cascading_metrics(metrics={"stt_preflight_latency": self.data.current_turn.stt_preflight_latency})Called when STT preflight event received
def on_stt_response(self, duration: float, confidence: float)-
Expand source code
def on_stt_response(self, duration: float, confidence: float): if not self.data.current_turn: self.start_new_interaction() if self.data.current_turn: self.data.current_turn.stt_duration = duration self.data.current_turn.stt_confidence = confidence logger.info(f"Stt duration {duration}, confidence {confidence}") def on_stt_start(self)-
Expand source code
def on_stt_start(self): """Called when STT processing starts""" # Don't overwrite STT timestamps if turn is locked (LLM has started) if self.data.turn_timestamps_locked: return self.data.stt_start_time = time.perf_counter() if self.data.current_turn: self.data.current_turn.stt_start_time = self.data.stt_start_timeCalled when STT processing starts
def on_thinking_audio_start(self, file_path: str = None, looping: bool = True, override_thinking: bool = True)-
Expand source code
def on_thinking_audio_start(self, file_path: str = None, looping: bool = True, override_thinking: bool = True): """Called when thinking audio starts playing""" now = time.perf_counter() self.data.thinking_audio_start_time = now # Extract just the filename, not the full path file_name = os.path.basename(file_path) if file_path else None if self.data.current_turn: self.data.current_turn.thinking_audio_file_path = file_name self.data.current_turn.thinking_audio_looping = looping self.data.current_turn.thinking_audio_override_thinking = override_thinking self._start_timeline_event("thinking_audio", now) logger.info(f"Thinking audio started: file={file_name}, looping={looping}, override_thinking={override_thinking}")Called when thinking audio starts playing
def on_thinking_audio_stop(self)-
Expand source code
def on_thinking_audio_stop(self): """Called when thinking audio stops""" now = time.perf_counter() self.data.thinking_audio_end_time = now if self.data.current_turn: self._end_timeline_event("thinking_audio", now) logger.info(f"Thinking audio stopped")Called when thinking audio stops
def on_tts_first_byte(self):
    """Called when TTS produces first audio byte - this is our TTS latency"""
    if self.data.tts_start_time:
        now = time.perf_counter()
        # TTFB is measured against the recorded TTS span start time,
        # so no separate delta variable is kept here.
        if self.data.current_turn:
            self.data.current_turn.tts_end_time = now
            self.data.current_turn.ttfb = self._round_latency((self.data.current_turn.tts_end_time - self.data.tts_start_time))
            logger.info(f"tts ttfb: {self.data.current_turn.ttfb}ms")
            if self.playground:
                self.playground_manager.send_cascading_metrics(metrics={"ttfb": self.data.current_turn.ttfb})
        self.data.tts_first_byte_time = now
def on_tts_start(self):
    """Called when TTS processing starts"""
    started_at = time.perf_counter()
    self.data.tts_start_time = started_at
    # A new synthesis invalidates any previous first-byte timestamp.
    self.data.tts_first_byte_time = None
    if self.data.current_turn:
        self.data.current_turn.tts_start_time = started_at
def on_user_speech_end(self):
    """Called when user stops speaking"""
    self.data.is_user_speaking = False
    self.data.user_speech_end_time = time.perf_counter()
    # Only record a duration when we actually saw the speech start.
    if self.data.current_turn and self.data.current_turn.user_speech_start_time:
        self.data.current_turn.user_speech_end_time = self.data.user_speech_end_time
        self.data.current_turn.user_speech_duration = self._round_latency(self.data.current_turn.user_speech_end_time - self.data.current_turn.user_speech_start_time)
        self._end_timeline_event("user_speech", self.data.user_speech_end_time)
        logger.info(f"user speech duration: {self.data.current_turn.user_speech_duration}ms")
        if self.playground:
            self.playground_manager.send_cascading_metrics(metrics={"user_speech_duration": self.data.current_turn.user_speech_duration})
def on_user_speech_start(self):
    """Called when user starts speaking"""
    # Ignore repeated start signals while speech is already in progress.
    if self.data.is_user_speaking:
        return
    if not self.data.current_turn:
        self.start_new_interaction()
    self.data.is_user_speaking = True
    self.data.user_input_start_time = time.perf_counter()
    turn = self.data.current_turn
    if turn:
        if turn.user_speech_start_time is None:
            turn.user_speech_start_time = self.data.user_input_start_time
        if not any(event.event_type == "user_speech" for event in turn.timeline):
            self._start_timeline_event("user_speech", self.data.user_input_start_time)
def on_vad_end_of_speech(self):
    """Called when VAD detects end of speech"""
    if self.data.current_turn:
        self.data.current_turn.vad_end_of_speech_time = time.perf_counter()
        logger.info(f"VAD end of speech detected at {self.data.current_turn.vad_end_of_speech_time}")
def on_wait_for_additional_speech(self, duration: float, eou_probability: float):
    """Called when waiting for additional speech"""
    if self.data.current_turn:
        self.data.current_turn.wait_for_additional_speech_duration = self._round_latency(duration)
        self.data.current_turn.waited_for_additional_speech = True
        self.data.current_turn.eou_probability = eou_probability
        logger.info(f"wait for additional speech duration: {self.data.current_turn.wait_for_additional_speech_duration}ms")
        if self.playground:
            self.playground_manager.send_cascading_metrics(metrics={"wait_for_additional_speech_duration": self.data.current_turn.wait_for_additional_speech_duration})
def set_a2a_handoff(self):
    """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
    turn = self.data.current_turn
    if not turn:
        return
    turn.is_a2a_enabled = True
    turn.handoff_occurred = True
    if self.playground:
        self.playground_manager.send_cascading_metrics(metrics={"handoff_occurred": turn.handoff_occurred})
def set_agent_response(self, response: str):
    """Set the agent response for the current turn and update timeline"""
    if self.data.current_turn:
        self.data.current_turn.agent_speech = response
        logger.info(f"agent output speech: {response}")
        # Open an agent_speech timeline event only if none exists yet.
        if not any(event.event_type == "agent_speech" for event in self.data.current_turn.timeline):
            current_time = time.perf_counter()
            self._start_timeline_event("agent_speech", current_time)
        self._update_timeline_event_text("agent_speech", response)
        if self.playground:
            self.playground_manager.send_cascading_metrics(metrics={"agent_speech": self.data.current_turn.agent_speech})
def set_interrupt_config(self, mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"], min_duration: Optional[float] = None, min_words: Optional[int] = None, false_interrupt_pause_duration: Optional[float] = None, resume_on_false_interrupt: Optional[bool] = None):
    """Store the interruption-detection configuration on the session data."""
    data = self.data
    data.interrupt_mode = mode
    data.interrupt_min_duration = min_duration
    data.interrupt_min_words = min_words
    data.false_interrupt_pause_duration = false_interrupt_pause_duration
    data.resume_on_false_interrupt = resume_on_false_interrupt

def set_llm_input(self, text: str):
    """Record the actual text sent to LLM"""
    turn = self.data.current_turn
    if turn:
        turn.llm_input = text
def set_llm_usage(self, usage: Dict[str, int]):
    """Set token usage and calculate TPS (tokens per second).

    Args:
        usage: Provider usage dict; recognised keys are "prompt_tokens",
            "completion_tokens", "total_tokens" and "prompt_cached_tokens".
            Missing keys are stored as None.
    """
    if not self.data.current_turn or not usage:
        return
    turn = self.data.current_turn
    turn.prompt_tokens = usage.get("prompt_tokens")
    turn.completion_tokens = usage.get("completion_tokens")
    turn.total_tokens = usage.get("total_tokens")
    turn.prompt_cached_tokens = usage.get("prompt_cached_tokens")
    # BUG FIX: usage.get("completion_tokens") can be None when the key is
    # absent; the previous "completion_tokens > 0" comparison then raised
    # TypeError. Check truthiness before comparing.
    if turn.llm_duration and turn.llm_duration > 0 and turn.completion_tokens and turn.completion_tokens > 0:
        latency_seconds = turn.llm_duration / 1000
        turn.tokens_per_second = round(turn.completion_tokens / latency_seconds, 2)
    if self.playground:
        pm = self.playground_manager
        pm.send_cascading_metrics(metrics={"prompt_tokens": turn.prompt_tokens})
        pm.send_cascading_metrics(metrics={"completion_tokens": turn.completion_tokens})
        pm.send_cascading_metrics(metrics={"total_tokens": turn.total_tokens})
        pm.send_cascading_metrics(metrics={"prompt_cached_tokens": turn.prompt_cached_tokens})
        pm.send_cascading_metrics(metrics={"tokens_per_second": turn.tokens_per_second})
def set_metrics(self, min_speech_wait_timeout: float, max_speech_wait_timeout: float):
    """Store the speech-wait timeout bounds on the session data."""
    if self.data:
        self.data.min_speech_wait_timeout = min_speech_wait_timeout
        self.data.max_speech_wait_timeout = max_speech_wait_timeout

def set_playground_manager(self, manager: Optional["PlaygroundManager"]):
    """Attach the playground manager used to stream live metrics.

    BUG FIX: the flag was previously set to True unconditionally, so
    passing manager=None left self.playground True while
    self.playground_manager was None — later
    `self.playground_manager.send_cascading_metrics(...)` calls would
    raise AttributeError. Enable only when a manager is provided.
    """
    self.playground = manager is not None
    self.playground_manager = manager

def set_preemptive_generation_enabled(self, enabled: bool):
    """Record whether STT preemptive generation is enabled for this session."""
    if self.data:
        self.data.stt_preemptive_generation_enabled = enabled

def set_provider_info(self, llm_provider: str = "", llm_model: str = "", stt_provider: str = "", stt_model: str = "", tts_provider: str = "", tts_model: str = "", vad_provider: str = "", vad_model: str = "", eou_provider: str = "", eou_model: str = ""):
    """Set the provider class and model information for this session"""
    self.data.llm_provider_class = llm_provider
    self.data.llm_model_name = llm_model
    self.data.stt_provider_class = stt_provider
    self.data.stt_model_name = stt_model
    self.data.tts_provider_class = tts_provider
    self.data.tts_model_name = tts_model
    self.data.vad_provider_class = vad_provider
    self.data.vad_model_name = vad_model
    self.data.eou_provider_class = eou_provider
    self.data.eou_model_name = eou_model
def set_recording_started(self, started: bool):
    """Record that session recording has started."""
    self.data.recording_started = started

def set_recording_stopped(self, stopped: bool):
    """Record that session recording has stopped."""
    self.data.recording_stopped = stopped

def set_session_id(self, session_id: str):
    """Set the session ID for metrics tracking"""
    self.data.session_id = session_id
    # Keep the analytics client in sync with the metrics session.
    self.analytics_client.set_session_id(session_id)
def set_system_instructions(self, instructions: str):
    """Set the system instructions for this session"""
    # Stored on session data; copied into the first turn when it starts.
    self.data.system_instructions = instructions
def set_traces_flow_manager(self, manager: "TracesFlowManager"):
    """Set the TracesFlowManager instance"""
    self.traces_flow_manager = manager
def set_user_transcript(self, transcript: str):
    """Set the user transcript for the current turn and update timeline"""
    turn = self.data.current_turn
    if not turn:
        return
    turn.user_speech = transcript
    logger.info(f"user input speech: {transcript}")
    speech_events = [event for event in turn.timeline if event.event_type == "user_speech"]
    if self.playground:
        self.playground_manager.send_cascading_metrics(metrics={"user_speech": turn.user_speech})
    if speech_events:
        # Update the latest user_speech event in place.
        speech_events[-1].text = transcript
    else:
        # No user_speech event yet: open one now and attach the text.
        current_time = time.perf_counter()
        self._start_timeline_event("user_speech", current_time)
        if turn.timeline:
            turn.timeline[-1].text = transcript
def start_new_interaction(self, user_transcript: str = "") -> None:
    """Start tracking a new user-agent turn"""
    if self.data.current_turn:
        self.complete_current_turn()
    # Reset the lock for the new turn
    self.data.turn_timestamps_locked = False
    # Clear all start times to prevent stale timestamps from previous turns
    self.data.stt_start_time = None
    self.data.llm_start_time = None
    self.data.tts_start_time = None
    self.data.eou_start_time = None
    self.data.knowledge_base_start_time = None
    self.data.total_turns += 1
    self.data.current_turn = CascadingTurnData(
        # System instructions are only attached to the very first turn.
        system_instructions=self.data.system_instructions if self.data.total_turns == 1 else "",
        # Provider and model info should be included in every turn
        llm_provider_class=self.data.llm_provider_class,
        llm_model_name=self.data.llm_model_name,
        stt_provider_class=self.data.stt_provider_class,
        stt_model_name=self.data.stt_model_name,
        tts_provider_class=self.data.tts_provider_class,
        tts_model_name=self.data.tts_model_name,
        vad_provider_class=self.data.vad_provider_class,
        vad_model_name=self.data.vad_model_name,
        vad_min_silence_duration=self.data.vad_min_silence_duration,
        vad_min_speech_duration=self.data.vad_min_speech_duration,
        vad_threshold=self.data.vad_threshold,
        eou_provider_class=self.data.eou_provider_class,
        eou_model_name=self.data.eou_model_name,
        stt_preemptive_generation_enabled=self.data.stt_preemptive_generation_enabled,
        min_speech_wait_timeout=self.data.min_speech_wait_timeout,
        max_speech_wait_timeout=self.data.max_speech_wait_timeout,
        interrupt_mode=self.data.interrupt_mode,
        interrupt_min_duration=self.data.interrupt_min_duration,
        interrupt_min_words=self.data.interrupt_min_words,
        false_interrupt_pause_duration=self.data.false_interrupt_pause_duration,
        resume_on_false_interrupt=self.data.resume_on_false_interrupt,
    )
    # Speech that began before the turn was created is back-filled here.
    if self.pending_user_start_time is not None:
        self.data.current_turn.user_speech_start_time = self.pending_user_start_time
        self._start_timeline_event("user_speech", self.pending_user_start_time)
    if self.data.is_user_speaking and self.data.user_input_start_time:
        if self.data.current_turn.user_speech_start_time is None:
            self.data.current_turn.user_speech_start_time = self.data.user_input_start_time
        if not any(ev.event_type == "user_speech" for ev in self.data.current_turn.timeline):
            self._start_timeline_event("user_speech", self.data.user_input_start_time)
    if user_transcript:
        self.set_user_transcript(user_transcript)
def update_provider_class(self, component_type: str, provider_class: str):
    """Update the provider class for a specific component when fallback occurs.

    This updates both the session-level data and the current turn data if exists.

    Args:
        component_type: "STT", "LLM", or "TTS"
        provider_class: The new provider class name (e.g., "GoogleLLM")
    """
    component_map = {
        "STT": ("stt_provider_class", "stt_provider_class"),
        "LLM": ("llm_provider_class", "llm_provider_class"),
        "TTS": ("tts_provider_class", "tts_provider_class"),
    }
    # Unknown component types are ignored silently.
    if component_type not in component_map:
        return
    session_attr, turn_attr = component_map[component_type]
    # Update session-level data
    setattr(self.data, session_attr, provider_class)
    # Update current turn if exists
    if self.data.current_turn:
        setattr(self.data.current_turn, turn_attr, provider_class)
    logger.info(f"Updated {component_type} provider class to: {provider_class}")
Args
component_type- "STT", "LLM", or "TTS"
provider_class- The new provider class name (e.g., "GoogleLLM")
@dataclass
class CascadingMetricsData:
    """Data structure to hold all metrics for a session"""
    session_id: Optional[str] = None
    session_start_time: float = field(default_factory=time.time)
    system_instructions: str = ""
    total_interruptions: int = 0
    total_turns: int = 0
    turns: List["CascadingTurnData"] = field(default_factory=list)
    current_turn: Optional["CascadingTurnData"] = None
    user_speech_end_time: Optional[float] = None
    agent_speech_start_time: Optional[float] = None
    agent_speech_end_time: Optional[float] = None
    stt_start_time: Optional[float] = None
    llm_start_time: Optional[float] = None
    tts_start_time: Optional[float] = None
    eou_start_time: Optional[float] = None
    eou_probability: Optional[float] = None
    wait_for_additional_speech_duration: Optional[float] = None
    # BUG FIX: the next three fields previously carried trailing commas,
    # turning their defaults into tuples ((False,), (None,), (None,)).
    # (False,) is truthy, so `if data.waited_for_additional_speech:` was
    # always True by default.
    waited_for_additional_speech: bool = False
    min_speech_wait_timeout: Optional[float] = None
    max_speech_wait_timeout: Optional[float] = None
    user_input_start_time: Optional[float] = None
    is_agent_speaking: bool = False
    is_user_speaking: bool = False
    tts_first_byte_time: Optional[float] = None
    stt_preemptive_generation_enabled: bool = False
    # Lock flag to prevent STT/EOU timestamps from being overwritten once LLM starts
    turn_timestamps_locked: bool = False
    is_interrupted: bool = False
    interrupt_mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"] = "HYBRID"
    interrupt_min_duration: Optional[float] = None
    interrupt_min_words: Optional[int] = None
    false_interrupt_pause_duration: Optional[float] = None
    resume_on_false_interrupt: Optional[bool] = None
    is_false_interrupt: bool = False
    false_interrupt_duration: Optional[float] = None
    false_interrupt_words: Optional[int] = None
    false_interrupt_start_time: Optional[float] = None
    false_interrupt_end_time: Optional[float] = None
    resumed_after_false_interrupt: bool = False
    vad_min_silence_duration: Optional[float] = None
    vad_min_speech_duration: Optional[float] = None
    vad_threshold: Optional[float] = None
    llm_provider_class: str = ""
    llm_model_name: str = ""
    stt_provider_class: str = ""
    stt_model_name: str = ""
    tts_provider_class: str = ""
    tts_model_name: str = ""
    vad_provider_class: str = ""
    vad_model_name: str = ""
    eou_provider_class: str = ""
    eou_model_name: str = ""
    # Background Audio session-level tracking
    background_audio_start_time: Optional[float] = None
    background_audio_end_time: Optional[float] = None
    # Thinking Audio session-level tracking
    thinking_audio_start_time: Optional[float] = None
    thinking_audio_end_time: Optional[float] = None
Instance variables
var agent_speech_end_time : float | Nonevar agent_speech_start_time : float | Nonevar background_audio_end_time : float | Nonevar background_audio_start_time : float | Nonevar current_turn : CascadingTurnData | Nonevar eou_model_name : strvar eou_probability : float | Nonevar eou_provider_class : strvar eou_start_time : float | Nonevar false_interrupt_duration : float | Nonevar false_interrupt_end_time : float | Nonevar false_interrupt_pause_duration : float | Nonevar false_interrupt_start_time : float | Nonevar false_interrupt_words : int | Nonevar interrupt_min_duration : float | Nonevar interrupt_min_words : int | Nonevar interrupt_mode : Literal['VAD_ONLY', 'STT_ONLY', 'HYBRID']var is_agent_speaking : boolvar is_false_interrupt : boolvar is_interrupted : boolvar is_user_speaking : boolvar llm_model_name : strvar llm_provider_class : strvar llm_start_time : float | Nonevar max_speech_wait_timeout : float | Nonevar min_speech_wait_timeout : float | Nonevar resume_on_false_interrupt : bool | Nonevar resumed_after_false_interrupt : boolvar session_id : str | Nonevar session_start_time : floatvar stt_model_name : strvar stt_preemptive_generation_enabled : boolvar stt_provider_class : strvar stt_start_time : float | Nonevar system_instructions : strvar thinking_audio_end_time : float | Nonevar thinking_audio_start_time : float | Nonevar total_interruptions : intvar total_turns : intvar tts_first_byte_time : float | Nonevar tts_model_name : strvar tts_provider_class : strvar tts_start_time : float | Nonevar turn_timestamps_locked : boolvar turns : List[CascadingTurnData]var user_input_start_time : float | Nonevar user_speech_end_time : float | Nonevar vad_min_silence_duration : float | Nonevar vad_min_speech_duration : float | Nonevar vad_model_name : strvar vad_provider_class : strvar vad_threshold : float | Nonevar wait_for_additional_speech_duration : float | Nonevar waited_for_additional_speech : bool
@dataclass
class CascadingTurnData:
    """Data structure for a single user-agent turn"""
    # --- User speech ---
    user_speech_start_time: Optional[float] = None
    user_speech_end_time: Optional[float] = None
    user_speech_duration: Optional[float] = None
    user_speech: Optional[str] = None
    llm_input: Optional[str] = None
    # --- Agent speech ---
    agent_speech_start_time: Optional[float] = None
    agent_speech_end_time: Optional[float] = None
    agent_speech_duration: Optional[float] = None
    agent_speech: Optional[str] = None
    # --- Knowledge base retrieval ---
    kb_documents: Optional[List[str]] = None
    kb_scores: Optional[List[float]] = None
    kb_retrieval_latency: Optional[float] = None
    kb_start_time: Optional[float] = None
    kb_end_time: Optional[float] = None
    # --- STT ---
    stt_confidence: Optional[float] = None
    stt_input_tokens: Optional[int] = 0
    stt_output_tokens: Optional[int] = 0
    stt_total_tokens: Optional[int] = 0
    stt_latency: Optional[float] = None
    stt_start_time: Optional[float] = None
    stt_end_time: Optional[float] = None
    stt_duration: Optional[float] = None
    stt_preflight_end_time: Optional[float] = None
    stt_preflight_latency: Optional[float] = None
    stt_interim_end_time: Optional[float] = None
    stt_interim_latency: Optional[float] = None
    stt_ttfw: Optional[float] = None  # Time to first word - locked after first interim
    stt_preemptive_generation_occurred: bool = False
    stt_transcript: Optional[str] = None
    stt_preflight_transcript: Optional[str] = None
    stt_preemptive_generation_enabled: bool = False
    # --- LLM ---
    llm_latency: Optional[float] = None
    llm_start_time: Optional[float] = None
    llm_end_time: Optional[float] = None
    llm_duration: Optional[float] = None
    llm_first_token_time: Optional[float] = None
    llm_ttft: Optional[float] = None
    prompt_tokens: Optional[int] = 0
    completion_tokens: Optional[int] = 0
    total_tokens: Optional[int] = 0
    prompt_cached_tokens: Optional[int] = 0
    tokens_per_second: Optional[float] = 0
    # --- TTS ---
    tts_latency: Optional[float] = None
    tts_start_time: Optional[float] = None
    tts_end_time: Optional[float] = None
    tts_duration: Optional[float] = None
    tts_characters: Optional[int] = 0
    ttfb: Optional[float] = None
    # --- End-of-utterance detection ---
    eou_latency: Optional[float] = None
    eou_start_time: Optional[float] = None
    eou_end_time: Optional[float] = None
    eou_probability: Optional[float] = None
    min_speech_wait_timeout: Optional[float] = None
    max_speech_wait_timeout: Optional[float] = None
    eou_avg_delay: Optional[float] = None
    waited_for_additional_speech: bool = False
    wait_for_additional_speech_duration: Optional[float] = None
    # --- Interruptions ---
    interrupted: bool = False
    interrupt_words: Optional[int] = None
    interrupt_duration: Optional[float] = None
    interrupt_reason: List[str] = field(default_factory=list)
    interrupt_start_time: Optional[float] = None
    interrupt_end_time: Optional[float] = None
    is_false_interrupt: bool = False
    false_interrupt_duration: Optional[float] = None
    false_interrupt_words: Optional[int] = None
    false_interrupt_start_time: Optional[float] = None
    false_interrupt_end_time: Optional[float] = None
    resumed_after_false_interrupt: bool = False
    interrupt_mode: Literal["VAD_ONLY", "STT_ONLY", "HYBRID"] = "HYBRID"
    interrupt_min_duration: Optional[float] = None
    interrupt_min_words: Optional[int] = None
    false_interrupt_pause_duration: Optional[float] = None
    resume_on_false_interrupt: Optional[bool] = None
    # --- Turn-level bookkeeping ---
    function_tool_timestamps: List[Dict[str, Any]] = field(default_factory=list)
    e2e_latency: Optional[float] = None
    timestamp: float = field(default_factory=time.time)
    function_tools_called: List[str] = field(default_factory=list)
    system_instructions: str = ""
    # --- Provider/model snapshot for this turn ---
    llm_provider_class: str = ""
    llm_model_name: str = ""
    stt_provider_class: str = ""
    stt_model_name: str = ""
    tts_provider_class: str = ""
    tts_model_name: str = ""
    vad_provider_class: str = ""
    vad_model_name: str = ""
    vad_min_silence_duration: Optional[float] = None
    vad_min_speech_duration: Optional[float] = None
    vad_threshold: Optional[float] = None
    vad_end_of_speech_time: Optional[float] = None
    eou_provider_class: str = ""
    eou_model_name: str = ""
    # --- Timeline / diagnostics ---
    timeline: List["TimelineEvent"] = field(default_factory=list)
    errors: List[Dict[str, Any]] = field(default_factory=list)
    fallback_events: List["FallbackEvent"] = field(default_factory=list)
    is_a2a_enabled: bool = False
    handoff_occurred: bool = False
    # Background Audio attributes (no start/end times - can be played outside turn)
    background_audio_file_path: Optional[str] = None
    background_audio_looping: Optional[bool] = None
    # Thinking Audio attributes (no start/end times - can be played outside turn)
    thinking_audio_file_path: Optional[str] = None
    thinking_audio_looping: Optional[bool] = None
    thinking_audio_override_thinking: Optional[bool] = None
Instance variables
var agent_speech : str | Nonevar agent_speech_duration : float | Nonevar agent_speech_end_time : float | Nonevar agent_speech_start_time : float | Nonevar background_audio_file_path : str | Nonevar background_audio_looping : bool | Nonevar completion_tokens : int | Nonevar e2e_latency : float | Nonevar eou_avg_delay : float | Nonevar eou_end_time : float | Nonevar eou_latency : float | Nonevar eou_model_name : strvar eou_probability : float | Nonevar eou_provider_class : strvar eou_start_time : float | Nonevar errors : List[Dict[str, Any]]var fallback_events : List[FallbackEvent]var false_interrupt_duration : float | Nonevar false_interrupt_end_time : float | Nonevar false_interrupt_pause_duration : float | Nonevar false_interrupt_start_time : float | Nonevar false_interrupt_words : int | Nonevar function_tool_timestamps : List[Dict[str, Any]]var function_tools_called : List[str]var handoff_occurred : boolvar interrupt_duration : float | Nonevar interrupt_end_time : float | Nonevar interrupt_min_duration : float | Nonevar interrupt_min_words : int | Nonevar interrupt_mode : Literal['VAD_ONLY', 'STT_ONLY', 'HYBRID']var interrupt_reason : List[str]var interrupt_start_time : float | Nonevar interrupt_words : int | Nonevar interrupted : boolvar is_a2a_enabled : boolvar is_false_interrupt : boolvar kb_documents : List[str] | Nonevar kb_end_time : float | Nonevar kb_retrieval_latency : float | Nonevar kb_scores : List[float] | Nonevar kb_start_time : float | Nonevar llm_duration : float | Nonevar llm_end_time : float | Nonevar llm_first_token_time : float | Nonevar llm_input : str | Nonevar llm_latency : float | Nonevar llm_model_name : strvar llm_provider_class : strvar llm_start_time : float | Nonevar llm_ttft : float | Nonevar max_speech_wait_timeout : float | Nonevar min_speech_wait_timeout : float | Nonevar prompt_cached_tokens : int | Nonevar prompt_tokens : int | Nonevar resume_on_false_interrupt : bool | Nonevar resumed_after_false_interrupt : boolvar 
stt_confidence : float | Nonevar stt_duration : float | Nonevar stt_end_time : float | Nonevar stt_input_tokens : int | Nonevar stt_interim_end_time : float | Nonevar stt_interim_latency : float | Nonevar stt_latency : float | Nonevar stt_model_name : strvar stt_output_tokens : int | Nonevar stt_preemptive_generation_enabled : boolvar stt_preemptive_generation_occurred : boolvar stt_preflight_end_time : float | Nonevar stt_preflight_latency : float | Nonevar stt_preflight_transcript : str | Nonevar stt_provider_class : strvar stt_start_time : float | Nonevar stt_total_tokens : int | Nonevar stt_transcript : str | Nonevar stt_ttfw : float | Nonevar system_instructions : strvar thinking_audio_file_path : str | Nonevar thinking_audio_looping : bool | Nonevar thinking_audio_override_thinking : bool | Nonevar timeline : List[TimelineEvent]var timestamp : floatvar tokens_per_second : float | Nonevar total_tokens : int | Nonevar ttfb : float | Nonevar tts_characters : int | Nonevar tts_duration : float | Nonevar tts_end_time : float | Nonevar tts_latency : float | Nonevar tts_model_name : strvar tts_provider_class : strvar tts_start_time : float | Nonevar user_speech : str | Nonevar user_speech_duration : float | Nonevar user_speech_end_time : float | Nonevar user_speech_start_time : float | Nonevar vad_end_of_speech_time : float | Nonevar vad_min_silence_duration : float | Nonevar vad_min_speech_duration : float | Nonevar vad_model_name : strvar vad_provider_class : strvar vad_threshold : float | Nonevar wait_for_additional_speech_duration : float | Nonevar waited_for_additional_speech : bool
class RealtimeMetricsCollector-
Expand source code
class RealtimeMetricsCollector:
    """Collects per-turn metrics for realtime (speech-to-speech) agent sessions.

    Tracks user/agent speech timing, timeline events, tool calls, token usage
    and model errors for each turn, then ships the finalized turn data to the
    analytics client and (when configured) the traces flow manager.
    """

    # Session-wide agent/pipeline configuration, captured once in
    # start_session() and copied into every new RealtimeTurnData.
    _agent_info: Dict[str, Any] = {
        "realtime_provider_class": None,
        "realtime_model_name": None,
        "system_instructions": None,
        "function_tools": [],
        "mcp_tools": [],
    }

    def __init__(self) -> None:
        self.turns: List[RealtimeTurnData] = []
        self.current_turn: Optional[RealtimeTurnData] = None
        self.lock = asyncio.Lock()
        self.agent_speech_end_timer: Optional[asyncio.TimerHandle] = None
        self.analytics_client = AnalyticsClient()
        self.traces_flow_manager: Optional[TracesFlowManager] = None
        self.playground: bool = False
        # Initialized here so _finalize_interaction_and_send() cannot hit an
        # AttributeError before set_playground_manager() is ever called.
        self.playground_manager: Optional["PlaygroundManager"] = None

    def set_session_id(self, session_id: str):
        """Set the session ID for metrics tracking"""
        self.analytics_client.set_session_id(session_id)

    def set_traces_flow_manager(self, manager: TracesFlowManager):
        """Set the TracesFlowManager instance for realtime tracing"""
        self.traces_flow_manager = manager

    def _transform_to_camel_case(self, data: Any) -> Any:
        """Recursively convert snake_case dict keys to camelCase for analytics.

        Accepts dicts, lists, or scalars (the annotation is ``Any`` because the
        method recurses into list items and returns scalars unchanged).
        """
        def to_camel_case(snake_str: str) -> str:
            parts = snake_str.split('_')
            return parts[0] + ''.join(x.title() for x in parts[1:])

        if isinstance(data, list):
            return [self._transform_to_camel_case(item) for item in data]
        if isinstance(data, dict):
            return {to_camel_case(k): self._transform_to_camel_case(v)
                    for k, v in data.items()}
        return data

    async def start_session(self, agent: Agent, pipeline: Pipeline) -> None:
        """Capture agent/pipeline configuration and open the session traces."""
        RealtimeMetricsCollector._agent_info = {
            "realtime_provider_class": pipeline.model.__class__.__name__,
            "realtime_model_name": getattr(pipeline.model, "model", None),
            "system_instructions": agent.instructions,
            # Exclude MCP-managed tools from the plain function-tool list.
            "function_tools": [
                getattr(tool, "name", tool.__name__ if callable(tool) else str(tool))
                for tool in (
                    [tool for tool in agent.tools if tool not in agent.mcp_manager.tools]
                    if agent.mcp_manager else agent.tools
                )
            ] if agent.tools else [],
            "mcp_tools": [
                tool._tool_info.name for tool in agent.mcp_manager.tools
            ] if agent.mcp_manager else [],
        }
        self.turns = []
        self.current_turn = None
        # Imported lazily to avoid a circular import at module load time.
        from . import cascading_metrics_collector as cascading_metrics_collector
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            self.set_traces_flow_manager(traces_flow_manager)
            config_attributes = {
                **RealtimeMetricsCollector._agent_info,
                "pipeline": pipeline.__class__.__name__,
                "llm_provider": pipeline.model.__class__.__name__,
            }
            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({})

    async def _start_new_interaction(self) -> None:
        """Create a fresh turn seeded with the session agent info and register it."""
        async with self.lock:
            self.current_turn = RealtimeTurnData(**RealtimeMetricsCollector._agent_info)
            self.turns.append(self.current_turn)

    async def set_user_speech_start(self) -> None:
        """Mark the start of user speech, finalizing any in-flight turn first."""
        if self.current_turn:
            self._finalize_interaction_and_send()
        await self._start_new_interaction()
        if self.current_turn and self.current_turn.user_speech_start_time is None:
            self.current_turn.user_speech_start_time = time.perf_counter()
            await self.start_timeline_event("user_speech")

    async def set_user_speech_end(self) -> None:
        """Mark the end of user speech for the current turn."""
        if self.current_turn and self.current_turn.user_speech_end_time is None:
            self.current_turn.user_speech_end_time = time.perf_counter()
            await self.end_timeline_event("user_speech")

    async def set_agent_speech_start(self) -> None:
        """Mark the start of agent speech, closing user speech if still open."""
        if not self.current_turn:
            await self._start_new_interaction()
        elif self.current_turn.user_speech_start_time is not None and \
                self.current_turn.user_speech_end_time is None:
            self.current_turn.user_speech_end_time = time.perf_counter()
            await self.end_timeline_event("user_speech")
        if self.current_turn and self.current_turn.agent_speech_start_time is None:
            self.current_turn.agent_speech_start_time = time.perf_counter()
            await self.start_timeline_event("agent_speech")
        # Agent resumed speaking: cancel any pending finalization timer.
        if self.agent_speech_end_timer:
            self.agent_speech_end_timer.cancel()

    async def set_agent_speech_end(self, timeout: float = 1.0) -> None:
        """Schedule turn finalization ``timeout`` seconds after agent speech ends.

        The delay lets a quick resumption of agent speech cancel finalization
        (see set_agent_speech_start).
        """
        if self.current_turn:
            if self.agent_speech_end_timer:
                self.agent_speech_end_timer.cancel()
            loop = asyncio.get_event_loop()
            self.agent_speech_end_timer = loop.call_later(
                timeout, self._finalize_interaction_and_send)
            await self.end_timeline_event("agent_speech")

    async def set_a2a_handoff(self) -> None:
        """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
        if self.current_turn:
            self.current_turn.is_a2a_enabled = True
            self.current_turn.handoff_occurred = True

    def _finalize_agent_speech(self) -> None:
        """Stamp agent_speech_end_time if the turn saw any speech activity."""
        if not self.current_turn or self.current_turn.agent_speech_end_time is not None:
            return
        is_valid_interaction = self.current_turn.user_speech_start_time is not None or \
            self.current_turn.agent_speech_start_time is not None
        if not is_valid_interaction:
            return
        self.current_turn.agent_speech_end_time = time.perf_counter()
        self.agent_speech_end_timer = None

    def _finalize_interaction_and_send(self) -> None:
        """Close out the current turn, compute latencies and ship analytics."""
        if not self.current_turn:
            return
        self._finalize_agent_speech()
        if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time:
            self.current_turn.user_speech_end_time = time.perf_counter()
        # Close any timeline events still open at finalization time.
        current_time = time.perf_counter()
        for event in self.current_turn.timeline:
            if event.end_time is None:
                event.end_time = current_time
                event.duration_ms = round((current_time - event.start_time) * 1000, 4)
        is_valid_interaction = self.current_turn.user_speech_start_time is not None or \
            self.current_turn.agent_speech_start_time is not None
        if not is_valid_interaction:
            self.current_turn = None
            return
        self.current_turn.compute_latencies()
        # Guard on the manager itself, not just the flag, to avoid a None call.
        if self.playground and self.playground_manager:
            self.playground_manager.send_realtime_metrics(
                metrics=self.current_turn, full_turn_data=True)
        if not hasattr(self, 'traces_flow_manager') or self.traces_flow_manager is None:
            try:
                from . import cascading_metrics_collector as cascading_metrics_collector
                self.traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            except Exception:
                pass  # best-effort: tracing is optional
        if self.traces_flow_manager:
            self.traces_flow_manager.create_realtime_turn_trace(self.current_turn)
        interaction_data = asdict(self.current_turn)
        # Fields the analytics backend should not receive.
        fields_to_remove = [
            "realtime_model_errors", "is_a2a_enabled", "session_id",
            "agent_speech_duration", "agent_speech_start_time",
            "agent_speech_end_time", "thinking_time",
            "function_tools", "mcp_tools",
        ]
        if len(self.turns) > 1:
            # Only the first turn carries the static session configuration.
            fields_to_remove.append("system_instructions")
        if not self.current_turn.is_a2a_enabled:
            fields_to_remove.append("handoff_occurred")
        # Loop variable renamed from `field` to avoid shadowing
        # dataclasses.field, which this module uses elsewhere.
        for field_name in fields_to_remove:
            interaction_data.pop(field_name, None)
        transformed_data = self._transform_to_camel_case(interaction_data)
        self.analytics_client.send_interaction_analytics_safe({
            "data": [transformed_data]
        })
        # BUG FIX: the turn was already appended to self.turns in
        # _start_new_interaction(); re-appending here duplicated every
        # completed turn in the session history.
        self.current_turn = None

    async def add_timeline_event(self, event: TimelineEvent) -> None:
        """Append a pre-built timeline event to the current turn."""
        if self.current_turn:
            self.current_turn.timeline.append(event)

    async def start_timeline_event(self, event_type: str) -> None:
        """Start a timeline event with a precise start time"""
        if self.current_turn:
            event = TimelineEvent(
                event_type=event_type,
                start_time=time.perf_counter()
            )
            self.current_turn.timeline.append(event)

    async def end_timeline_event(self, event_type: str) -> None:
        """End a timeline event and calculate duration"""
        if self.current_turn:
            end_time = time.perf_counter()
            # Close the most recent still-open event of this type.
            for event in reversed(self.current_turn.timeline):
                if event.event_type == event_type and event.end_time is None:
                    event.end_time = end_time
                    event.duration_ms = round((end_time - event.start_time) * 1000, 4)
                    break

    async def update_timeline_event_text(self, event_type: str, text: str) -> None:
        """Update timeline event with text content"""
        if self.current_turn:
            # Fill the most recent event of this type that has no text yet.
            for event in reversed(self.current_turn.timeline):
                if event.event_type == event_type and not event.text:
                    event.text = text
                    break

    async def add_tool_call(self, tool_name: str) -> None:
        """Record a function-tool invocation (deduplicated) for the current turn."""
        if self.current_turn and tool_name not in self.current_turn.function_tools_called:
            self.current_turn.function_tools_called.append(tool_name)
            logger.info(f"function tool called: {tool_name}")

    async def set_user_transcript(self, text: str) -> None:
        """Set the user transcript for the current turn and update timeline"""
        if self.current_turn:
            # Backfill speech timestamps if the transcript arrives first.
            if self.current_turn.user_speech_start_time is None:
                self.current_turn.user_speech_start_time = time.perf_counter()
                await self.start_timeline_event("user_speech")
            if self.current_turn.user_speech_end_time is None:
                self.current_turn.user_speech_end_time = time.perf_counter()
                await self.end_timeline_event("user_speech")
            logger.info(f"user input speech: {text}")
            await self.update_timeline_event_text("user_speech", text)

    async def set_agent_response(self, text: str) -> None:
        """Set the agent response for the current turn and update timeline"""
        if self.current_turn:
            if self.current_turn.agent_speech_start_time is None:
                self.current_turn.agent_speech_start_time = time.perf_counter()
                await self.start_timeline_event("agent_speech")
            logger.info(f"agent output speech: {text}")
            await self.update_timeline_event_text("agent_speech", text)

    def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
        """Set a realtime model-specific error for the current turn"""
        if self.current_turn:
            logger.error(f"realtime model error: {error}")
            self.current_turn.realtime_model_errors.append(error)

    async def set_interrupted(self) -> None:
        """Flag the current turn as interrupted."""
        if self.current_turn:
            self.current_turn.interrupted = True

    def finalize_session(self) -> None:
        """Schedule a new (empty) interaction on the event loop, thread-safely.

        NOTE(review): asyncio.get_event_loop() is deprecated outside a running
        loop; confirm call sites hold a loop reference to pass instead.
        """
        asyncio.run_coroutine_threadsafe(
            self._start_new_interaction(), asyncio.get_event_loop())

    def set_playground_manager(self, manager: Optional["PlaygroundManager"]):
        """Attach the playground manager.

        Playground mode is enabled only when a real manager is supplied, so a
        None manager can never be dereferenced during finalization.
        """
        self.playground = manager is not None
        self.playground_manager = manager

    def set_token_details(self, token_details: Dict[str, Any]):
        """Copy realtime token-usage counters onto the current turn.

        Missing keys become None via dict.get().
        """
        if self.current_turn:
            turn = self.current_turn
            turn.realtime_input_tokens = token_details.get("input_tokens")
            turn.realtime_total_tokens = token_details.get("total_tokens")
            turn.realtime_output_tokens = token_details.get("output_tokens")
            turn.realtime_input_text_tokens = token_details.get("input_text_tokens")
            turn.realtime_input_audio_tokens = token_details.get("input_audio_tokens")
            turn.realtime_input_image_tokens = token_details.get("input_image_tokens")
            turn.realtime_input_cached_tokens = token_details.get("input_cached_tokens")
            turn.realtime_cached_text_tokens = token_details.get("cached_text_tokens")
            turn.realtime_cached_audio_tokens = token_details.get("cached_audio_tokens")
            turn.realtime_cached_image_tokens = token_details.get("cached_image_tokens")
            turn.realtime_output_text_tokens = token_details.get("output_text_tokens")
            turn.realtime_output_audio_tokens = token_details.get("output_audio_tokens")
            turn.realtime_output_image_tokens = token_details.get("output_image_tokens")
async def add_timeline_event(self,
event: TimelineEvent) ‑> None-
Expand source code
async def add_timeline_event(self, event: TimelineEvent) -> None: if self.current_turn: self.current_turn.timeline.append(event) async def add_tool_call(self, tool_name: str) ‑> None-
Expand source code
async def add_tool_call(self, tool_name: str) -> None: if self.current_turn and tool_name not in self.current_turn.function_tools_called: self.current_turn.function_tools_called.append(tool_name) logger.info(f"function tool called: {tool_name}") async def end_timeline_event(self, event_type: str) ‑> None-
Expand source code
async def end_timeline_event(self, event_type: str) -> None: """End a timeline event and calculate duration""" if self.current_turn: end_time = time.perf_counter() for event in reversed(self.current_turn.timeline): if event.event_type == event_type and event.end_time is None: event.end_time = end_time event.duration_ms = round((end_time - event.start_time) * 1000, 4) breakEnd a timeline event and calculate duration
def finalize_session(self) ‑> None-
Expand source code
def finalize_session(self) -> None: asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop()) async def set_a2a_handoff(self) ‑> None-
Expand source code
async def set_a2a_handoff(self) -> None: """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.""" if self.current_turn: self.current_turn.is_a2a_enabled = True self.current_turn.handoff_occurred = TrueSet the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.
async def set_agent_response(self, text: str) ‑> None-
Expand source code
async def set_agent_response(self, text: str) -> None: """Set the agent response for the current turn and update timeline""" if self.current_turn: if self.current_turn.agent_speech_start_time is None: self.current_turn.agent_speech_start_time = time.perf_counter() await self.start_timeline_event("agent_speech") logger.info(f"agent output speech: {text}") await self.update_timeline_event_text("agent_speech", text)Set the agent response for the current turn and update timeline
async def set_agent_speech_end(self, timeout: float = 1.0) ‑> None-
Expand source code
async def set_agent_speech_end(self, timeout: float = 1.0) -> None: if self.current_turn: if self.agent_speech_end_timer: self.agent_speech_end_timer.cancel() loop = asyncio.get_event_loop() self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send) await self.end_timeline_event("agent_speech") async def set_agent_speech_start(self) ‑> None-
Expand source code
async def set_agent_speech_start(self) -> None: if not self.current_turn: await self._start_new_interaction() elif self.current_turn.user_speech_start_time is not None and self.current_turn.user_speech_end_time is None: self.current_turn.user_speech_end_time = time.perf_counter() await self.end_timeline_event("user_speech") if self.current_turn and self.current_turn.agent_speech_start_time is None: self.current_turn.agent_speech_start_time = time.perf_counter() await self.start_timeline_event("agent_speech") if self.agent_speech_end_timer: self.agent_speech_end_timer.cancel() async def set_interrupted(self) ‑> None-
Expand source code
async def set_interrupted(self) -> None: if self.current_turn: self.current_turn.interrupted = True def set_playground_manager(self, manager: "Optional['PlaygroundManager']")-
Expand source code
def set_playground_manager(self, manager: Optional["PlaygroundManager"]): self.playground = True self.playground_manager = manager def set_realtime_model_error(self, error: Dict[str, Any]) ‑> None-
Expand source code
def set_realtime_model_error(self, error: Dict[str, Any]) -> None: """Set a realtime model-specific error for the current turn""" if self.current_turn: logger.error(f"realtime model error: {error}") self.current_turn.realtime_model_errors.append(error)Set a realtime model-specific error for the current turn
def set_session_id(self, session_id: str)-
Expand source code
def set_session_id(self, session_id: str): """Set the session ID for metrics tracking""" self.analytics_client.set_session_id(session_id)Set the session ID for metrics tracking
def set_token_details(self, token_details: Dict[str, Any])-
Expand source code
def set_token_details(self, token_details: Dict[str, Any]): if self.current_turn: self.current_turn.realtime_input_tokens = token_details.get("input_tokens") self.current_turn.realtime_total_tokens = token_details.get("total_tokens") self.current_turn.realtime_output_tokens = token_details.get("output_tokens") self.current_turn.realtime_input_text_tokens = token_details.get("input_text_tokens") self.current_turn.realtime_input_audio_tokens = token_details.get("input_audio_tokens") self.current_turn.realtime_input_image_tokens = token_details.get("input_image_tokens") self.current_turn.realtime_input_cached_tokens = token_details.get("input_cached_tokens") self.current_turn.realtime_cached_text_tokens = token_details.get("cached_text_tokens") self.current_turn.realtime_cached_audio_tokens = token_details.get("cached_audio_tokens") self.current_turn.realtime_cached_image_tokens = token_details.get("cached_image_tokens") self.current_turn.realtime_output_text_tokens = token_details.get("output_text_tokens") self.current_turn.realtime_output_audio_tokens = token_details.get("output_audio_tokens") self.current_turn.realtime_output_image_tokens = token_details.get("output_image_tokens") def set_traces_flow_manager(self, manager: TracesFlowManager)-
Expand source code
def set_traces_flow_manager(self, manager: TracesFlowManager): """Set the TracesFlowManager instance for realtime tracing""" self.traces_flow_manager = managerSet the TracesFlowManager instance for realtime tracing
async def set_user_speech_end(self) ‑> None-
Expand source code
async def set_user_speech_end(self) -> None: if self.current_turn and self.current_turn.user_speech_end_time is None: self.current_turn.user_speech_end_time = time.perf_counter() await self.end_timeline_event("user_speech") async def set_user_speech_start(self) ‑> None-
Expand source code
async def set_user_speech_start(self) -> None: if self.current_turn: self._finalize_interaction_and_send() await self._start_new_interaction() if self.current_turn and self.current_turn.user_speech_start_time is None: self.current_turn.user_speech_start_time = time.perf_counter() await self.start_timeline_event("user_speech") async def set_user_transcript(self, text: str) ‑> None-
Expand source code
async def set_user_transcript(self, text: str) -> None: """Set the user transcript for the current turn and update timeline""" if self.current_turn: if self.current_turn.user_speech_start_time is None: self.current_turn.user_speech_start_time = time.perf_counter() await self.start_timeline_event("user_speech") if self.current_turn.user_speech_end_time is None: self.current_turn.user_speech_end_time = time.perf_counter() await self.end_timeline_event("user_speech") logger.info(f"user input speech: {text}") await self.update_timeline_event_text("user_speech", text)Set the user transcript for the current turn and update timeline
async def start_session(self, agent: Agent, pipeline: Pipeline) ‑> None-
Expand source code
async def start_session(self, agent: Agent, pipeline: Pipeline) -> None: RealtimeMetricsCollector._agent_info = { "realtime_provider_class": pipeline.model.__class__.__name__, "realtime_model_name": getattr(pipeline.model, "model", None), "system_instructions": agent.instructions, "function_tools": [ getattr(tool, "name", tool.__name__ if callable(tool) else str(tool)) for tool in ( [tool for tool in agent.tools if tool not in agent.mcp_manager.tools] if agent.mcp_manager else agent.tools ) ] if agent.tools else [], "mcp_tools": [ tool._tool_info.name for tool in agent.mcp_manager.tools ] if agent.mcp_manager else [], } self.turns = [] self.current_turn = None from . import cascading_metrics_collector as cascading_metrics_collector traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: self.set_traces_flow_manager(traces_flow_manager) config_attributes = { **RealtimeMetricsCollector._agent_info, "pipeline": pipeline.__class__.__name__, "llm_provider": pipeline.model.__class__.__name__, } await traces_flow_manager.start_agent_session_config(config_attributes) await traces_flow_manager.start_agent_session({}) async def start_timeline_event(self, event_type: str) ‑> None-
Expand source code
async def start_timeline_event(self, event_type: str) -> None: """Start a timeline event with a precise start time""" if self.current_turn: event = TimelineEvent( event_type=event_type, start_time=time.perf_counter() ) self.current_turn.timeline.append(event)Start a timeline event with a precise start time
async def update_timeline_event_text(self, event_type: str, text: str) ‑> None-
Expand source code
async def update_timeline_event_text(self, event_type: str, text: str) -> None: """Update timeline event with text content""" if self.current_turn: for event in reversed(self.current_turn.timeline): if event.event_type == event_type and not event.text: event.text = text breakUpdate timeline event with text content
class TimelineEvent (event_type: str,
start_time: float,
end_time: float | None = None,
duration_ms: float | None = None,
text: str = '')-
Expand source code
@dataclass class TimelineEvent: """Data structure for a single timeline event""" event_type: str start_time: float end_time: Optional[float] = None duration_ms: Optional[float] = None text: str = ""Data structure for a single timeline event
Instance variables
var duration_ms : float | Nonevar end_time : float | Nonevar event_type : strvar start_time : floatvar text : str