Module agents.metrics
Sub-modules
agents.metrics.analytics · agents.metrics.cascading_metrics_collector · agents.metrics.integration · agents.metrics.logs · agents.metrics.models · agents.metrics.realtime_metrics_collector · agents.metrics.telemetry · agents.metrics.traces_flow
Functions
def auto_initialize_telemetry_and_logs(room_id: str,
peer_id: str,
room_attributes: Dict[str, Any] = None,
session_id: str = None,
sdk_metadata: Dict[str, Any] = None)-
Expand source code
def auto_initialize_telemetry_and_logs(
    room_id: str,
    peer_id: str,
    room_attributes: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    sdk_metadata: Optional[Dict[str, Any]] = None,
):
    """
    Auto-initialize telemetry and logs from room attributes.

    Reads the ``observability`` JWT plus the ``traces`` / ``logs`` config blocks
    out of ``room_attributes`` and initializes each subsystem only when its
    config has ``enabled`` set to a truthy value.

    Args:
        room_id: Room identifier passed through to telemetry/logs.
        peer_id: Peer identifier passed through to telemetry/logs.
        room_attributes: Attribute dict for the room; ``None``/empty means no-op.
        session_id: Optional session identifier forwarded to the log exporter.
        sdk_metadata: Optional SDK metadata forwarded to both subsystems.
    """
    # Nothing to configure without room attributes.
    if not room_attributes:
        return
    observability_jwt = room_attributes.get('observability', '')
    traces_config = room_attributes.get('traces', {})
    if traces_config.get('enabled'):
        metadata = {}
        initialize_telemetry(
            room_id=room_id,
            peer_id=peer_id,
            sdk_name="agents",
            observability_jwt=observability_jwt,
            traces_config=traces_config,
            metadata=metadata,
            sdk_metadata=sdk_metadata
        )
    logs_config = room_attributes.get('logs', {})
    if logs_config.get('enabled'):
        initialize_logs(
            meeting_id=room_id,
            peer_id=peer_id,
            jwt_key=observability_jwt,
            log_config=logs_config,
            session_id=session_id,
            sdk_metadata=sdk_metadata,
        )
def complete_span(span: opentelemetry.trace.span.Span | None,
status_code,
message: str = '',
end_time: float | None = None)-
Expand source code
def complete_span(span: Optional[Span], status_code, message: str = "", end_time: Optional[float] = None):
    """Complete a trace span (convenience method).

    Args:
        span: Span to complete
        status_code: Status code
        message: Status message
        end_time: End time in seconds since epoch (optional)
    """
    telemetry = get_telemetry()
    # Delegate only when both the telemetry backend and a span exist.
    if not (telemetry and span):
        return
    telemetry.complete_span(span, status_code, message, end_time)
Args
span- Span to complete
status_code- Status code
message- Status message
end_time- End time in seconds since epoch (optional)
def create_log(message: str, log_level: str = 'INFO', attributes: Dict[str, Any] = None)-
Expand source code
def create_log(message: str, log_level: str = "INFO", attributes: Optional[Dict[str, Any]] = None):
    """Create a log entry (convenience method).

    Delegates to the module's log exporter when one is configured; a silent
    no-op otherwise.

    Args:
        message: Log message
        log_level: Log level
        attributes: Additional attributes (optional)
    """
    logs = get_logs()
    if logs:
        logs.create_log(message, log_level, attributes)
Args
message- Log message
log_level- Log level
attributes- Additional attributes
def create_span(span_name: str,
attributes: Dict[str, Any] = None,
parent_span: opentelemetry.trace.span.Span | None = None,
start_time: float | None = None)-
Expand source code
def create_span(
    span_name: str,
    attributes: Optional[Dict[str, Any]] = None,
    parent_span: Optional[Span] = None,
    start_time: Optional[float] = None,
):
    """Create a trace span (convenience method).

    Args:
        span_name: Name of the span
        attributes: Span attributes (optional)
        parent_span: Parent span (optional)
        start_time: Start time in seconds since epoch (optional)

    Returns:
        Span object, or None when telemetry is not configured.
    """
    telemetry = get_telemetry()
    if telemetry:
        return telemetry.trace(span_name, attributes, parent_span, start_time)
    return None
Args
span_name- Name of the span
attributes- Span attributes
parent_span- Parent span (optional)
start_time- Start time in seconds since epoch (optional)
Returns
Span object or None
Classes
class CascadingMetricsCollector-
Expand source code
class CascadingMetricsCollector:
    """Collects and tracks performance metrics for AI agent turns.

    Tracks one ``CascadingTurnData`` at a time (``self.data.current_turn``),
    timestamps component stages (STT/LLM/TTS/EOU) with ``time.perf_counter``,
    and on turn completion ships a camelCase payload to the analytics client.
    """

    def __init__(self):
        self.data = CascadingMetricsData()
        self.analytics_client = AnalyticsClient()
        self.traces_flow_manager: Optional[TracesFlowManager] = None
        self.active_spans: Dict[str, Span] = {}
        # Earliest user-speech start carried over from a discarded turn.
        self.pending_user_start_time: Optional[float] = None

    def set_traces_flow_manager(self, manager: TracesFlowManager):
        """Set the TracesFlowManager instance"""
        self.traces_flow_manager = manager

    def _generate_interaction_id(self) -> str:
        """Generate a hash-based turn ID"""
        timestamp = str(time.time())
        session_id = self.data.session_id or "default"
        interaction_count = str(self.data.total_turns)
        hash_input = f"{timestamp}_{session_id}_{interaction_count}"
        # md5 is used as a cheap non-cryptographic ID hash here.
        return hashlib.md5(hash_input.encode()).hexdigest()[:16]

    def _round_latency(self, latency: float) -> float:
        """Convert latency from seconds to milliseconds and round to 4 decimal places"""
        return round(latency * 1000, 4)

    def _transform_to_camel_case(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform snake_case field names to camelCase for analytics"""
        field_mapping = {
            'user_speech_start_time': 'userSpeechStartTime',
            'user_speech_end_time': 'userSpeechEndTime',
            'stt_latency': 'sttLatency',
            'llm_latency': 'llmLatency',
            'tts_latency': 'ttsLatency',
            'eou_latency': 'eouLatency',
            'e2e_latency': 'e2eLatency',
            'function_tools_called': 'functionToolsCalled',
            'system_instructions': 'systemInstructions',
            'errors': 'errors',
            'function_tool_timestamps': 'functionToolTimestamps',
            'stt_start_time': 'sttStartTime',
            'stt_end_time': 'sttEndTime',
            'tts_start_time': 'ttsStartTime',
            'tts_end_time': 'ttsEndTime',
            'llm_start_time': 'llmStartTime',
            'llm_end_time': 'llmEndTime',
            'eou_start_time': 'eouStartTime',
            'eou_end_time': 'eouEndTime',
            'llm_provider_class': 'llmProviderClass',
            'llm_model_name': 'llmModelName',
            'stt_provider_class': 'sttProviderClass',
            'stt_model_name': 'sttModelName',
            'tts_provider_class': 'ttsProviderClass',
            'tts_model_name': 'ttsModelName',
            'vad_provider_class': 'vadProviderClass',
            'vad_model_name': 'vadModelName',
            'eou_provider_class': 'eouProviderClass',
            'eou_model_name': 'eouModelName',
            'handoff_occurred': 'handOffOccurred'
        }
        timeline_field_mapping = {
            'event_type': 'eventType',
            'start_time': 'startTime',
            'end_time': 'endTime',
            'duration_ms': 'durationInMs'
        }
        transformed_data = {}
        for key, value in interaction_data.items():
            camel_key = field_mapping.get(key, key)
            if key == 'timeline' and isinstance(value, list):
                # Timeline events use their own (smaller) mapping; unmapped
                # keys (e.g. 'text') pass through unchanged.
                transformed_timeline = []
                for event in value:
                    transformed_event = {}
                    for event_key, event_value in event.items():
                        camel_event_key = timeline_field_mapping.get(event_key, event_key)
                        transformed_event[camel_event_key] = event_value
                    transformed_timeline.append(transformed_event)
                transformed_data[camel_key] = transformed_timeline
            else:
                transformed_data[camel_key] = value
        return transformed_data

    def _remove_negatives(self, obj: Any) -> Any:
        """Recursively clamp any numeric value < 0 to 0 in dicts/lists (in place)."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                if isinstance(v, (int, float)):
                    if v < 0:
                        obj[k] = 0
                elif isinstance(v, (dict, list)):
                    obj[k] = self._remove_negatives(v)
            return obj
        if isinstance(obj, list):
            for i, v in enumerate(obj):
                if isinstance(v, (int, float)):
                    if v < 0:
                        obj[i] = 0
                elif isinstance(v, (dict, list)):
                    obj[i] = self._remove_negatives(v)
            return obj
        return obj

    def _start_timeline_event(self, event_type: str, start_time: float) -> None:
        """Start a timeline event on the current turn (no-op without a turn)."""
        if self.data.current_turn:
            event = TimelineEvent(
                event_type=event_type,
                start_time=start_time
            )
            self.data.current_turn.timeline.append(event)

    def _end_timeline_event(self, event_type: str, end_time: float) -> None:
        """End the most recent open timeline event of this type and set its duration."""
        if self.data.current_turn:
            for event in reversed(self.data.current_turn.timeline):
                if event.event_type == event_type and event.end_time is None:
                    event.end_time = end_time
                    event.duration_ms = self._round_latency(end_time - event.start_time)
                    break

    def _update_timeline_event_text(self, event_type: str, text: str) -> None:
        """Attach text to the most recent text-less timeline event of this type."""
        if self.data.current_turn:
            for event in reversed(self.data.current_turn.timeline):
                if event.event_type == event_type and not event.text:
                    event.text = text
                    break

    def _calculate_e2e_metrics(self, turn: CascadingTurnData) -> None:
        """Calculate E2E latency as the sum of the component latencies that are present."""
        e2e_components = []
        if turn.stt_latency:
            e2e_components.append(turn.stt_latency)
        if turn.eou_latency:
            e2e_components.append(turn.eou_latency)
        if turn.llm_latency:
            e2e_components.append(turn.llm_latency)
        if turn.tts_latency:
            e2e_components.append(turn.tts_latency)
        if e2e_components:
            turn.e2e_latency = round(sum(e2e_components), 4)

    def _validate_interaction_has_required_latencies(self, turn: CascadingTurnData) -> bool:
        """
        Validate that the turn has at least one of the required latency metrics.
        Returns True if at least one latency is present, False if ALL are absent/None.
        """
        # Note: a previously-built list of present latency names was dead code
        # (never used) and has been removed.
        return any(
            latency is not None
            for latency in (turn.stt_latency, turn.tts_latency, turn.llm_latency, turn.eou_latency)
        )

    def set_session_id(self, session_id: str):
        """Set the session ID for metrics tracking"""
        self.data.session_id = session_id
        self.analytics_client.set_session_id(session_id)

    def set_system_instructions(self, instructions: str):
        """Set the system instructions for this session"""
        self.data.system_instructions = instructions

    def set_provider_info(self, llm_provider: str = "", llm_model: str = "",
                          stt_provider: str = "", stt_model: str = "",
                          tts_provider: str = "", tts_model: str = "",
                          vad_provider: str = "", vad_model: str = "",
                          eou_provider: str = "", eou_model: str = ""):
        """Set the provider class and model information for this session"""
        self.data.llm_provider_class = llm_provider
        self.data.llm_model_name = llm_model
        self.data.stt_provider_class = stt_provider
        self.data.stt_model_name = stt_model
        self.data.tts_provider_class = tts_provider
        self.data.tts_model_name = tts_model
        self.data.vad_provider_class = vad_provider
        self.data.vad_model_name = vad_model
        self.data.eou_provider_class = eou_provider
        self.data.eou_model_name = eou_model

    def start_new_interaction(self, user_transcript: str = "") -> None:
        """Start tracking a new user-agent turn, completing any turn in progress."""
        if self.data.current_turn:
            self.complete_current_turn()
        self.data.total_turns += 1
        self.data.current_turn = CascadingTurnData(
            # System instructions are only attached to the first turn.
            system_instructions=self.data.system_instructions if self.data.total_turns == 1 else "",
            # Provider and model info should be included in every turn
            llm_provider_class=self.data.llm_provider_class,
            llm_model_name=self.data.llm_model_name,
            stt_provider_class=self.data.stt_provider_class,
            stt_model_name=self.data.stt_model_name,
            tts_provider_class=self.data.tts_provider_class,
            tts_model_name=self.data.tts_model_name,
            vad_provider_class=self.data.vad_provider_class,
            vad_model_name=self.data.vad_model_name,
            eou_provider_class=self.data.eou_provider_class,
            eou_model_name=self.data.eou_model_name
        )
        # Carry over a user-speech start cached from a discarded turn.
        if self.pending_user_start_time is not None:
            self.data.current_turn.user_speech_start_time = self.pending_user_start_time
            self._start_timeline_event("user_speech", self.pending_user_start_time)
        if self.data.is_user_speaking and self.data.user_input_start_time:
            if self.data.current_turn.user_speech_start_time is None:
                self.data.current_turn.user_speech_start_time = self.data.user_input_start_time
            if not any(ev.event_type == "user_speech" for ev in self.data.current_turn.timeline):
                self._start_timeline_event("user_speech", self.data.user_input_start_time)
        if user_transcript:
            self.set_user_transcript(user_transcript)

    def complete_current_turn(self) -> None:
        """Complete and store the current turn, shipping its metrics to analytics."""
        if self.data.current_turn:
            self._calculate_e2e_metrics(self.data.current_turn)
            if not self._validate_interaction_has_required_latencies(self.data.current_turn):
                # Turn has no latency data at all: discard it, but remember the
                # earliest user-speech start so the next turn can inherit it.
                if self.data.current_turn.user_speech_start_time is not None:
                    if (self.pending_user_start_time is None or
                            self.data.current_turn.user_speech_start_time < self.pending_user_start_time):
                        self.pending_user_start_time = self.data.current_turn.user_speech_start_time
                        logger.info(f"[metrics] Caching earliest user start: {self.pending_user_start_time}")
                self.data.current_turn = None
                return
            if self.traces_flow_manager:
                self.traces_flow_manager.create_cascading_turn_trace(self.data.current_turn)
            self.data.turns.append(self.data.current_turn)
            interaction_data = asdict(self.data.current_turn)
            interaction_data['timeline'] = [asdict(event) for event in self.data.current_turn.timeline]
            transformed_data = self._transform_to_camel_case(interaction_data)
            # transformed_data = self._intify_latencies_and_timestamps(transformed_data)
            always_remove_fields = [
                'errors', 'functionToolTimestamps',
                'sttStartTime', 'sttEndTime', 'ttsStartTime', 'ttsEndTime',
                'llmStartTime', 'llmEndTime', 'eouStartTime', 'eouEndTime',
                'is_a2a_enabled', "interactionId", "timestamp"
            ]
            if not self.data.current_turn.is_a2a_enabled:
                always_remove_fields.append("handOffOccurred")
            for field in always_remove_fields:
                if field in transformed_data:
                    del transformed_data[field]
            if len(self.data.turns) > 1:
                # Provider/system fields only accompany the first reported turn.
                provider_fields = [
                    'systemInstructions', 'llmProviderClass', 'llmModelName',
                    'sttProviderClass', 'sttModelName', 'ttsProviderClass', 'ttsModelName'
                ]
                for field in provider_fields:
                    if field in transformed_data:
                        del transformed_data[field]
            transformed_data = self._remove_negatives(transformed_data)
            interaction_payload = {
                "data": [transformed_data]
            }
            self.analytics_client.send_interaction_analytics_safe(interaction_payload)
            self.data.current_turn = None
            self.pending_user_start_time = None

    def on_interrupted(self):
        """Called when the user interrupts the agent"""
        if self.data.is_agent_speaking:
            self.data.total_interruptions += 1
            if self.data.current_turn:
                self.data.current_turn.interrupted = True
            logger.info(f"User interrupted the agent. Total interruptions: {self.data.total_interruptions}")

    def on_user_speech_start(self):
        """Called when user starts speaking"""
        if self.data.is_user_speaking:
            return
        if not self.data.current_turn:
            self.start_new_interaction()
        self.data.is_user_speaking = True
        self.data.user_input_start_time = time.perf_counter()
        if self.data.current_turn:
            if self.data.current_turn.user_speech_start_time is None:
                self.data.current_turn.user_speech_start_time = self.data.user_input_start_time
            if not any(event.event_type == "user_speech" for event in self.data.current_turn.timeline):
                self._start_timeline_event("user_speech", self.data.user_input_start_time)

    def on_user_speech_end(self):
        """Called when user stops speaking"""
        self.data.is_user_speaking = False
        self.data.user_speech_end_time = time.perf_counter()
        if self.data.current_turn:
            self.data.current_turn.user_speech_end_time = self.data.user_speech_end_time
            self._end_timeline_event("user_speech", self.data.user_speech_end_time)

    def on_agent_speech_start(self):
        """Called when agent starts speaking (actual audio output)"""
        self.data.is_agent_speaking = True
        self.data.agent_speech_start_time = time.perf_counter()
        if self.data.current_turn:
            # Only open a new event when no agent_speech event is still open.
            if not any(event.event_type == "agent_speech" and event.end_time is None
                       for event in self.data.current_turn.timeline):
                self._start_timeline_event("agent_speech", self.data.agent_speech_start_time)

    def on_agent_speech_end(self):
        """Called when agent stops speaking"""
        self.data.is_agent_speaking = False
        agent_speech_end_time = time.perf_counter()
        if self.data.current_turn:
            self._end_timeline_event("agent_speech", agent_speech_end_time)
        if self.data.tts_start_time and self.data.tts_first_byte_time:
            total_tts_latency = self.data.tts_first_byte_time - self.data.tts_start_time
            if self.data.current_turn:
                self.data.current_turn.tts_end_time = agent_speech_end_time
                self.data.current_turn.tts_latency = self._round_latency(total_tts_latency)
            self.data.tts_start_time = None
            self.data.tts_first_byte_time = None
        elif self.data.tts_start_time:
            # If we have start time but no first byte time, just reset
            self.data.tts_start_time = None
            self.data.tts_first_byte_time = None

    def on_stt_start(self):
        """Called when STT processing starts"""
        self.data.stt_start_time = time.perf_counter()
        if self.data.current_turn:
            self.data.current_turn.stt_start_time = self.data.stt_start_time

    def on_stt_complete(self):
        """Called when STT processing completes"""
        if self.data.stt_start_time:
            stt_end_time = time.perf_counter()
            stt_latency = stt_end_time - self.data.stt_start_time
            if self.data.current_turn:
                self.data.current_turn.stt_end_time = stt_end_time
                self.data.current_turn.stt_latency = self._round_latency(stt_latency)
                logger.info(f"stt latency: {self.data.current_turn.stt_latency}ms")
            self.data.stt_start_time = None

    def on_llm_start(self):
        """Called when LLM processing starts"""
        self.data.llm_start_time = time.perf_counter()
        if self.data.current_turn:
            self.data.current_turn.llm_start_time = self.data.llm_start_time

    def on_llm_complete(self):
        """Called when LLM processing completes"""
        if self.data.llm_start_time:
            llm_end_time = time.perf_counter()
            llm_latency = llm_end_time - self.data.llm_start_time
            if self.data.current_turn:
                self.data.current_turn.llm_end_time = llm_end_time
                self.data.current_turn.llm_latency = self._round_latency(llm_latency)
                logger.info(f"llm latency: {self.data.current_turn.llm_latency}ms")
            self.data.llm_start_time = None

    def on_tts_start(self):
        """Called when TTS processing starts"""
        self.data.tts_start_time = time.perf_counter()
        self.data.tts_first_byte_time = None
        if self.data.current_turn:
            self.data.current_turn.tts_start_time = self.data.tts_start_time

    def on_tts_first_byte(self):
        """Called when TTS produces first audio byte - this is our TTS latency"""
        if self.data.tts_start_time:
            now = time.perf_counter()
            # ttfb = now - self.data.tts_start_time // no need to take the difference as we are using the start time of the tts span
            if self.data.current_turn:
                self.data.current_turn.ttfb = now
                logger.info(f"tts ttfb: {(self.data.current_turn.ttfb - self.data.tts_start_time) * 1000}ms")
            self.data.tts_first_byte_time = now

    def on_eou_start(self):
        """Called when EOU (End of Utterance) processing starts"""
        self.data.eou_start_time = time.perf_counter()
        if self.data.current_turn:
            self.data.current_turn.eou_start_time = self.data.eou_start_time

    def on_eou_complete(self):
        """Called when EOU processing completes"""
        if self.data.eou_start_time:
            eou_end_time = time.perf_counter()
            eou_latency = eou_end_time - self.data.eou_start_time
            if self.data.current_turn:
                self.data.current_turn.eou_end_time = eou_end_time
                self.data.current_turn.eou_latency = self._round_latency(eou_latency)
                # self._end_timeline_event("eou_processing", eou_end_time)
                logger.info(f"eou latency: {self.data.current_turn.eou_latency}ms")
            self.data.eou_start_time = None

    def set_user_transcript(self, transcript: str):
        """Set the user transcript for the current turn and update timeline"""
        if self.data.current_turn:
            logger.info(f"user input speech: {transcript}")
            user_speech_events = [event for event in self.data.current_turn.timeline
                                  if event.event_type == "user_speech"]
            if user_speech_events:
                most_recent_event = user_speech_events[-1]
                most_recent_event.text = transcript
            else:
                current_time = time.perf_counter()
                self._start_timeline_event("user_speech", current_time)
                if self.data.current_turn.timeline:
                    self.data.current_turn.timeline[-1].text = transcript

    def set_agent_response(self, response: str):
        """Set the agent response for the current turn and update timeline"""
        if self.data.current_turn:
            logger.info(f"agent output speech: {response}")
            if not any(event.event_type == "agent_speech" for event in self.data.current_turn.timeline):
                current_time = time.perf_counter()
                self._start_timeline_event("agent_speech", current_time)
            self._update_timeline_event_text("agent_speech", response)

    def add_function_tool_call(self, tool_name: str):
        """Track when a function tool is called in the current turn"""
        if self.data.current_turn:
            self.data.current_turn.function_tools_called.append(tool_name)
            tool_timestamp = {
                "tool_name": tool_name,
                "timestamp": time.perf_counter(),
                "readable_time": time.strftime("%H:%M:%S", time.localtime())
            }
            self.data.current_turn.function_tool_timestamps.append(tool_timestamp)

    def add_error(self, source: str, message: str):
        """Add an error to the current turn"""
        if self.data.current_turn:
            self.data.current_turn.errors.append({
                "source": source,
                "message": message,
                "timestamp": time.time()
            })

    def set_a2a_handoff(self):
        """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
        if self.data.current_turn:
            self.data.current_turn.is_a2a_enabled = True
            self.data.current_turn.handoff_occurred = True
Methods
def add_error(self, source: str, message: str)-
Expand source code
def add_error(self, source: str, message: str):
    """Record an error entry on the turn currently in progress (no-op otherwise)."""
    turn = self.data.current_turn
    if not turn:
        return
    turn.errors.append({"source": source, "message": message, "timestamp": time.time()})
def add_function_tool_call(self, tool_name: str)-
Expand source code
def add_function_tool_call(self, tool_name: str):
    """Track a function-tool invocation on the current turn (no-op without a turn)."""
    turn = self.data.current_turn
    if not turn:
        return
    turn.function_tools_called.append(tool_name)
    turn.function_tool_timestamps.append({
        "tool_name": tool_name,
        "timestamp": time.perf_counter(),
        "readable_time": time.strftime("%H:%M:%S", time.localtime()),
    })
def complete_current_turn(self) ‑> None-
Expand source code
def complete_current_turn(self) -> None:
    """Finalize the in-progress turn: compute E2E metrics, then either discard it
    (no latency data) while caching the earliest user-speech start, or record it
    and ship a cleaned camelCase payload to analytics."""
    turn = self.data.current_turn
    if not turn:
        return
    self._calculate_e2e_metrics(turn)
    if not self._validate_interaction_has_required_latencies(turn):
        # No latency data at all: drop the turn but keep the earliest
        # user-speech start so the next turn can inherit it.
        if turn.user_speech_start_time is not None:
            cached = self.pending_user_start_time
            if cached is None or turn.user_speech_start_time < cached:
                self.pending_user_start_time = turn.user_speech_start_time
                logger.info(f"[metrics] Caching earliest user start: {self.pending_user_start_time}")
        self.data.current_turn = None
        return
    if self.traces_flow_manager:
        self.traces_flow_manager.create_cascading_turn_trace(turn)
    self.data.turns.append(turn)
    payload_dict = asdict(turn)
    payload_dict['timeline'] = [asdict(ev) for ev in turn.timeline]
    transformed = self._transform_to_camel_case(payload_dict)
    # transformed = self._intify_latencies_and_timestamps(transformed)
    drop_fields = [
        'errors', 'functionToolTimestamps',
        'sttStartTime', 'sttEndTime', 'ttsStartTime', 'ttsEndTime',
        'llmStartTime', 'llmEndTime', 'eouStartTime', 'eouEndTime',
        'is_a2a_enabled', "interactionId", "timestamp"
    ]
    if not turn.is_a2a_enabled:
        drop_fields.append("handOffOccurred")
    for field in drop_fields:
        transformed.pop(field, None)
    if len(self.data.turns) > 1:
        # Provider/system fields accompany only the first reported turn.
        for field in ('systemInstructions', 'llmProviderClass', 'llmModelName',
                      'sttProviderClass', 'sttModelName', 'ttsProviderClass', 'ttsModelName'):
            transformed.pop(field, None)
    transformed = self._remove_negatives(transformed)
    self.analytics_client.send_interaction_analytics_safe({"data": [transformed]})
    self.data.current_turn = None
    self.pending_user_start_time = None
def on_agent_speech_end(self)-
Expand source code
def on_agent_speech_end(self):
    """Handle the agent finishing speech: close the timeline event and settle TTS latency."""
    d = self.data
    d.is_agent_speaking = False
    speech_end = time.perf_counter()
    if d.current_turn:
        self._end_timeline_event("agent_speech", speech_end)
    started, first_byte = d.tts_start_time, d.tts_first_byte_time
    if started and first_byte:
        # Full TTS round trip observed: record latency on the active turn.
        if d.current_turn:
            d.current_turn.tts_end_time = speech_end
            d.current_turn.tts_latency = self._round_latency(first_byte - started)
        d.tts_start_time = None
        d.tts_first_byte_time = None
    elif started:
        # Start time with no first byte: just reset the trackers.
        d.tts_start_time = None
        d.tts_first_byte_time = None
def on_agent_speech_start(self)-
Expand source code
def on_agent_speech_start(self):
    """Mark the agent as speaking and open an agent_speech timeline event if none is open."""
    d = self.data
    d.is_agent_speaking = True
    d.agent_speech_start_time = time.perf_counter()
    turn = d.current_turn
    if not turn:
        return
    still_open = any(
        ev.event_type == "agent_speech" and ev.end_time is None for ev in turn.timeline
    )
    if not still_open:
        self._start_timeline_event("agent_speech", d.agent_speech_start_time)
def on_eou_complete(self)-
Expand source code
def on_eou_complete(self):
    """Record EOU latency on the current turn when an EOU start timestamp exists."""
    d = self.data
    if not d.eou_start_time:
        return
    finished = time.perf_counter()
    elapsed = finished - d.eou_start_time
    if d.current_turn:
        d.current_turn.eou_end_time = finished
        d.current_turn.eou_latency = self._round_latency(elapsed)
        logger.info(f"eou latency: {d.current_turn.eou_latency}ms")
    d.eou_start_time = None
def on_eou_start(self)-
Expand source code
def on_eou_start(self):
    """Stamp the start of EOU (End of Utterance) processing on the collector and turn."""
    now = time.perf_counter()
    self.data.eou_start_time = now
    turn = self.data.current_turn
    if turn:
        turn.eou_start_time = now
def on_interrupted(self)-
Expand source code
def on_interrupted(self):
    """Count an interruption when the user cuts in while the agent is speaking."""
    if not self.data.is_agent_speaking:
        return
    self.data.total_interruptions += 1
    if self.data.current_turn:
        self.data.current_turn.interrupted = True
    logger.info(f"User interrupted the agent. Total interruptions: {self.data.total_interruptions}")
def on_llm_complete(self)-
Expand source code
def on_llm_complete(self):
    """Record LLM latency on the current turn when an LLM start timestamp exists."""
    d = self.data
    if not d.llm_start_time:
        return
    finished = time.perf_counter()
    elapsed = finished - d.llm_start_time
    if d.current_turn:
        d.current_turn.llm_end_time = finished
        d.current_turn.llm_latency = self._round_latency(elapsed)
        logger.info(f"llm latency: {d.current_turn.llm_latency}ms")
    d.llm_start_time = None
def on_llm_start(self)-
Expand source code
def on_llm_start(self):
    """Stamp the start of LLM processing on the collector and the current turn."""
    now = time.perf_counter()
    self.data.llm_start_time = now
    turn = self.data.current_turn
    if turn:
        turn.llm_start_time = now
def on_stt_complete(self)-
Expand source code
def on_stt_complete(self):
    """Record STT latency on the current turn when an STT start timestamp exists."""
    d = self.data
    if not d.stt_start_time:
        return
    finished = time.perf_counter()
    elapsed = finished - d.stt_start_time
    if d.current_turn:
        d.current_turn.stt_end_time = finished
        d.current_turn.stt_latency = self._round_latency(elapsed)
        logger.info(f"stt latency: {d.current_turn.stt_latency}ms")
    d.stt_start_time = None
def on_stt_start(self)-
Expand source code
def on_stt_start(self):
    """Stamp the start of STT processing on the collector and the current turn."""
    now = time.perf_counter()
    self.data.stt_start_time = now
    turn = self.data.current_turn
    if turn:
        turn.stt_start_time = now
def on_tts_first_byte(self)-
Expand source code
def on_tts_first_byte(self):
    """Capture the time-to-first-byte moment of TTS output.

    The absolute timestamp is stored (no subtraction) because the TTS span's
    own start time serves as the reference point downstream.
    """
    if not self.data.tts_start_time:
        return
    now = time.perf_counter()
    turn = self.data.current_turn
    if turn:
        turn.ttfb = now
        logger.info(f"tts ttfb: {(turn.ttfb - self.data.tts_start_time) * 1000}ms")
    self.data.tts_first_byte_time = now
def on_tts_start(self)-
Expand source code
def on_tts_start(self):
    """Stamp the start of TTS processing and clear any stale first-byte marker."""
    now = time.perf_counter()
    self.data.tts_start_time = now
    self.data.tts_first_byte_time = None
    turn = self.data.current_turn
    if turn:
        turn.tts_start_time = now
def on_user_speech_end(self)-
Expand source code
def on_user_speech_end(self):
    """Mark the end of user speech and close the matching timeline event."""
    d = self.data
    d.is_user_speaking = False
    d.user_speech_end_time = time.perf_counter()
    turn = d.current_turn
    if turn:
        turn.user_speech_end_time = d.user_speech_end_time
        self._end_timeline_event("user_speech", d.user_speech_end_time)
def on_user_speech_start(self)-
Expand source code
def on_user_speech_start(self):
    """Begin user-speech tracking, starting a fresh turn if none is active.

    Re-entrant calls while the user is already speaking are ignored.
    """
    if self.data.is_user_speaking:
        return
    if not self.data.current_turn:
        self.start_new_interaction()
    self.data.is_user_speaking = True
    self.data.user_input_start_time = time.perf_counter()
    turn = self.data.current_turn
    if not turn:
        return
    if turn.user_speech_start_time is None:
        turn.user_speech_start_time = self.data.user_input_start_time
    if not any(ev.event_type == "user_speech" for ev in turn.timeline):
        self._start_timeline_event("user_speech", self.data.user_input_start_time)
def set_a2a_handoff(self)-
Expand source code
def set_a2a_handoff(self):
    """Flag the current turn as an A2A turn in which a handoff occurred (no-op without a turn)."""
    turn = self.data.current_turn
    if not turn:
        return
    turn.is_a2a_enabled = True
    turn.handoff_occurred = True
def set_agent_response(self, response: str)-
Expand source code
def set_agent_response(self, response: str):
    """Attach the agent's spoken response text to the current turn's timeline."""
    turn = self.data.current_turn
    if not turn:
        return
    logger.info(f"agent output speech: {response}")
    if not any(ev.event_type == "agent_speech" for ev in turn.timeline):
        # No agent_speech event yet: open one stamped at the current moment.
        self._start_timeline_event("agent_speech", time.perf_counter())
    self._update_timeline_event_text("agent_speech", response)
def set_provider_info(self,
llm_provider: str = '',
llm_model: str = '',
stt_provider: str = '',
stt_model: str = '',
tts_provider: str = '',
tts_model: str = '',
vad_provider: str = '',
vad_model: str = '',
eou_provider: str = '',
eou_model: str = '')-
Expand source code
def set_provider_info(self, llm_provider: str = "", llm_model: str = "",
                      stt_provider: str = "", stt_model: str = "",
                      tts_provider: str = "", tts_model: str = "",
                      vad_provider: str = "", vad_model: str = "",
                      eou_provider: str = "", eou_model: str = ""):
    """Store provider class and model names for every pipeline stage on the session data."""
    d = self.data
    d.llm_provider_class = llm_provider
    d.llm_model_name = llm_model
    d.stt_provider_class = stt_provider
    d.stt_model_name = stt_model
    d.tts_provider_class = tts_provider
    d.tts_model_name = tts_model
    d.vad_provider_class = vad_provider
    d.vad_model_name = vad_model
    d.eou_provider_class = eou_provider
    d.eou_model_name = eou_model
def set_session_id(self, session_id: str)-
Expand source code
def set_session_id(self, session_id: str):
    """Store the session ID locally and propagate it to the analytics client."""
    self.data.session_id = session_id
    self.analytics_client.set_session_id(session_id)
def set_system_instructions(self, instructions: str):
    """Store the system instructions associated with this session."""
    self.data.system_instructions = instructions
def set_traces_flow_manager(self, manager: TracesFlowManager):
    """Attach the TracesFlowManager used for trace/span reporting."""
    self.traces_flow_manager = manager
def set_user_transcript(self, transcript: str):
    """Attach the user's transcript to the current turn's timeline.

    Prefers the most recent existing ``user_speech`` event; otherwise a new
    one is started and the text is placed on the newest timeline entry.
    No-op when no turn is active.
    """
    turn = self.data.current_turn
    if not turn:
        return
    logger.info(f"user input speech: {transcript}")
    latest = next(
        (ev for ev in reversed(turn.timeline) if ev.event_type == "user_speech"),
        None,
    )
    if latest is not None:
        latest.text = transcript
    else:
        self._start_timeline_event("user_speech", time.perf_counter())
        if turn.timeline:
            turn.timeline[-1].text = transcript
def start_new_interaction(self, user_transcript: str = "") -> None:
    """Begin tracking a new user-agent turn.

    Completes any turn still in progress, bumps the turn counter, and
    seeds the new ``CascadingTurnData`` with session-level provider info.
    System instructions are only carried on the very first turn. Pending or
    in-progress user speech is folded into the new turn's timeline, and an
    optional transcript is applied immediately.
    """
    if self.data.current_turn:
        self.complete_current_turn()
    self.data.total_turns += 1
    first_turn = self.data.total_turns == 1
    self.data.current_turn = CascadingTurnData(
        system_instructions=self.data.system_instructions if first_turn else "",
        # Provider and model info should be included in every turn
        llm_provider_class=self.data.llm_provider_class,
        llm_model_name=self.data.llm_model_name,
        stt_provider_class=self.data.stt_provider_class,
        stt_model_name=self.data.stt_model_name,
        tts_provider_class=self.data.tts_provider_class,
        tts_model_name=self.data.tts_model_name,
        vad_provider_class=self.data.vad_provider_class,
        vad_model_name=self.data.vad_model_name,
        eou_provider_class=self.data.eou_provider_class,
        eou_model_name=self.data.eou_model_name,
    )
    turn = self.data.current_turn
    # User speech that started before this turn was created.
    if self.pending_user_start_time is not None:
        turn.user_speech_start_time = self.pending_user_start_time
        self._start_timeline_event("user_speech", self.pending_user_start_time)
    # User is mid-utterance: make sure the turn reflects the ongoing speech.
    if self.data.is_user_speaking and self.data.user_input_start_time:
        if turn.user_speech_start_time is None:
            turn.user_speech_start_time = self.data.user_input_start_time
        if not any(ev.event_type == "user_speech" for ev in turn.timeline):
            self._start_timeline_event("user_speech", self.data.user_input_start_time)
    if user_transcript:
        self.set_user_transcript(user_transcript)
@dataclass
class CascadingMetricsData:
    """Data structure to hold all metrics for a session."""
    # Session identity and lifetime
    session_id: Optional[str] = None
    session_start_time: float = field(default_factory=time.time)  # wall-clock epoch seconds
    system_instructions: str = ""
    # Aggregate counters across the whole session
    total_interruptions: int = 0
    total_turns: int = 0
    # Completed turns, plus the turn currently being tracked (None between turns)
    turns: List[CascadingTurnData] = field(default_factory=list)
    current_turn: Optional[CascadingTurnData] = None
    # In-flight timing markers for the active turn.
    # NOTE(review): clock source (epoch vs perf_counter) is not visible here;
    # the collector's timeline events use time.perf_counter() — confirm these match.
    user_speech_end_time: Optional[float] = None
    agent_speech_start_time: Optional[float] = None
    stt_start_time: Optional[float] = None
    llm_start_time: Optional[float] = None
    tts_start_time: Optional[float] = None
    eou_start_time: Optional[float] = None
    user_input_start_time: Optional[float] = None
    # Live speaking-state flags
    is_agent_speaking: bool = False
    is_user_speaking: bool = False
    tts_first_byte_time: Optional[float] = None
    # Provider/model identification for each pipeline stage; copied into every
    # turn by start_new_interaction()
    llm_provider_class: str = ""
    llm_model_name: str = ""
    stt_provider_class: str = ""
    stt_model_name: str = ""
    tts_provider_class: str = ""
    tts_model_name: str = ""
    vad_provider_class: str = ""
    vad_model_name: str = ""
    eou_provider_class: str = ""
    eou_model_name: str = ""
Instance variables
var agent_speech_start_time : float | Nonevar current_turn : CascadingTurnData | Nonevar eou_model_name : strvar eou_provider_class : strvar eou_start_time : float | Nonevar is_agent_speaking : boolvar is_user_speaking : boolvar llm_model_name : strvar llm_provider_class : strvar llm_start_time : float | Nonevar session_id : str | Nonevar session_start_time : floatvar stt_model_name : strvar stt_provider_class : strvar stt_start_time : float | Nonevar system_instructions : strvar total_interruptions : intvar total_turns : intvar tts_first_byte_time : float | Nonevar tts_model_name : strvar tts_provider_class : strvar tts_start_time : float | Nonevar turns : List[CascadingTurnData]var user_input_start_time : float | Nonevar user_speech_end_time : float | Nonevar vad_model_name : strvar vad_provider_class : str
@dataclass
class CascadingTurnData:
    """Data structure for a single user-agent turn."""
    # Speech boundaries of the user utterance that opened the turn
    user_speech_start_time: Optional[float] = None
    user_speech_end_time: Optional[float] = None
    # STT stage: start/end markers and derived latency
    stt_start_time: Optional[float] = None
    stt_end_time: Optional[float] = None
    stt_latency: Optional[float] = None
    # LLM stage
    llm_start_time: Optional[float] = None
    llm_end_time: Optional[float] = None
    llm_latency: Optional[float] = None
    # TTS stage; ttfb is presumably time-to-first-byte of synthesized audio
    # (mirrors tts_first_byte_time on the session data) — TODO confirm
    tts_start_time: Optional[float] = None
    tts_end_time: Optional[float] = None
    tts_latency: Optional[float] = None
    ttfb: Optional[float] = None
    # End-of-utterance detection stage
    eou_start_time: Optional[float] = None
    eou_end_time: Optional[float] = None
    eou_latency: Optional[float] = None
    # Function-tool invocation records made during the turn
    function_tool_timestamps: List[Dict[str, Any]] = field(default_factory=list)
    # End-to-end latency for the whole turn
    e2e_latency: Optional[float] = None
    interrupted: bool = False
    timestamp: float = field(default_factory=time.time)  # wall-clock creation time
    function_tools_called: List[str] = field(default_factory=list)
    # Only populated on the first turn of a session (see start_new_interaction)
    system_instructions: str = ""
    # Provider/model identification copied from the session-level data
    llm_provider_class: str = ""
    llm_model_name: str = ""
    stt_provider_class: str = ""
    stt_model_name: str = ""
    tts_provider_class: str = ""
    tts_model_name: str = ""
    vad_provider_class: str = ""
    vad_model_name: str = ""
    eou_provider_class: str = ""
    eou_model_name: str = ""
    # Ordered timeline of speech/processing events for this turn
    timeline: List[TimelineEvent] = field(default_factory=list)
    errors: List[Dict[str, Any]] = field(default_factory=list)
    # A2A (agent-to-agent) flags set by set_a2a_handoff()
    is_a2a_enabled: bool = False
    handoff_occurred: bool = False
Instance variables
var e2e_latency : float | Nonevar eou_end_time : float | Nonevar eou_latency : float | Nonevar eou_model_name : strvar eou_provider_class : strvar eou_start_time : float | Nonevar errors : List[Dict[str, Any]]var function_tool_timestamps : List[Dict[str, Any]]var function_tools_called : List[str]var handoff_occurred : boolvar interrupted : boolvar is_a2a_enabled : boolvar llm_end_time : float | Nonevar llm_latency : float | Nonevar llm_model_name : strvar llm_provider_class : strvar llm_start_time : float | Nonevar stt_end_time : float | Nonevar stt_latency : float | Nonevar stt_model_name : strvar stt_provider_class : strvar stt_start_time : float | Nonevar system_instructions : strvar timeline : List[TimelineEvent]var timestamp : floatvar ttfb : float | Nonevar tts_end_time : float | Nonevar tts_latency : float | Nonevar tts_model_name : strvar tts_provider_class : strvar tts_start_time : float | Nonevar user_speech_end_time : float | Nonevar user_speech_start_time : float | Nonevar vad_model_name : strvar vad_provider_class : str
class RealtimeMetricsCollector:
    """Collects per-turn metrics for realtime (speech-to-speech) sessions.

    Tracks user/agent speech boundaries on a per-turn timeline using
    ``time.perf_counter()``, finalizes each turn and ships it to analytics,
    and optionally mirrors turns into the traces flow manager shared with
    the cascading metrics collector.
    """

    # Class-level snapshot of the agent/provider configuration; refreshed by
    # start_session() and copied into every RealtimeTurnData.
    _agent_info: Dict[str, Any] = {
        "provider_class_name": None,
        "provider_model_name": None,
        "system_instructions": None,
        "function_tools": [],
        "mcp_tools": []
    }

    def __init__(self) -> None:
        self.turns: List[RealtimeTurnData] = []
        self.current_turn: Optional[RealtimeTurnData] = None
        # Guards turn creation across concurrent coroutines.
        self.lock = asyncio.Lock()
        # Debounce timer that finalizes the turn shortly after agent speech ends.
        self.agent_speech_end_timer: Optional[asyncio.TimerHandle] = None
        self.analytics_client = AnalyticsClient()
        self.traces_flow_manager: Optional[TracesFlowManager] = None

    def set_session_id(self, session_id: str):
        """Set the session ID for metrics tracking."""
        self.analytics_client.set_session_id(session_id)

    def set_traces_flow_manager(self, manager: TracesFlowManager):
        """Set the TracesFlowManager instance for realtime tracing."""
        self.traces_flow_manager = manager

    def _transform_to_camel_case(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively convert snake_case keys to camelCase for analytics reporting."""
        def to_camel_case(snake_str: str) -> str:
            parts = snake_str.split('_')
            return parts[0] + ''.join(x.title() for x in parts[1:])
        if isinstance(data, list):
            return [self._transform_to_camel_case(item) for item in data]
        if isinstance(data, dict):
            return {to_camel_case(k): self._transform_to_camel_case(v) for k, v in data.items()}
        return data

    async def start_session(self, agent: Agent, pipeline: Pipeline) -> None:
        """Capture agent/pipeline configuration, reset turn state, and open
        the agent-session trace when a traces flow manager is available."""
        RealtimeMetricsCollector._agent_info = {
            "provider_class_name": pipeline.model.__class__.__name__,
            "provider_model_name": getattr(pipeline.model, "model", None),
            "system_instructions": agent.instructions,
            # Function tools excluding those managed by MCP (listed separately).
            "function_tools": [
                getattr(tool, "name", tool.__name__ if callable(tool) else str(tool))
                for tool in (
                    [tool for tool in agent.tools if tool not in agent.mcp_manager.tools]
                    if agent.mcp_manager else agent.tools
                )
            ] if agent.tools else [],
            "mcp_tools": [
                tool._tool_info.name for tool in agent.mcp_manager.tools
            ] if agent.mcp_manager else [],
        }
        self.turns = []
        self.current_turn = None
        # Borrow the traces flow manager installed by the cascading collector.
        from . import cascading_metrics_collector as cascading_metrics_collector
        traces_flow_manager = cascading_metrics_collector.traces_flow_manager
        if traces_flow_manager:
            self.set_traces_flow_manager(traces_flow_manager)
            config_attributes = {
                **RealtimeMetricsCollector._agent_info,
                "pipeline": pipeline.__class__.__name__,
                "llm_provider": pipeline.model.__class__.__name__,
            }
            await traces_flow_manager.start_agent_session_config(config_attributes)
            await traces_flow_manager.start_agent_session({})

    async def _start_new_interaction(self) -> None:
        """Create a fresh turn seeded with the agent info and register it."""
        async with self.lock:
            self.current_turn = RealtimeTurnData(
                **RealtimeMetricsCollector._agent_info
            )
            self.turns.append(self.current_turn)

    async def set_user_speech_start(self) -> None:
        """Flush any in-progress turn, start a new one, and open a
        user_speech timeline event."""
        if self.current_turn:
            self._finalize_interaction_and_send()
        await self._start_new_interaction()
        if self.current_turn and self.current_turn.user_speech_start_time is None:
            self.current_turn.user_speech_start_time = time.perf_counter()
            await self.start_timeline_event("user_speech")

    async def set_user_speech_end(self) -> None:
        """Stamp the end of user speech and close its timeline event."""
        if self.current_turn and self.current_turn.user_speech_end_time is None:
            self.current_turn.user_speech_end_time = time.perf_counter()
            await self.end_timeline_event("user_speech")

    async def set_agent_speech_start(self) -> None:
        """Begin agent speech: close dangling user speech (or start a turn if
        none exists), open an agent_speech event, and cancel any pending
        end-of-speech finalize timer."""
        if not self.current_turn:
            await self._start_new_interaction()
        elif self.current_turn.user_speech_start_time is not None and self.current_turn.user_speech_end_time is None:
            self.current_turn.user_speech_end_time = time.perf_counter()
            await self.end_timeline_event("user_speech")
        if self.current_turn and self.current_turn.agent_speech_start_time is None:
            self.current_turn.agent_speech_start_time = time.perf_counter()
            await self.start_timeline_event("agent_speech")
        if self.agent_speech_end_timer:
            self.agent_speech_end_timer.cancel()

    async def set_agent_speech_end(self, timeout: float = 1.0) -> None:
        """Close the agent_speech event and schedule finalization after
        ``timeout`` seconds (debounced: restarted if speech resumes)."""
        if self.current_turn:
            if self.agent_speech_end_timer:
                self.agent_speech_end_timer.cancel()
            loop = asyncio.get_event_loop()
            self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send)
            await self.end_timeline_event("agent_speech")

    async def set_a2a_handoff(self) -> None:
        """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
        if self.current_turn:
            self.current_turn.is_a2a_enabled = True
            self.current_turn.handoff_occurred = True

    def _finalize_agent_speech(self) -> None:
        """Stamp agent_speech_end_time once, but only for turns that actually
        contain user or agent speech."""
        if not self.current_turn or self.current_turn.agent_speech_end_time is not None:
            return
        is_valid_interaction = self.current_turn.user_speech_start_time is not None or \
            self.current_turn.agent_speech_start_time is not None
        if not is_valid_interaction:
            return
        self.current_turn.agent_speech_end_time = time.perf_counter()
        self.agent_speech_end_timer = None

    def _finalize_interaction_and_send(self) -> None:
        """Close all open timing on the current turn, compute latencies, emit
        the turn to tracing and analytics, then clear current_turn.

        Turns with neither user nor agent speech are discarded silently.
        """
        if not self.current_turn:
            return
        self._finalize_agent_speech()
        if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time:
            self.current_turn.user_speech_end_time = time.perf_counter()
        # Close any timeline events still open at finalization time.
        current_time = time.perf_counter()
        for event in self.current_turn.timeline:
            if event.end_time is None:
                event.end_time = current_time
                event.duration_ms = round((current_time - event.start_time) * 1000, 4)
        is_valid_interaction = self.current_turn.user_speech_start_time is not None or \
            self.current_turn.agent_speech_start_time is not None
        if not is_valid_interaction:
            self.current_turn = None
            return
        self.current_turn.compute_latencies()
        # Lazily resolve the traces flow manager if it was not injected.
        if not hasattr(self, 'traces_flow_manager') or self.traces_flow_manager is None:
            try:
                from . import cascading_metrics_collector as cascading_metrics_collector
                self.traces_flow_manager = cascading_metrics_collector.traces_flow_manager
            except Exception:
                pass
        if self.traces_flow_manager:
            self.traces_flow_manager.create_realtime_turn_trace(self.current_turn)
        interaction_data = asdict(self.current_turn)
        # Strip internal/duplicated fields before sending to analytics.
        # NOTE(review): "function_tools"/"mcp_tools" are repeated by the
        # extend below — redundant but harmless.
        fields_to_remove = ["realtime_model_errors","is_a2a_enabled","session_id","agent_speech_duration","agent_speech_start_time","agent_speech_end_time","thinking_time","function_tools","mcp_tools"]
        if len(self.turns) > 1:
            # Config-like fields are only reported on the first turn.
            fields_to_remove.extend([ "provider_class_name", "provider_model_name", "system_instructions","function_tools", "mcp_tools"])
        if not self.current_turn.is_a2a_enabled:
            fields_to_remove.extend(["handoff_occurred"])
        # NOTE(review): loop variable likely shadows dataclasses.field at
        # module scope (asdict/field are in use) — confirm and consider renaming.
        for field in fields_to_remove:
            if field in interaction_data:
                del interaction_data[field]
        transformed_data = self._transform_to_camel_case(interaction_data)
        self.analytics_client.send_interaction_analytics_safe({ "data": [transformed_data] })
        # NOTE(review): _start_new_interaction() already appended this turn to
        # self.turns; this second append appears to duplicate it — confirm intent.
        self.turns.append(self.current_turn)
        self.current_turn = None

    async def add_timeline_event(self, event: TimelineEvent) -> None:
        """Append a pre-built timeline event to the current turn."""
        if self.current_turn:
            self.current_turn.timeline.append(event)

    async def start_timeline_event(self, event_type: str) -> None:
        """Start a timeline event with a precise start time."""
        if self.current_turn:
            event = TimelineEvent(
                event_type=event_type,
                start_time=time.perf_counter()
            )
            self.current_turn.timeline.append(event)

    async def end_timeline_event(self, event_type: str) -> None:
        """End the most recent open event of this type and compute its duration."""
        if self.current_turn:
            end_time = time.perf_counter()
            for event in reversed(self.current_turn.timeline):
                if event.event_type == event_type and event.end_time is None:
                    event.end_time = end_time
                    event.duration_ms = round((end_time - event.start_time) * 1000, 4)
                    break

    async def update_timeline_event_text(self, event_type: str, text: str) -> None:
        """Attach text to the most recent event of this type that has none."""
        if self.current_turn:
            for event in reversed(self.current_turn.timeline):
                if event.event_type == event_type and not event.text:
                    event.text = text
                    break

    async def add_tool_call(self, tool_name: str) -> None:
        """Record a function-tool invocation (deduplicated per turn)."""
        if self.current_turn and tool_name not in self.current_turn.function_tools_called:
            self.current_turn.function_tools_called.append(tool_name)
            logger.info(f"function tool called: {tool_name}")

    async def set_user_transcript(self, text: str) -> None:
        """Set the user transcript for the current turn and update timeline."""
        if self.current_turn:
            # Transcript can arrive before speech events were observed;
            # backfill start/end markers so the timeline stays consistent.
            if self.current_turn.user_speech_start_time is None:
                self.current_turn.user_speech_start_time = time.perf_counter()
                await self.start_timeline_event("user_speech")
            if self.current_turn.user_speech_end_time is None:
                self.current_turn.user_speech_end_time = time.perf_counter()
                await self.end_timeline_event("user_speech")
            logger.info(f"user input speech: {text}")
            await self.update_timeline_event_text("user_speech", text)

    async def set_agent_response(self, text: str) -> None:
        """Set the agent response for the current turn and update timeline."""
        if self.current_turn:
            if self.current_turn.agent_speech_start_time is None:
                self.current_turn.agent_speech_start_time = time.perf_counter()
                await self.start_timeline_event("agent_speech")
            logger.info(f"agent output speech: {text}")
            await self.update_timeline_event_text("agent_speech", text)

    def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
        """Record a realtime model-specific error on the current turn."""
        if self.current_turn:
            logger.error(f"realtime model error: {error}")
            self.current_turn.realtime_model_errors.append(error)

    async def set_interrupted(self) -> None:
        """Flag the current turn as interrupted by the user."""
        if self.current_turn:
            self.current_turn.interrupted = True

    def finalize_session(self) -> None:
        # NOTE(review): this schedules _start_new_interaction (not
        # _finalize_interaction_and_send) at session teardown, and
        # asyncio.get_event_loop() assumes a running loop when called from
        # another thread — confirm both are intended.
        asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop())
async def add_timeline_event(self,
event: TimelineEvent) ‑> None-
Expand source code
async def add_timeline_event(self, event: TimelineEvent) -> None: if self.current_turn: self.current_turn.timeline.append(event) async def add_tool_call(self, tool_name: str) ‑> None-
Expand source code
async def add_tool_call(self, tool_name: str) -> None: if self.current_turn and tool_name not in self.current_turn.function_tools_called: self.current_turn.function_tools_called.append(tool_name) logger.info(f"function tool called: {tool_name}") async def end_timeline_event(self, event_type: str) ‑> None-
Expand source code
async def end_timeline_event(self, event_type: str) -> None: """End a timeline event and calculate duration""" if self.current_turn: end_time = time.perf_counter() for event in reversed(self.current_turn.timeline): if event.event_type == event_type and event.end_time is None: event.end_time = end_time event.duration_ms = round((end_time - event.start_time) * 1000, 4) breakEnd a timeline event and calculate duration
def finalize_session(self) ‑> None-
Expand source code
def finalize_session(self) -> None: asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop()) async def set_a2a_handoff(self) ‑> None-
Expand source code
async def set_a2a_handoff(self) -> None: """Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.""" if self.current_turn: self.current_turn.is_a2a_enabled = True self.current_turn.handoff_occurred = TrueSet the A2A enabled and handoff occurred flags for the current turn in A2A scenarios.
async def set_agent_response(self, text: str) ‑> None-
Expand source code
async def set_agent_response(self, text: str) -> None: """Set the agent response for the current turn and update timeline""" if self.current_turn: if self.current_turn.agent_speech_start_time is None: self.current_turn.agent_speech_start_time = time.perf_counter() await self.start_timeline_event("agent_speech") logger.info(f"agent output speech: {text}") await self.update_timeline_event_text("agent_speech", text)Set the agent response for the current turn and update timeline
async def set_agent_speech_end(self, timeout: float = 1.0) ‑> None-
Expand source code
async def set_agent_speech_end(self, timeout: float = 1.0) -> None: if self.current_turn: if self.agent_speech_end_timer: self.agent_speech_end_timer.cancel() loop = asyncio.get_event_loop() self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send) await self.end_timeline_event("agent_speech") async def set_agent_speech_start(self) ‑> None-
Expand source code
async def set_agent_speech_start(self) -> None: if not self.current_turn: await self._start_new_interaction() elif self.current_turn.user_speech_start_time is not None and self.current_turn.user_speech_end_time is None: self.current_turn.user_speech_end_time = time.perf_counter() await self.end_timeline_event("user_speech") if self.current_turn and self.current_turn.agent_speech_start_time is None: self.current_turn.agent_speech_start_time = time.perf_counter() await self.start_timeline_event("agent_speech") if self.agent_speech_end_timer: self.agent_speech_end_timer.cancel() async def set_interrupted(self) ‑> None-
Expand source code
async def set_interrupted(self) -> None: if self.current_turn: self.current_turn.interrupted = True def set_realtime_model_error(self, error: Dict[str, Any]) ‑> None-
Expand source code
def set_realtime_model_error(self, error: Dict[str, Any]) -> None: """Set a realtime model-specific error for the current turn""" if self.current_turn: logger.error(f"realtime model error: {error}") self.current_turn.realtime_model_errors.append(error)Set a realtime model-specific error for the current turn
def set_session_id(self, session_id: str)-
Expand source code
def set_session_id(self, session_id: str): """Set the session ID for metrics tracking""" self.analytics_client.set_session_id(session_id)Set the session ID for metrics tracking
def set_traces_flow_manager(self, manager: TracesFlowManager)-
Expand source code
def set_traces_flow_manager(self, manager: TracesFlowManager): """Set the TracesFlowManager instance for realtime tracing""" self.traces_flow_manager = managerSet the TracesFlowManager instance for realtime tracing
async def set_user_speech_end(self) ‑> None-
Expand source code
async def set_user_speech_end(self) -> None: if self.current_turn and self.current_turn.user_speech_end_time is None: self.current_turn.user_speech_end_time = time.perf_counter() await self.end_timeline_event("user_speech") async def set_user_speech_start(self) ‑> None-
Expand source code
async def set_user_speech_start(self) -> None: if self.current_turn: self._finalize_interaction_and_send() await self._start_new_interaction() if self.current_turn and self.current_turn.user_speech_start_time is None: self.current_turn.user_speech_start_time = time.perf_counter() await self.start_timeline_event("user_speech") async def set_user_transcript(self, text: str) ‑> None-
Expand source code
async def set_user_transcript(self, text: str) -> None: """Set the user transcript for the current turn and update timeline""" if self.current_turn: if self.current_turn.user_speech_start_time is None: self.current_turn.user_speech_start_time = time.perf_counter() await self.start_timeline_event("user_speech") if self.current_turn.user_speech_end_time is None: self.current_turn.user_speech_end_time = time.perf_counter() await self.end_timeline_event("user_speech") logger.info(f"user input speech: {text}") await self.update_timeline_event_text("user_speech", text)Set the user transcript for the current turn and update timeline
async def start_session(self, agent: Agent, pipeline: Pipeline) ‑> None-
Expand source code
async def start_session(self, agent: Agent, pipeline: Pipeline) -> None: RealtimeMetricsCollector._agent_info = { "provider_class_name": pipeline.model.__class__.__name__, "provider_model_name": getattr(pipeline.model, "model", None), "system_instructions": agent.instructions, "function_tools": [ getattr(tool, "name", tool.__name__ if callable(tool) else str(tool)) for tool in ( [tool for tool in agent.tools if tool not in agent.mcp_manager.tools] if agent.mcp_manager else agent.tools ) ] if agent.tools else [], "mcp_tools": [ tool._tool_info.name for tool in agent.mcp_manager.tools ] if agent.mcp_manager else [], } self.turns = [] self.current_turn = None from . import cascading_metrics_collector as cascading_metrics_collector traces_flow_manager = cascading_metrics_collector.traces_flow_manager if traces_flow_manager: self.set_traces_flow_manager(traces_flow_manager) config_attributes = { **RealtimeMetricsCollector._agent_info, "pipeline": pipeline.__class__.__name__, "llm_provider": pipeline.model.__class__.__name__, } await traces_flow_manager.start_agent_session_config(config_attributes) await traces_flow_manager.start_agent_session({}) async def start_timeline_event(self, event_type: str) ‑> None-
Expand source code
async def start_timeline_event(self, event_type: str) -> None: """Start a timeline event with a precise start time""" if self.current_turn: event = TimelineEvent( event_type=event_type, start_time=time.perf_counter() ) self.current_turn.timeline.append(event)Start a timeline event with a precise start time
async def update_timeline_event_text(self, event_type: str, text: str) ‑> None-
Expand source code
async def update_timeline_event_text(self, event_type: str, text: str) -> None: """Update timeline event with text content""" if self.current_turn: for event in reversed(self.current_turn.timeline): if event.event_type == event_type and not event.text: event.text = text breakUpdate timeline event with text content
@dataclass
class TimelineEvent:
    """Data structure for a single timeline event."""
    # Event category, e.g. "user_speech" or "agent_speech"
    event_type: str
    # time.perf_counter() value when the event began
    start_time: float
    # time.perf_counter() value when the event ended (None while open)
    end_time: Optional[float] = None
    # (end_time - start_time) * 1000, rounded to 4 decimal places
    duration_ms: Optional[float] = None
    # Transcript or response text attached to the event
    text: str = ""
Instance variables
var duration_ms : float | None
var end_time : float | None
var event_type : str
var start_time : float
var text : str