Module agents.metrics
Sub-modules
agents.metrics.analytics
agents.metrics.integration
agents.metrics.logger_handler
agents.metrics.metrics_collector
agents.metrics.metrics_schema
agents.metrics.telemetry
agents.metrics.traces_flow
Functions
def auto_initialize_telemetry_and_logs(room_id: str,
peer_id: str,
room_attributes: Dict[str, Any] = None,
session_id: str = None,
sdk_metadata: Dict[str, Any] = None,
custom_traces_config: Dict[str, Any] = None)-
Expand source code
def auto_initialize_telemetry_and_logs(
    room_id: str,
    peer_id: str,
    room_attributes: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    sdk_metadata: Optional[Dict[str, Any]] = None,
    custom_traces_config: Optional[Dict[str, Any]] = None,
):
    """
    Auto-initialize telemetry and logs from room attributes.

    Args:
        room_id: Room identifier forwarded to telemetry.
        peer_id: Peer identifier forwarded to telemetry.
        room_attributes: Room attributes; ``'observability'`` holds the JWT
            and ``'traces'`` the tracing config. No-op when falsy.
        session_id: Session identifier (unused in this function body; kept
            for interface compatibility with callers).
        sdk_metadata: SDK metadata forwarded to ``initialize_telemetry``.
        custom_traces_config: When provided, overrides the ``'traces'``
            entry of ``room_attributes``.
    """
    if not room_attributes:
        return

    observability_jwt = room_attributes.get('observability', '')
    traces_config = custom_traces_config or room_attributes.get('traces', {})

    # Only spin up telemetry when tracing is explicitly enabled.
    if traces_config.get('enabled'):
        metadata = {}
        initialize_telemetry(
            room_id=room_id,
            peer_id=peer_id,
            sdk_name="agents",
            observability_jwt=observability_jwt,
            traces_config=traces_config,
            metadata=metadata,
            sdk_metadata=sdk_metadata,
        )
def complete_span(span: opentelemetry.trace.span.Span | None,
status_code,
message: str = '',
end_time: float | None = None)-
Expand source code
def complete_span(span: Optional[Span], status_code, message: str = "", end_time: Optional[float] = None):
    """
    Complete a trace span (convenience method).

    Args:
        span: Span to complete
        status_code: Status code
        message: Status message
        end_time: End time in seconds since epoch (optional)
    """
    active_telemetry = get_telemetry()
    if not active_telemetry or not span:
        return
    active_telemetry.complete_span(span, status_code, message, end_time)
Args
span- Span to complete
status_code- Status code
message- Status message
end_time- End time in seconds since epoch (optional)
def create_span(span_name: str,
attributes: Dict[str, Any] = None,
parent_span: opentelemetry.trace.span.Span | None = None,
start_time: float | None = None)-
Expand source code
def create_span(span_name: str, attributes: Dict[str, Any] = None, parent_span: Optional[Span] = None, start_time: Optional[float] = None):
    """
    Create a trace span (convenience method).

    Args:
        span_name: Name of the span
        attributes: Span attributes
        parent_span: Parent span (optional)
        start_time: Start time in seconds since epoch (optional)

    Returns:
        Span object or None
    """
    active_telemetry = get_telemetry()
    if not active_telemetry:
        return None
    return active_telemetry.trace(span_name, attributes, parent_span, start_time)
Args
span_name- Name of the span
attributes- Span attributes
parent_span- Parent span (optional)
start_time- Start time in seconds since epoch (optional)
Returns
Span object or None
Classes
class JobLogger (queue: queue.Queue,
room_id: str,
peer_id: str,
auth_token: str,
session_id: str | None = None,
dashboard_log_level: str = 'INFO',
sdk_metadata: Dict[str, Any] | None = None,
send_logs_to_dashboard: bool = False)-
Expand source code
class JobLogger:
    """
    Intercepts ALL existing ``logger.info()`` / ``.debug()`` / ``.warning()``
    etc. calls under the ``videosdk.agents`` namespace by attaching a
    ``QueueHandler`` to the parent logger.

    * Does ZERO I/O – records are written to a ``queue.Queue``.
    * Attaches job context (roomId, sessionId, peerId, …) via a Filter.
    * Thread-safe: queue.Queue is thread-safe; multiple jobs each own
      their own queue, so no cross-agent interference.
    """

    def __init__(
        self,
        queue: "queue.Queue",
        room_id: str,
        peer_id: str,
        auth_token: str,
        session_id: Optional[str] = None,
        dashboard_log_level: str = "INFO",
        sdk_metadata: Optional[Dict[str, Any]] = None,
        send_logs_to_dashboard: bool = False,
    ):
        self._queue = queue
        # Job context that the filter stamps onto every LogRecord.
        base_context = {
            "roomId": room_id,
            "peerId": peer_id,
            "authToken": auth_token,
            "sessionId": session_id or "",
            "sdk_name": "",
            "sdk_version": "",
            "service_name": "videosdk-otel-telemetry-agents",
            "send_logs_to_dashboard": send_logs_to_dashboard,
        }
        if sdk_metadata:
            base_context["sdk_name"] = sdk_metadata.get("sdk", "AGENTS").upper()
            base_context["sdk_version"] = sdk_metadata.get("sdk_version", "0.0.62")
        self._context: Dict[str, Any] = base_context

        self._parent_logger = logging.getLogger("videosdk.agents")
        dashboard_level = getattr(logging, dashboard_log_level.upper(), logging.INFO)
        # Loosen the parent logger only when it is currently stricter than
        # what the dashboard wants to receive.
        if self._parent_logger.getEffectiveLevel() > dashboard_level:
            self._parent_logger.setLevel(dashboard_level)

        self._queue_handler = QueueHandler(queue)
        self._queue_handler.setLevel(dashboard_level)
        self._context_filter = _JobContextFilter(self._context)
        self._queue_handler.addFilter(self._context_filter)
        # Attach last so the handler only ever sees fully-configured state.
        self._parent_logger.addHandler(self._queue_handler)

    def update_context(self, **kwargs) -> None:
        """Update context fields (e.g. sessionId once available)."""
        self._context.update(kwargs)

    def set_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
        """
        Send the log endpoint and optional custom headers to the consumer
        thread via the queue so the LogConsumerHandler can start posting logs.

        Called from room.py when room attributes become available.
        """
        # A synthetic record acts as an in-band control message.
        cfg_record = logging.LogRecord(
            name="videosdk.agents._config",
            level=logging.INFO,
            pathname="",
            lineno=0,
            msg="",
            args=(),
            exc_info=None,
        )
        cfg_record._config_endpoint = endpoint
        cfg_record._config_jwt_key = jwt_key
        cfg_record._config_custom_headers = custom_headers
        self._queue.put_nowait(cfg_record)

    def cleanup(self) -> None:
        """Remove the QueueHandler from the parent logger (called on job end)."""
        self._parent_logger.removeHandler(self._queue_handler)
`logger.info()` / `.debug()` / `.warning()` etc. calls under the `videosdk.agents` namespace by attaching a `QueueHandler` to the parent logger.
- Does ZERO I/O – records are written to a `queue.Queue`.
- Attaches job context (roomId, sessionId, peerId, …) via a Filter.
- Thread-safe: queue.Queue is thread-safe; multiple jobs each own their own queue, so no cross-agent interference.
Methods
def cleanup(self) ‑> None-
Expand source code
def cleanup(self) -> None:
    """Remove the QueueHandler from the parent logger (called on job end)."""
    handler_to_detach = self._queue_handler
    self._parent_logger.removeHandler(handler_to_detach)
def set_endpoint(self,
endpoint: str,
jwt_key: str = '',
custom_headers: Dict[str, str] | None = None) ‑> None-
Expand source code
def set_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
    """
    Send the log endpoint and optional custom headers to the consumer
    thread via the queue so the LogConsumerHandler can start posting logs.

    Called from room.py when room attributes become available.
    """
    # Control message: an otherwise-empty record carrying config attributes.
    control = logging.LogRecord(
        name="videosdk.agents._config",
        level=logging.INFO,
        pathname="",
        lineno=0,
        msg="",
        args=(),
        exc_info=None,
    )
    control._config_endpoint = endpoint
    control._config_jwt_key = jwt_key
    control._config_custom_headers = custom_headers
    self._queue.put_nowait(control)
Called from room.py when room attributes become available.
def update_context(self, **kwargs) ‑> None-
Expand source code
def update_context(self, **kwargs) -> None:
    """Update context fields (e.g. sessionId once available)."""
    for field_name, field_value in kwargs.items():
        self._context[field_name] = field_value
- Does ZERO I/O – records are written to a `queue.Queue`.
class LogManager (batch_size: int = 50, flush_interval_seconds: float = 5.0)-
Expand source code
class LogManager:
    """
    One instance per **Job** (not per Worker).

    Owns:

    * A ``queue.Queue`` (thread-safe, no extra manager process needed).
    * A ``QueueListener`` that consumes from the queue and dispatches to
      ``LogConsumerHandler``.

    Lifecycle::

        mgr = LogManager()
        mgr.start(auth_token="...")  # starts background listener thread
        # … job runs, logs are captured …
        mgr.stop()                   # flushes & tears down

    The ``queue`` attribute is passed to ``JobLogger`` (producer side).
    """

    def __init__(
        self,
        batch_size: int = 50,
        flush_interval_seconds: float = 5.0,
    ):
        self.queue: queue.Queue = queue.Queue()
        self._listener: Optional[QueueListener] = None
        self._consumer_handler: Optional[LogConsumerHandler] = None
        self._batch_size = batch_size
        self._flush_interval = flush_interval_seconds

    def start(self, auth_token: str = "") -> None:
        """Start consuming from the queue."""
        consumer = LogConsumerHandler(
            auth_token=auth_token,
            batch_size=self._batch_size,
            flush_interval_seconds=self._flush_interval,
        )
        self._consumer_handler = consumer
        listener = QueueListener(self.queue, consumer, respect_handler_level=True)
        self._listener = listener
        listener.start()

    def update_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
        """Set/update the endpoint and flush buffered records."""
        consumer = self._consumer_handler
        if consumer:
            consumer.update_endpoint(endpoint, jwt_key, custom_headers)

    def stop(self) -> None:
        """Stop the listener and flush remaining logs."""
        active_listener = self._listener
        if active_listener:
            active_listener.stop()
            self._listener = None
        consumer = self._consumer_handler
        if consumer:
            consumer.close()
            self._consumer_handler = None

    def get_queue(self) -> queue.Queue:
        """Return the shared queue for the JobLogger (producer)."""
        return self.queue
Owns:
- A `queue.Queue` (thread-safe, no extra manager process needed).
- A `QueueListener` that consumes from the queue and dispatches to `LogConsumerHandler`.

Lifecycle::

    mgr = LogManager()
    mgr.start(auth_token="...")  # starts background listener thread
    # … job runs, logs are captured …
    mgr.stop()                   # flushes & tears down

The `queue` attribute is passed to `JobLogger` (producer side).
Methods
def get_queue(self) ‑> queue.Queue-
Expand source code
def get_queue(self) -> queue.Queue:
    """Return the shared queue for the JobLogger (producer)."""
    shared_queue = self.queue
    return shared_queue
def start(self, auth_token: str = '') ‑> None-
Expand source code
def start(self, auth_token: str = "") -> None:
    """Start consuming from the queue."""
    consumer = LogConsumerHandler(
        auth_token=auth_token,
        batch_size=self._batch_size,
        flush_interval_seconds=self._flush_interval,
    )
    self._consumer_handler = consumer
    background_listener = QueueListener(
        self.queue, consumer, respect_handler_level=True
    )
    self._listener = background_listener
    background_listener.start()
def stop(self) ‑> None-
Expand source code
def stop(self) -> None:
    """Stop the listener and flush remaining logs."""
    active_listener = self._listener
    if active_listener:
        active_listener.stop()
        self._listener = None
    consumer = self._consumer_handler
    if consumer:
        consumer.close()
        self._consumer_handler = None
def update_endpoint(self,
endpoint: str,
jwt_key: str = '',
custom_headers: Dict[str, str] | None = None) ‑> None-
Expand source code
def update_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
    """Set/update the endpoint and flush buffered records."""
    consumer = self._consumer_handler
    if not consumer:
        # Listener not started yet: nothing to forward the endpoint to.
        return
    consumer.update_endpoint(endpoint, jwt_key, custom_headers)
class MetricsCollector-
Expand source code
class MetricsCollector: """Single metrics collector for all pipeline modes. Replaces both CascadingMetricsCollector and RealtimeMetricsCollector with one collector that uses the component-wise metrics schema. """ def __init__(self) -> None: self.session = SessionMetrics() self.current_turn: Optional[TurnMetrics] = None self.turns: List[TurnMetrics] = [] self.analytics_client = AnalyticsClient() self.traces_flow_manager: Optional[TracesFlowManager] = None self.playground_manager: Optional[PlaygroundManager] = None self.playground: bool = False # Pipeline configuration self.pipeline_mode: Optional[str] = None self.realtime_mode: Optional[str] = None self.active_components: Optional[frozenset] = None # Transient timing state (not part of schema) self._stt_start_time: Optional[float] = None self._llm_start_time: Optional[float] = None self._tts_start_time: Optional[float] = None self._eou_start_time: Optional[float] = None self._tts_first_byte_time: Optional[float] = None # Speech state tracking self._is_agent_speaking: bool = False self._is_user_speaking: bool = False self._user_input_start_time: Optional[float] = None self._user_speech_end_time: Optional[float] = None self._agent_speech_start_time: Optional[float] = None self._pending_user_start_time: Optional[float] = None # Turn counting self._total_turns: int = 0 self._total_interruptions: int = 0 # Realtime agent speech end timer self._agent_speech_end_timer: Optional[asyncio.TimerHandle] = None self.preemtive_generation_enabled: bool = False self.session_set_in_trace: bool = False self.transport_mode: Optional[str] = None # Buffered data from an interrupted turn's interrupting speech self._pending_interrupt_timeline: Optional[dict] = None self._pending_interrupt_stt: Optional['SttMetrics'] = None self._pending_interrupt_eou: Optional['EouMetrics'] = None self._pending_interrupt_vad: Optional['VadMetrics'] = None # Audio playback timing (session-level) self._background_audio_start_time: Optional[float] = None 
self._thinking_audio_start_time: Optional[float] = None # ────────────────────────────────────────────── # Session lifecycle # ────────────────────────────────────────────── def configure_pipeline( self, pipeline_mode: Any, realtime_mode: Any = None, active_components: Any = None, ) -> None: """Configure the collector with pipeline information. Args: pipeline_mode: PipelineMode enum value realtime_mode: RealtimeMode enum value (optional) active_components: frozenset of PipelineComponent (optional) """ self.pipeline_mode = pipeline_mode.value if hasattr(pipeline_mode, "value") else str(pipeline_mode) self.realtime_mode = realtime_mode.value if realtime_mode and hasattr(realtime_mode, "value") else (str(realtime_mode) if realtime_mode else None) self.active_components = active_components self.session.pipeline_mode = self.pipeline_mode self.session.realtime_mode = self.realtime_mode if active_components: self.session.components = sorted( c.value if hasattr(c, "value") else str(c) for c in active_components ) logger.info(f"[metrics] Pipeline configured: mode={self.pipeline_mode}, realtime={self.realtime_mode}") def set_session_id(self, session_id: str) -> None: """Set the session ID for metrics tracking.""" self.session.session_id = session_id self.analytics_client.set_session_id(session_id) def set_system_instructions(self, instructions: str) -> None: """Set the system instructions for this session.""" self.session.system_instruction = instructions def set_provider_info(self, component_type: str, provider_class: str, model_name: str) -> None: """Set provider info for a specific component. Args: component_type: e.g. 
"stt", "llm", "tts", "vad", "eou", "realtime" provider_class: class name of the provider model_name: model identifier """ self.session.provider_per_component[component_type] = { "provider_class": provider_class, "model_name": model_name, } def update_provider_class(self, component_type: str, provider_class: str, model_name: str = "") -> None: """Update the provider class and model name for a specific component when fallback occurs. Args: component_type: "STT", "LLM", "TTS", etc. provider_class: The new provider class name (e.g., "GoogleLLM") model_name: The new model name """ target_type = component_type.lower() if target_type in self.session.provider_per_component: self.session.provider_per_component[target_type]["provider_class"] = provider_class if model_name: self.session.provider_per_component[target_type]["model_name"] = model_name logger.info(f"Updated {target_type} provider class to: {provider_class}, model: {model_name}") @staticmethod def _eou_config_to_dict(eou_config: Any) -> Dict[str, Any]: """Convert EOUConfig to a serializable dict for session storage.""" if eou_config is None: return {} return { "mode": getattr(eou_config, "mode", None), "min_max_speech_wait_timeout": getattr(eou_config, "min_max_speech_wait_timeout", None), } @staticmethod def _interrupt_config_to_dict(interrupt_config: Any) -> Dict[str, Any]: """Convert InterruptConfig to a serializable dict for session storage.""" if interrupt_config is None: return {} return { "mode": getattr(interrupt_config, "mode", None), "interrupt_min_duration": getattr(interrupt_config, "interrupt_min_duration", None), "interrupt_min_words": getattr(interrupt_config, "interrupt_min_words", None), "false_interrupt_pause_duration": getattr(interrupt_config, "false_interrupt_pause_duration", None), "resume_on_false_interrupt": getattr(interrupt_config, "resume_on_false_interrupt", None), } def set_eou_config(self, eou_config: Any) -> None: """Store EOU config on session for later use (internal tracking, not 
sent).""" self.session.eou_config = self._eou_config_to_dict(eou_config) def set_interrupt_config(self, interrupt_config: Any) -> None: """Store Interrupt config on session for later use (internal tracking, not sent).""" self.session.interrupt_config = self._interrupt_config_to_dict(interrupt_config) def add_participant_metrics( self, participant_id: str, kind: Optional[str] = None, sip_user: Optional[bool] = None, join_time: Optional[float] = None, meta: Optional[Dict[str, Any]] = None, ) -> None: """Append a participant entry (agent or user) into session.participant_metrics.""" self.session.participant_metrics.append( ParticipantMetrics( participant_id=participant_id, kind=kind, sip_user=sip_user, join_time=join_time, meta=meta, ) ) self.traces_flow_manager.participant_metrics.append( ParticipantMetrics( participant_id=participant_id, kind=kind, sip_user=sip_user, join_time=join_time, ) ) def set_traces_flow_manager(self, manager: TracesFlowManager) -> None: """Set the TracesFlowManager instance.""" self.traces_flow_manager = manager def set_playground_manager(self, manager: Optional[PlaygroundManager]) -> None: """Set the PlaygroundManager instance.""" self.playground = True self.playground_manager = manager def finalize_session(self) -> None: """Finalize the session, completing any in-progress turn.""" if self.current_turn: self.complete_turn() self.session.session_end_time = time.time() # ────────────────────────────────────────────── # Turn lifecycle # ────────────────────────────────────────────── def _generate_turn_id(self) -> str: """Generate a hash-based turn ID.""" timestamp = str(time.time()) session_id = self.session.session_id or "default" turn_count = str(self._total_turns) hash_input = f"{timestamp}_{session_id}_{turn_count}" return hashlib.md5(hash_input.encode()).hexdigest()[:16] @staticmethod def _round_latency(latency: float) -> float: """Convert latency from seconds to milliseconds and round.""" return round(latency * 1000, 4) def 
start_turn(self) -> None: """Start a new turn. Completes any existing turn first.""" logger.info(f"[metrics] start_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}") if self.current_turn: logger.info(f"[metrics] start_turn() completing existing turn before creating new one") self.complete_turn() self._total_turns += 1 self.current_turn = TurnMetrics(turn_id=self._generate_turn_id(), preemtive_generation_enabled=self.preemtive_generation_enabled) # Carry over pending user start time from a previously discarded turn if self._pending_user_start_time is not None: self.current_turn.user_speech_start_time = self._pending_user_start_time self._start_timeline_event("user_speech", self._pending_user_start_time) # Carry over buffered data from an interrupted turn if self._pending_interrupt_timeline is not None: tl = self._pending_interrupt_timeline event = TimelineEvent( event_type="user_speech", start_time=tl["start_time"], end_time=tl.get("end_time"), duration_ms=tl.get("duration_ms"), text=tl.get("text") ) self.current_turn.timeline_event_metrics.append(event) logger.info(f"[metrics] start_turn() carried over interrupt timeline: {tl.get('text')}") self._pending_interrupt_timeline = None if self._pending_interrupt_stt is not None: self.current_turn.stt_metrics.append(self._pending_interrupt_stt) logger.info(f"[metrics] start_turn() carried over interrupt STT: {self._pending_interrupt_stt.stt_transcript}") self._pending_interrupt_stt = None if self._pending_interrupt_eou is not None: self.current_turn.eou_metrics.append(self._pending_interrupt_eou) logger.info(f"[metrics] start_turn() carried over interrupt EOU: {self._pending_interrupt_eou.eou_latency}") self._pending_interrupt_eou = None if self._pending_interrupt_vad is not None: self.current_turn.vad_metrics.append(self._pending_interrupt_vad) logger.info(f"[metrics] start_turn() carried over interrupt VAD") self._pending_interrupt_vad = None # If user is 
currently speaking, capture the start time if self._is_user_speaking and self._user_input_start_time: if self.current_turn.user_speech_start_time is None: self.current_turn.user_speech_start_time = self._user_input_start_time if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics): self._start_timeline_event("user_speech", self._user_input_start_time) def complete_turn(self) -> None: """Complete the current turn, calculate E2E, validate, serialize and send.""" logger.info(f"[metrics] complete_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}") if not self.current_turn: logger.info(f"[metrics] complete_turn() early return — no current_turn") return self.current_turn.session_metrics = self.session # Calculate E2E latency self.current_turn.compute_e2e_latency() # Validate turn has meaningful data has_stt = bool(self.current_turn.stt_metrics and any(s.stt_latency is not None for s in self.current_turn.stt_metrics)) has_llm = bool(self.current_turn.llm_metrics and any(l.llm_ttft is not None for l in self.current_turn.llm_metrics)) has_tts = bool(self.current_turn.tts_metrics and any(t.tts_latency is not None or t.ttfb is not None for t in self.current_turn.tts_metrics)) logger.info(f"[metrics] complete_turn() validation | stt={has_stt} llm={has_llm} tts={has_tts} | user_speech_start={self.current_turn.user_speech_start_time is not None} | e2e={self.current_turn.e2e_latency}") if not self._validate_turn(self.current_turn) and self._total_turns > 1: logger.warning(f"[metrics] complete_turn() DISCARDING turn — validation failed | total_turns={self._total_turns}") # Cache user start time for next turn if self.current_turn.user_speech_start_time is not None: if (self._pending_user_start_time is None or self.current_turn.user_speech_start_time < self._pending_user_start_time): self._pending_user_start_time = self.current_turn.user_speech_start_time logger.info(f"[metrics] Caching 
earliest user start: {self._pending_user_start_time}") self.current_turn = None return # If this turn was interrupted, buffer the interrupting user's metrics # (STT, EOU, VAD, timeline) for carry-over to the next turn. if self.current_turn.is_interrupted: # Buffer STT metrics from interrupting speech if len(self.current_turn.stt_metrics) >= 2: self._pending_interrupt_stt = self.current_turn.stt_metrics.pop() logger.info(f"[metrics] complete_turn() buffering interrupt STT for next turn: {self._pending_interrupt_stt.stt_transcript}") # Buffer EOU metrics from interrupting speech if len(self.current_turn.eou_metrics) >= 2: self._pending_interrupt_eou = self.current_turn.eou_metrics.pop() logger.info(f"[metrics] complete_turn() buffering interrupt EOU for next turn: {self._pending_interrupt_eou.eou_latency}") # Buffer VAD metrics from interrupting speech and construct timeline from it if len(self.current_turn.vad_metrics) >= 2: self._pending_interrupt_vad = self.current_turn.vad_metrics.pop() # Build the pending timeline from the buffered VAD data self._pending_interrupt_timeline = { "start_time": self._pending_interrupt_vad.user_speech_start_time, "end_time": self._pending_interrupt_vad.user_speech_end_time, "duration_ms": ( round((self._pending_interrupt_vad.user_speech_end_time - self._pending_interrupt_vad.user_speech_start_time) * 1000, 4) if self._pending_interrupt_vad.user_speech_start_time and self._pending_interrupt_vad.user_speech_end_time else None ), "text": None, # Will be set by set_user_transcript in the next turn } logger.info(f"[metrics] complete_turn() buffering interrupt VAD + timeline for next turn") # Send trace if self.traces_flow_manager: self.traces_flow_manager.create_unified_turn_trace(self.current_turn, self.session) self.turns.append(self.current_turn) # Send to playground if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics=self.current_turn, full_turn_data=True) # Serialize and send 
analytics interaction_data = self._serialize_turn(self.current_turn) transformed_data = self._transform_to_camel_case(interaction_data) transformed_data = self._remove_internal_fields(transformed_data) if len(self.turns) > 1: self._remove_provider_fields(transformed_data) transformed_data = self._remove_negatives(transformed_data) global_event_emitter.emit("TURN_METRICS_ADDED", {"metrics": transformed_data}) self.analytics_client.send_interaction_analytics_safe(transformed_data) self.current_turn = None self._pending_user_start_time = None # Reset transient timing state so stale values don't leak into the next turn self._stt_start_time = None self._is_user_speaking = False def schedule_turn_complete(self, timeout: float = 1.0) -> None: """Schedule turn completion after a timeout (for realtime modes).""" if self._agent_speech_end_timer: self._agent_speech_end_timer.cancel() try: loop = asyncio.get_event_loop() self._agent_speech_end_timer = loop.call_later(timeout, self._finalize_realtime_turn) except RuntimeError: # No event loop available, complete immediately self.complete_turn() def _finalize_realtime_turn(self) -> None: """Finalize a realtime turn after the timeout.""" if not self.current_turn: return # Ensure agent speech end time is set if self.current_turn.agent_speech_start_time and not self.current_turn.agent_speech_end_time: self.current_turn.agent_speech_end_time = time.perf_counter() # Ensure user speech end time is set if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time: self.current_turn.user_speech_end_time = time.perf_counter() # Close any open timeline events current_time = time.perf_counter() for event in self.current_turn.timeline_event_metrics: if event.end_time is None: event.end_time = current_time event.duration_ms = round((current_time - event.start_time) * 1000, 4) # Compute realtime latencies self._compute_realtime_latencies() self._agent_speech_end_timer = None self.complete_turn() def 
_compute_realtime_latencies(self) -> None: """Compute realtime TTFB (used as E2E) and agent speech duration based on user and agent speech timestamps.""" turn = self.current_turn if not turn: return if turn.user_speech_end_time and turn.agent_speech_start_time: ttfb = round((turn.agent_speech_start_time - turn.user_speech_end_time) * 1000, 4) turn.e2e_latency = ttfb if turn.realtime_metrics: turn.realtime_metrics[-1].realtime_ttfb = ttfb if turn.agent_speech_start_time and turn.agent_speech_end_time: turn.agent_speech_duration = round( (turn.agent_speech_end_time - turn.agent_speech_start_time) * 1000, 4 ) # ────────────────────────────────────────────── # VAD metrics # ────────────────────────────────────────────── def on_user_speech_start(self) -> None: """Called when user starts speaking (VAD start).""" if not self.current_turn: self.start_turn() logger.info(f"[metrics] on_user_speech_start() called | _is_user_speaking={self._is_user_speaking} | current_turn={'exists' if self.current_turn else 'None'}") if self._is_user_speaking: logger.info(f"[metrics] on_user_speech_start() early return — already speaking") return self._is_user_speaking = True self._user_input_start_time = time.perf_counter() if self.current_turn: if self.current_turn.user_speech_start_time is None: logger.info(f"[DEBUG_START] on_user_speech_start() | setting user_speech_start_time={self._user_input_start_time}") self.current_turn.user_speech_start_time = self._user_input_start_time if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics): self._start_timeline_event("user_speech", self._user_input_start_time) # Create new entry if none exists or last entry is already complete if not self.current_turn.vad_metrics or self.current_turn.vad_metrics[-1].user_speech_end_time is not None: self.current_turn.vad_metrics.append(VadMetrics()) self.current_turn.vad_metrics[-1].user_speech_start_time = self._user_input_start_time def on_user_speech_end(self) -> None: 
"""Called when user stops speaking (VAD end).""" logger.info(f"[metrics] on_user_speech_end() called | current_turn={'exists' if self.current_turn else 'None'}") self._is_user_speaking = False self._user_speech_end_time = time.perf_counter() if self.current_turn and self.current_turn.user_speech_start_time: self.current_turn.user_speech_end_time = self._user_speech_end_time logger.info(f"[DEBUG_END] on_user_speech_end() | setting user_speech_end_time={self._user_speech_end_time}") self.current_turn.user_speech_duration = self._round_latency( self.current_turn.user_speech_end_time - self.current_turn.user_speech_start_time ) self._end_timeline_event("user_speech", self._user_speech_end_time) if not self.current_turn.vad_metrics: self.current_turn.vad_metrics.append(VadMetrics()) self.current_turn.vad_metrics[-1].user_speech_end_time = self._user_speech_end_time logger.info(f"user speech duration: {self.current_turn.user_speech_duration}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"user_speech_duration": self.current_turn.user_speech_duration} ) def set_vad_config(self, vad_config: Dict[str, Any]) -> None: """Set VAD configuration.""" if not self.current_turn.vad_metrics: self.current_turn.vad_metrics.append(VadMetrics()) vad = self.current_turn.vad_metrics[-1] vad.vad_threshold = vad_config.get("threshold") vad.vad_min_speech_duration = vad_config.get("min_speech_duration") vad.vad_min_silence_duration = vad_config.get("min_silence_duration") # ────────────────────────────────────────────── # STT metrics # ────────────────────────────────────────────── def on_stt_start(self) -> None: """Called when STT processing starts.""" self._stt_start_time = time.perf_counter() if self.current_turn: logger.info(f"[metrics] on_stt_start() called | current_turn={'exists' if self.current_turn else 'None'}") # Create new entry if none exists or last entry is already complete if not self.current_turn.stt_metrics or 
self.current_turn.stt_metrics[-1].stt_latency is not None: self.current_turn.stt_metrics.append(SttMetrics()) stt = self.current_turn.stt_metrics[-1] stt.stt_start_time = self._stt_start_time def on_stt_complete(self, transcript: str = "", duration: float = 0.0, confidence: float = 0.0) -> None: """Called when STT processing completes.""" if self.current_turn and self.current_turn.stt_metrics: stt = self.current_turn.stt_metrics[-1] if stt.stt_preemptive_generation_enabled and stt.stt_preemptive_generation_occurred: logger.info("STT preemptive generation occurred, skipping stt complete") return if not self.current_turn: return if not self.current_turn.stt_metrics: self.current_turn.stt_metrics.append(SttMetrics()) stt = self.current_turn.stt_metrics[-1] stt_end_time = time.perf_counter() stt.stt_end_time = stt_end_time if self._stt_start_time: stt_latency = self._round_latency(stt_end_time - self._stt_start_time) stt.stt_latency = stt_latency stt.stt_confidence = confidence stt.stt_duration = duration logger.info(f"stt latency: {stt_latency}ms | stt confidence: {confidence} | stt duration: {duration}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"stt_latency": stt_latency}) if transcript: stt.stt_transcript = transcript self._stt_start_time = None def on_stt_preflight_end(self, transcript: str = "") -> None: """Called when STT preflight event is received.""" if self.current_turn and self.current_turn.stt_metrics: stt = self.current_turn.stt_metrics[-1] if stt.stt_start_time: stt.stt_preflight_end_time = time.perf_counter() stt.stt_preflight_latency = self._round_latency( stt.stt_preflight_end_time - stt.stt_start_time ) stt.stt_preflight_transcript = transcript logger.info(f"stt preflight latency: {stt.stt_preflight_latency}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"stt_preflight_latency": stt.stt_preflight_latency} ) def 
on_stt_interim_end(self) -> None: """Called when STT interim event is received.""" if self.current_turn and self.current_turn.stt_metrics: stt = self.current_turn.stt_metrics[-1] if stt.stt_start_time and not stt.stt_interim_latency: stt.stt_interim_end_time = time.perf_counter() stt.stt_interim_latency = self._round_latency( stt.stt_interim_end_time - stt.stt_start_time ) logger.info(f"stt interim latency: {stt.stt_interim_latency}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"stt_interim_latency": stt.stt_interim_latency} ) def set_stt_usage(self, input_tokens: int = 0, output_tokens: int = 0, total_tokens: int = 0) -> None: """Set STT token usage.""" if self.current_turn: if not self.current_turn.stt_metrics: self.current_turn.stt_metrics.append(SttMetrics()) stt = self.current_turn.stt_metrics[-1] stt.stt_input_tokens = input_tokens stt.stt_output_tokens = output_tokens stt.stt_total_tokens = total_tokens def set_preemptive_generation_enabled(self, enabled: bool) -> None: """Mark preemptive generation as enabled for current STT.""" self.preemtive_generation_enabled = enabled # ────────────────────────────────────────────── # EOU metrics # ────────────────────────────────────────────── def on_eou_start(self) -> None: """Called when EOU processing starts.""" if not self.current_turn: self.start_turn() self._eou_start_time = time.perf_counter() if self.current_turn: # Create new entry if none exists or last entry is already complete if not self.current_turn.eou_metrics or self.current_turn.eou_metrics[-1].eou_latency is not None: self.current_turn.eou_metrics.append(EouMetrics()) self.current_turn.eou_metrics[-1].eou_start_time = self._eou_start_time def on_eou_complete(self) -> None: """Called when EOU processing completes.""" if self._eou_start_time: eou_end_time = time.perf_counter() eou_latency = self._round_latency(eou_end_time - self._eou_start_time) if self.current_turn: if not 
self.current_turn.eou_metrics: self.current_turn.eou_metrics.append(EouMetrics()) eou = self.current_turn.eou_metrics[-1] eou.eou_end_time = eou_end_time eou.eou_latency = eou_latency logger.info(f"eou latency: {eou_latency}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"eou_latency": eou_latency}) self._eou_start_time = None def on_wait_for_additional_speech(self, duration: float, eou_probability: float): if self.current_turn: if not self.current_turn.eou_metrics: self.current_turn.eou_metrics.append(EouMetrics()) eou = self.current_turn.eou_metrics[-1] eou.wait_for_additional_speech_duration = duration eou.eou_probability = eou_probability eou.waited_for_additional_speech = True logger.info(f"wait for additional speech duration: {duration}ms") logger.info(f"wait for additional speech eou probability: {eou_probability}") # ────────────────────────────────────────────── # LLM metrics # ────────────────────────────────────────────── def on_llm_start(self) -> None: """Called when LLM processing starts.""" self._llm_start_time = time.perf_counter() if self.current_turn: if not self.current_turn.llm_metrics: self.current_turn.llm_metrics.append(LlmMetrics()) self.current_turn.llm_metrics[-1].llm_start_time = self._llm_start_time def on_llm_first_token(self) -> None: """Called when first LLM token is received.""" if self.current_turn and self.current_turn.llm_metrics: llm = self.current_turn.llm_metrics[-1] if llm.llm_start_time: llm.llm_first_token_time = time.perf_counter() llm.llm_ttft = self._round_latency(llm.llm_first_token_time - llm.llm_start_time) logger.info(f"llm ttft: {llm.llm_ttft}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"llm_ttft": llm.llm_ttft}) def on_llm_complete(self) -> None: """Called when LLM processing completes.""" if self._llm_start_time: llm_end_time = time.perf_counter() llm_duration = 
self._round_latency(llm_end_time - self._llm_start_time) if self.current_turn and self.current_turn.llm_metrics: llm = self.current_turn.llm_metrics[-1] llm.llm_end_time = llm_end_time llm.llm_duration = llm_duration logger.info(f"llm duration: {llm_duration}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"llm_duration": llm_duration}) self._llm_start_time = None def set_llm_input(self, text: str) -> None: """Record the actual text sent to LLM.""" if self.current_turn: if not self.current_turn.llm_metrics: self.current_turn.llm_metrics.append(LlmMetrics()) self.current_turn.llm_metrics[-1].llm_input = text def set_llm_usage(self, usage: Dict[str, Any]) -> None: """Set LLM token usage and calculate TPS.""" if not self.current_turn or not usage: return if not self.current_turn.llm_metrics: self.current_turn.llm_metrics.append(LlmMetrics()) llm = self.current_turn.llm_metrics[-1] llm.prompt_tokens = usage.get("prompt_tokens") llm.completion_tokens = usage.get("completion_tokens") llm.total_tokens = usage.get("total_tokens") llm.prompt_cached_tokens = usage.get("prompt_cached_tokens") if llm.llm_duration and llm.llm_duration > 0 and llm.completion_tokens and llm.completion_tokens > 0: latency_seconds = llm.llm_duration / 1000 llm.tokens_per_second = round(llm.completion_tokens / latency_seconds, 2) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={ "prompt_tokens": llm.prompt_tokens, "completion_tokens": llm.completion_tokens, "total_tokens": llm.total_tokens, "tokens_per_second": llm.tokens_per_second, }) # ────────────────────────────────────────────── # TTS metrics # ────────────────────────────────────────────── def on_tts_start(self) -> None: """Called when TTS processing starts.""" if not self.current_turn: self.start_turn() self._tts_start_time = time.perf_counter() self._tts_first_byte_time = None if self.current_turn: if not 
self.current_turn.tts_metrics: self.current_turn.tts_metrics.append(TtsMetrics()) self.current_turn.tts_metrics[-1].tts_start_time = self._tts_start_time def on_tts_first_byte(self) -> None: """Called when TTS produces first audio byte.""" if self._tts_start_time: now = time.perf_counter() if self.current_turn and self.current_turn.tts_metrics: tts = self.current_turn.tts_metrics[-1] tts.tts_end_time = now tts.ttfb = self._round_latency(now - self._tts_start_time) tts.tts_first_byte_time = now logger.info(f"tts ttfb: {tts.ttfb}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"ttfb": tts.ttfb}) self._tts_first_byte_time = now def on_agent_speech_start(self) -> None: """Called when agent starts speaking (actual audio output).""" if not self.current_turn: self.start_turn() self._is_agent_speaking = True self._agent_speech_start_time = time.perf_counter() if self.current_turn: self.current_turn.agent_speech_start_time = self._agent_speech_start_time if not any( ev.event_type == "agent_speech" and ev.end_time is None for ev in self.current_turn.timeline_event_metrics ): self._start_timeline_event("agent_speech", self._agent_speech_start_time) def on_agent_speech_end(self) -> None: """Called when agent stops speaking.""" logger.info(f"[metrics] on_agent_speech_end() called | current_turn={'exists' if self.current_turn else 'None'} | _tts_start={self._tts_start_time is not None}") self._is_agent_speaking = False agent_speech_end_time = time.perf_counter() if self.current_turn: self._end_timeline_event("agent_speech", agent_speech_end_time) self.current_turn.agent_speech_end_time = agent_speech_end_time # Calculate TTS latency from first_byte - start if self._tts_start_time and self._tts_first_byte_time: total_tts_latency = self._tts_first_byte_time - self._tts_start_time if self.current_turn and self.current_turn.agent_speech_start_time: if self.current_turn.tts_metrics: tts = self.current_turn.tts_metrics[-1] 
tts.tts_latency = self._round_latency(total_tts_latency) self.current_turn.agent_speech_duration = self._round_latency( agent_speech_end_time - self.current_turn.agent_speech_start_time ) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"tts_latency": self.current_turn.tts_metrics[-1].tts_latency if self.current_turn.tts_metrics else None} ) self.playground_manager.send_cascading_metrics( metrics={"agent_speech_duration": self.current_turn.agent_speech_duration} ) self._tts_start_time = None self._tts_first_byte_time = None elif self._tts_start_time: self._tts_start_time = None self._tts_first_byte_time = None def add_tts_characters(self, count: int) -> None: """Add to the total character count for the current turn.""" if self.current_turn: if not self.current_turn.tts_metrics: self.current_turn.tts_metrics.append(TtsMetrics()) tts = self.current_turn.tts_metrics[-1] tts.tts_characters = (tts.tts_characters or 0) + count if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"tts_characters": tts.tts_characters}) # ────────────────────────────────────────────── # Interruption metrics # ────────────────────────────────────────────── def on_interrupted(self) -> None: """Called when user interrupts the agent.""" if self._is_agent_speaking: self._total_interruptions += 1 if self.current_turn: self.current_turn.is_interrupted = True logger.info(f"User interrupted the agent. 
Total interruptions: {self._total_interruptions}") if self.playground and self.playground_manager and self.current_turn: self.playground_manager.send_cascading_metrics( metrics={"interrupted": self.current_turn.is_interrupted} ) def on_false_interrupt_start(self, pause_duration: Optional[float] = None) -> None: """Called when a false interrupt is detected.""" if self.current_turn: if not self.current_turn.interruption_metrics: self.current_turn.interruption_metrics = InterruptionMetrics() im = self.current_turn.interruption_metrics im.is_false_interrupt = True im.false_interrupt_start_time = time.perf_counter() if pause_duration is not None: im.false_interrupt_pause_duration = pause_duration def on_false_interrupt_resume(self) -> None: """Called when agent resumes after a false interrupt.""" if self.current_turn and self.current_turn.interruption_metrics: im = self.current_turn.interruption_metrics im.resumed_after_false_interrupt = True im.resume_on_false_interrupt = True im.false_interrupt_end_time = time.perf_counter() if im.false_interrupt_start_time: im.false_interrupt_duration = self._round_latency( im.false_interrupt_end_time - im.false_interrupt_start_time ) def on_false_interrupt_escalated(self, word_count: Optional[int] = None) -> None: """Called when a false interrupt escalates to a real interrupt.""" if self.current_turn and self.current_turn.interruption_metrics: im = self.current_turn.interruption_metrics im.is_false_interrupt = False if word_count is not None: im.false_interrupt_words = word_count # ────────────────────────────────────────────── # Tool metrics # ────────────────────────────────────────────── def add_function_tool_call( self, tool_name: str, tool_params: Optional[Union[Dict[str, Any], list]] = None, tool_response: Optional[Dict[str, Any]] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, ) -> None: """Track a function tool call in the current turn.""" if self.current_turn: 
self.current_turn.function_tools_called.append(tool_name) tool_metric = FunctionToolMetrics( tool_name=tool_name, tool_params=tool_params or {}, tool_response=tool_response or {}, start_time=start_time or time.perf_counter(), end_time=end_time, ) if tool_metric.start_time and tool_metric.end_time: tool_metric.latency = self._round_latency(tool_metric.end_time - tool_metric.start_time) self.current_turn.function_tool_metrics.append(tool_metric) # Also track in function_tool_timestamps for backward compat tool_timestamp = { "tool_name": tool_name, "timestamp": start_time or time.perf_counter(), "readable_time": time.strftime("%H:%M:%S", time.localtime()), } self.current_turn.function_tool_timestamps.append(tool_timestamp) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"function_tool_timestamps": self.current_turn.function_tool_timestamps} ) def add_mcp_tool_call( self, tool_type: str = "local", tool_url: Optional[str] = None, tool_params: Optional[Union[Dict[str, Any], list]] = None, tool_response: Optional[Dict[str, Any]] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, ) -> None: """Track an MCP tool call in the current turn.""" if self.current_turn: tool_metric = McpToolMetrics( type=tool_type, tool_url=tool_url, tool_params=tool_params or {}, tool_response=tool_response or {}, start_time=start_time or time.perf_counter(), end_time=end_time, ) if tool_metric.start_time and tool_metric.end_time: tool_metric.latency = self._round_latency(tool_metric.end_time - tool_metric.start_time) self.current_turn.mcp_tool_metrics.append(tool_metric) # ────────────────────────────────────────────── # Transcript / response # ────────────────────────────────────────────── def set_user_transcript(self, transcript: str) -> None: """Set the user transcript for the current turn.""" if self.current_turn: # Guard: skip if the turn already has user_speech AND the pipeline # is actively processing 
(agent speaking OR LLM/TTS already started). # This prevents the interrupting speech's transcript from overwriting # the original transcript of the current turn. pipeline_active = ( self._is_agent_speaking or bool(self.current_turn.llm_metrics) or bool(self.current_turn.tts_metrics) ) if pipeline_active and self.current_turn.user_speech: logger.info( f"[metrics] Skipping set_user_transcript — pipeline active " f"and turn already has user_speech, new transcript " f"belongs to the next turn: {transcript}" ) return self.current_turn.user_speech = transcript logger.info(f"user input speech: {transcript}") global_event_emitter.emit("USER_TRANSCRIPT_ADDED", {"text": transcript}) # Update timeline user_events = [ ev for ev in self.current_turn.timeline_event_metrics if ev.event_type == "user_speech" ] if user_events: user_events[-1].text = transcript else: current_time = time.perf_counter() self._start_timeline_event("user_speech", current_time) if self.current_turn.timeline_event_metrics: self.current_turn.timeline_event_metrics[-1].text = transcript if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"user_speech": self.current_turn.user_speech} ) def set_agent_response(self, response: str) -> None: """Set the agent response for the current turn.""" if not self.current_turn: self.start_turn() self.current_turn.agent_speech = response logger.info(f"agent output speech: {response}") global_event_emitter.emit("AGENT_TRANSCRIPT_ADDED", {"text": response}) if not any(ev.event_type == "agent_speech" for ev in self.current_turn.timeline_event_metrics): current_time = time.perf_counter() self._start_timeline_event("agent_speech", current_time) self._update_timeline_event_text("agent_speech", response) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"agent_speech": self.current_turn.agent_speech} ) # ────────────────────────────────────────────── # Knowledge Base # 
────────────────────────────────────────────── def on_knowledge_base_start(self, kb_id: Optional[str] = None) -> None: """Called when knowledge base processing starts.""" if self.current_turn: kb_metric = KbMetrics( kb_id=kb_id, kb_start_time=time.perf_counter(), ) self.current_turn.kb_metrics.append(kb_metric) def on_knowledge_base_complete(self, documents: List[str], scores: List[float]) -> None: """Called when knowledge base processing completes.""" if self.current_turn and self.current_turn.kb_metrics: kb = self.current_turn.kb_metrics[-1] kb.kb_documents = documents kb.kb_scores = scores kb.kb_end_time = time.perf_counter() if kb.kb_start_time: kb.kb_retrieval_latency = self._round_latency(kb.kb_end_time - kb.kb_start_time) logger.info(f"kb retrieval latency: {kb.kb_retrieval_latency}ms") if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={ "kb_id": kb.kb_id, "kb_retrieval_latency": kb.kb_retrieval_latency, "kb_documents": kb.kb_documents, "kb_scores": kb.kb_scores, } ) # ────────────────────────────────────────────── # A2A # ────────────────────────────────────────────── def set_a2a_handoff(self) -> None: """Set the A2A enabled and handoff occurred flags.""" if self.current_turn: self.current_turn.is_a2a_enabled = True self.current_turn.handoff_occurred = True if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"handoff_occurred": self.current_turn.handoff_occurred} ) # ────────────────────────────────────────────── # Error tracking # ────────────────────────────────────────────── def add_error(self, source: str, message: str) -> None: """Add an error to the current turn.""" if self.current_turn: now = time.perf_counter() self.current_turn.errors.append({ "source": source, "message": str(message), "timestamp": now, "timestamp_perf": now, }) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"errors": 
self.current_turn.errors} ) def on_fallback_event(self, event_data: Dict[str, Any]) -> None: """Record a fallback event for the current turn""" if not self.current_turn: self.start_turn() if self.current_turn: fallback_event = FallbackEvent( component_type=event_data.get("component_type", ""), temporary_disable_sec=event_data.get("temporary_disable_sec", 0), permanent_disable_after_attempts=event_data.get("permanent_disable_after_attempts", 0), recovery_attempt=event_data.get("recovery_attempt", 0), message=event_data.get("message", ""), start_time=event_data.get("start_time"), end_time=event_data.get("end_time"), duration_ms=event_data.get("duration_ms"), original_provider_label=event_data.get("original_provider_label"), original_connection_start=event_data.get("original_connection_start"), original_connection_end=event_data.get("original_connection_end"), original_connection_duration_ms=event_data.get("original_connection_duration_ms"), new_provider_label=event_data.get("new_provider_label"), new_connection_start=event_data.get("new_connection_start"), new_connection_end=event_data.get("new_connection_end"), new_connection_duration_ms=event_data.get("new_connection_duration_ms"), is_recovery=event_data.get("is_recovery", False), ) self.current_turn.fallback_events.append(fallback_event) logger.info(f"Fallback event recorded: {event_data.get('component_type')} - {event_data.get('message')}") # ────────────────────────────────────────────── # Background audio metrics # ────────────────────────────────────────────── def on_background_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None: """Called when background audio playback starts.""" now = time.perf_counter() self._background_audio_start_time = now file_name = os.path.basename(file_path) if file_path else None if self.current_turn: self.current_turn.background_audio_file_path = file_name self.current_turn.background_audio_looping = looping 
self._start_timeline_event("background_audio", now) if self.traces_flow_manager: self.traces_flow_manager.create_background_audio_start_span( file_path=file_name, looping=looping, start_time=now ) logger.info(f"[metrics] background audio started | file={file_name} | looping={looping}") def on_background_audio_stop(self) -> None: """Called when background audio playback stops.""" now = time.perf_counter() self._background_audio_start_time = None if self.current_turn: self._end_timeline_event("background_audio", now) if self.traces_flow_manager: self.traces_flow_manager.create_background_audio_stop_span(end_time=now) logger.info("[metrics] background audio stopped") # ────────────────────────────────────────────── # Thinking audio metrics # ────────────────────────────────────────────── def on_thinking_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None: """Called when thinking audio playback starts.""" now = time.perf_counter() self._thinking_audio_start_time = now file_name = os.path.basename(file_path) if file_path else None if self.current_turn: self.current_turn.thinking_audio_file_path = file_name self.current_turn.thinking_audio_looping = looping self._start_timeline_event("thinking_audio", now) logger.info(f"[metrics] thinking audio started | file={file_name} | looping={looping}") def on_thinking_audio_stop(self) -> None: """Called when thinking audio playback stops.""" now = time.perf_counter() self._thinking_audio_start_time = None if self.current_turn: self._end_timeline_event("thinking_audio", now) logger.info("[metrics] thinking audio stopped") # ────────────────────────────────────────────── # Realtime-specific metrics # ────────────────────────────────────────────── def set_realtime_usage(self, usage: Dict[str, Any]) -> None: """Set realtime model token usage from a flat dict (input_tokens, total_tokens, cached_*, thoughts_tokens, etc.).""" if not self.current_turn or not usage: return if not 
self.current_turn.realtime_metrics: self.current_turn.realtime_metrics.append(RealtimeMetrics()) rt = self.current_turn.realtime_metrics[-1] rt.realtime_input_tokens = usage.get("input_tokens") rt.realtime_output_tokens = usage.get("output_tokens") rt.realtime_total_tokens = usage.get("total_tokens") rt.realtime_input_text_tokens = usage.get("input_text_tokens") rt.realtime_input_audio_tokens = usage.get("input_audio_tokens") rt.realtime_input_image_tokens = usage.get("input_image_tokens") rt.realtime_input_cached_tokens = usage.get("input_cached_tokens") rt.realtime_thoughts_tokens = usage.get("thoughts_tokens") rt.realtime_cached_text_tokens = usage.get("cached_text_tokens") rt.realtime_cached_audio_tokens = usage.get("cached_audio_tokens") rt.realtime_cached_image_tokens = usage.get("cached_image_tokens") rt.realtime_output_text_tokens = usage.get("output_text_tokens") rt.realtime_output_audio_tokens = usage.get("output_audio_tokens") rt.realtime_output_image_tokens = usage.get("output_image_tokens") def set_realtime_model_error(self, error: Dict[str, Any]) -> None: """Track a realtime model error.""" if self.current_turn: logger.error(f"realtime model error: {error}") now = time.perf_counter() self.current_turn.errors.append({ "source": "REALTIME_MODEL", "message": str(error.get("message", "Unknown error")), "timestamp": now, "timestamp_perf": now, }) # ────────────────────────────────────────────── # Timeline helpers # ────────────────────────────────────────────── def _start_timeline_event(self, event_type: str, start_time: float) -> None: """Start a timeline event.""" if self.current_turn: event = TimelineEvent(event_type=event_type, start_time=start_time) self.current_turn.timeline_event_metrics.append(event) def _end_timeline_event(self, event_type: str, end_time: float) -> None: """End a timeline event and calculate duration.""" if self.current_turn: for event in reversed(self.current_turn.timeline_event_metrics): if event.event_type == event_type: # 
Allow extending user_speech events which may rapidly start/stop due to VAD if event.end_time is None or event_type == "user_speech": event.end_time = end_time event.duration_ms = self._round_latency(end_time - event.start_time) break def _update_timeline_event_text(self, event_type: str, text: str) -> None: """Update the most recent timeline event of this type with text content.""" if self.current_turn: for event in reversed(self.current_turn.timeline_event_metrics): if event.event_type == event_type: event.text = text break # ────────────────────────────────────────────── # Validation & Serialization # ────────────────────────────────────────────── def _validate_turn(self, turn: TurnMetrics) -> bool: """Check that the turn has at least one meaningful latency metric.""" has_stt = bool(turn.stt_metrics and any(s.stt_latency is not None for s in turn.stt_metrics)) has_llm = bool(turn.llm_metrics and any(l.llm_ttft is not None for l in turn.llm_metrics)) has_tts = bool(turn.tts_metrics and any(t.tts_latency is not None or t.ttfb is not None for t in turn.tts_metrics)) has_eou = bool(turn.eou_metrics and any(e.eou_latency is not None for e in turn.eou_metrics)) has_realtime = bool(turn.realtime_metrics) has_e2e = turn.e2e_latency is not None return any([has_stt, has_llm, has_tts, has_eou, has_realtime, has_e2e]) def _serialize_turn(self, turn: TurnMetrics) -> Dict[str, Any]: """Serialize a TurnMetrics to a flat dict for analytics API.""" data: Dict[str, Any] = {} # Top-level turn fields data["timestamp"] = turn.timestamp data["e2e_latency"] = turn.e2e_latency data["interrupted"] = turn.is_interrupted data["user_speech_start_time"] = turn.user_speech_start_time data["user_speech_end_time"] = turn.user_speech_end_time data["user_speech_duration"] = turn.user_speech_duration data["user_speech"] = turn.user_speech data["agent_speech_start_time"] = turn.agent_speech_start_time data["agent_speech_end_time"] = turn.agent_speech_end_time data["agent_speech_duration"] = 
turn.agent_speech_duration data["agent_speech"] = turn.agent_speech data["function_tools_called"] = turn.function_tools_called data["function_tool_timestamps"] = turn.function_tool_timestamps data["is_a2a_enabled"] = turn.is_a2a_enabled data["handoff_occurred"] = turn.handoff_occurred data["errors"] = turn.errors # Flatten the last STT metrics entry if turn.stt_metrics: stt = turn.stt_metrics[-1] data["stt_latency"] = stt.stt_latency data["stt_start_time"] = stt.stt_start_time data["stt_end_time"] = stt.stt_end_time data["stt_preflight_latency"] = stt.stt_preflight_latency data["stt_interim_latency"] = stt.stt_interim_latency data["stt_confidence"] = stt.stt_confidence data["stt_duration"] = stt.stt_duration # Flatten the last EOU metrics entry if turn.eou_metrics: eou = turn.eou_metrics[-1] data["eou_latency"] = eou.eou_latency data["eou_start_time"] = eou.eou_start_time data["eou_end_time"] = eou.eou_end_time # Flatten the last LLM metrics entry if turn.llm_metrics: llm = turn.llm_metrics[-1] data["llm_ttft"] = llm.llm_ttft data["llm_duration"] = llm.llm_duration data["llm_start_time"] = llm.llm_start_time data["llm_end_time"] = llm.llm_end_time data["prompt_tokens"] = llm.prompt_tokens data["completion_tokens"] = llm.completion_tokens data["total_tokens"] = llm.total_tokens data["prompt_cached_tokens"] = llm.prompt_cached_tokens data["tokens_per_second"] = llm.tokens_per_second # Flatten the last KB metrics entry if turn.kb_metrics: kb = turn.kb_metrics[-1] data["kb_id"] = kb.kb_id data["kb_documents"] = kb.kb_documents data["kb_scores"] = kb.kb_scores data["kb_retrieval_latency"] = kb.kb_retrieval_latency # Flatten the last TTS metrics entry if turn.tts_metrics: tts = turn.tts_metrics[-1] data["tts_latency"] = tts.tts_latency data["ttfb"] = tts.ttfb data["tts_start_time"] = tts.tts_start_time data["tts_end_time"] = tts.tts_end_time data["tts_duration"] = tts.tts_duration data["tts_characters"] = tts.tts_characters # Flatten the last realtime (full s2s) metrics 
entry for analytics if turn.realtime_metrics: rt = turn.realtime_metrics[-1] data["ttfb"] = rt.realtime_ttfb data["realtime_input_tokens"] = rt.realtime_input_tokens data["realtime_total_tokens"] = rt.realtime_total_tokens data["realtime_output_tokens"] = rt.realtime_output_tokens data["realtime_input_text_tokens"] = rt.realtime_input_text_tokens data["realtime_input_audio_tokens"] = rt.realtime_input_audio_tokens data["realtime_input_image_tokens"] = rt.realtime_input_image_tokens data["realtime_input_cached_tokens"] = rt.realtime_input_cached_tokens data["realtime_thoughts_tokens"] = rt.realtime_thoughts_tokens data["realtime_cached_text_tokens"] = rt.realtime_cached_text_tokens data["realtime_cached_audio_tokens"] = rt.realtime_cached_audio_tokens data["realtime_cached_image_tokens"] = rt.realtime_cached_image_tokens data["realtime_output_text_tokens"] = rt.realtime_output_text_tokens data["realtime_output_audio_tokens"] = rt.realtime_output_audio_tokens data["realtime_output_image_tokens"] = rt.realtime_output_image_tokens # Provider info (from session). Use realtime_* keys for known S2S models. 
providers = self.session.provider_per_component if "realtime" in providers: data["realtime_provider_class"] = providers["realtime"]["provider_class"] data["realtime_model_name"] = providers["realtime"]["model_name"] elif "llm" in providers: pc = providers["llm"]["provider_class"] if pc in REALTIME_PROVIDER_CLASS_NAMES: data["realtime_provider_class"] = pc data["realtime_model_name"] = providers["llm"]["model_name"] else: data["llm_provider_class"] = pc data["llm_model_name"] = providers["llm"]["model_name"] if "stt" in providers: data["stt_provider_class"] = providers["stt"]["provider_class"] data["stt_model_name"] = providers["stt"]["model_name"] if "tts" in providers: data["tts_provider_class"] = providers["tts"]["provider_class"] data["tts_model_name"] = providers["tts"]["model_name"] if "vad" in providers: data["vad_provider_class"] = providers["vad"]["provider_class"] data["vad_model_name"] = providers["vad"]["model_name"] if "eou" in providers: data["eou_provider_class"] = providers["eou"]["provider_class"] data["eou_model_name"] = providers["eou"]["model_name"] # System instructions (first turn only) if self._total_turns == 1 or len(self.turns) == 0: data["system_instructions"] = self.session.system_instruction else: data["system_instructions"] = "" # Timeline data["timeline"] = [asdict(ev) for ev in turn.timeline_event_metrics] return data def _transform_to_camel_case(self, data: Dict[str, Any]) -> Dict[str, Any]: """Transform snake_case field names to camelCase for analytics.""" field_mapping = { # User and agent metrics "user_speech_start_time": "userSpeechStartTime", "user_speech_end_time": "userSpeechEndTime", "user_speech_duration": "userSpeechDuration", "agent_speech_start_time": "agentSpeechStartTime", "agent_speech_end_time": "agentSpeechEndTime", "agent_speech_duration": "agentSpeechDuration", # STT metrics "stt_latency": "sttLatency", "stt_start_time": "sttStartTime", "stt_end_time": "sttEndTime", "stt_preflight_latency": "sttPreflightLatency", 
"stt_interim_latency": "sttInterimLatency", "stt_confidence": "sttConfidence", "stt_duration": "sttDuration", # LLM metrics "llm_duration": "llmDuration", "llm_start_time": "llmStartTime", "llm_end_time": "llmEndTime", "llm_ttft": "ttft", "prompt_tokens": "promptTokens", "completion_tokens": "completionTokens", "total_tokens": "totalTokens", "prompt_cached_tokens": "promptCachedTokens", "tokens_per_second": "tokensPerSecond", # TTS metrics "tts_start_time": "ttsStartTime", "tts_end_time": "ttsEndTime", "tts_duration": "ttsDuration", "tts_characters": "ttsCharacters", "ttfb": "ttfb", "tts_latency": "ttsLatency", # EOU metrics "eou_latency": "eouLatency", "eou_start_time": "eouStartTime", "eou_end_time": "eouEndTime", # Realtime (full s2s) metrics "realtime_ttfb": "realtimeTtfb", "realtime_input_tokens": "realtimeInputTokens", "realtime_total_tokens": "realtimeTotalTokens", "realtime_output_tokens": "realtimeOutputTokens", "realtime_input_text_tokens": "realtimeInputTextTokens", "realtime_input_audio_tokens": "realtimeInputAudioTokens", "realtime_input_image_tokens": "realtimeInputImageTokens", "realtime_input_cached_tokens": "realtimeInputCachedTokens", "realtime_thoughts_tokens": "realtimeThoughtsTokens", "realtime_cached_text_tokens": "realtimeCachedTextTokens", "realtime_cached_audio_tokens": "realtimeCachedAudioTokens", "realtime_cached_image_tokens": "realtimeCachedImageTokens", "realtime_output_text_tokens": "realtimeOutputTextTokens", "realtime_output_audio_tokens": "realtimeOutputAudioTokens", "realtime_output_image_tokens": "realtimeOutputImageTokens", "kb_id": "kbId", "kb_retrieval_latency": "kbRetrievalLatency", "kb_documents": "kbDocuments", "kb_scores": "kbScores", # Provider metrics "llm_provider_class": "llmProviderClass", "llm_model_name": "llmModelName", "realtime_provider_class": "realtimeProviderClass", "realtime_model_name": "realtimeModelName", "stt_provider_class": "sttProviderClass", "stt_model_name": "sttModelName", "tts_provider_class": 
"ttsProviderClass", "tts_model_name": "ttsModelName", # VAD metrics "vad_provider_class": "vadProviderClass", "vad_model_name": "vadModelName", # EOU metrics "eou_provider_class": "eouProviderClass", "eou_model_name": "eouModelName", # Other metrics "e2e_latency": "e2eLatency", "interrupted": "interrupted", "timestamp": "timestamp", "function_tools_called": "functionToolsCalled", "function_tool_timestamps": "functionToolTimestamps", "system_instructions": "systemInstructions", "handoff_occurred": "handOffOccurred", "is_a2a_enabled": "isA2aEnabled", "errors": "errors", } timeline_field_mapping = { "event_type": "eventType", "start_time": "startTime", "end_time": "endTime", "duration_ms": "durationInMs", } transformed: Dict[str, Any] = {} for key, value in data.items(): camel_key = field_mapping.get(key, key) if key == "timeline" and isinstance(value, list): transformed_timeline = [] for event in value: transformed_event = {} for ek, ev in event.items(): transformed_event[timeline_field_mapping.get(ek, ek)] = ev transformed_timeline.append(transformed_event) transformed[camel_key] = transformed_timeline else: transformed[camel_key] = value return transformed def _remove_internal_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: """Remove internal-only fields from the analytics payload.""" always_remove = [ "errors", "functionToolTimestamps", "sttStartTime", "sttEndTime", "ttsStartTime", "ttsEndTime", "llmStartTime", "llmEndTime", "eouStartTime", "eouEndTime", "isA2aEnabled", "interactionId", "timestamp", ] if self.current_turn and not self.current_turn.is_a2a_enabled: always_remove.append("handOffOccurred") for f in always_remove: data.pop(f, None) return data def _remove_provider_fields(self, data: Dict[str, Any]) -> None: """Remove provider fields after first turn.""" provider_fields = [ "systemInstructions", # "llmProviderClass", "llmModelName", # "sttProviderClass", "sttModelName", # "ttsProviderClass", "ttsModelName", "vadProviderClass", "vadModelName", 
"eouProviderClass", "eouModelName", ] for f in provider_fields: data.pop(f, None) def _remove_negatives(self, obj: Any) -> Any: """Recursively clamp any numeric value < 0 to 0.""" if isinstance(obj, dict): for k, v in obj.items(): if isinstance(v, (int, float)) and v < 0: obj[k] = 0 elif isinstance(v, (dict, list)): obj[k] = self._remove_negatives(v) return obj if isinstance(obj, list): for i, v in enumerate(obj): if isinstance(v, (int, float)) and v < 0: obj[i] = 0 elif isinstance(v, (dict, list)): obj[i] = self._remove_negatives(v) return obj return objSingle metrics collector for all pipeline modes.
Replaces both CascadingMetricsCollector and RealtimeMetricsCollector with one collector that uses the component-wise metrics schema.
Methods
def add_error(self, source: str, message: str) ‑> None-
Expand source code
def add_error(self, source: str, message: str) -> None: """Add an error to the current turn.""" if self.current_turn: now = time.perf_counter() self.current_turn.errors.append({ "source": source, "message": str(message), "timestamp": now, "timestamp_perf": now, }) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"errors": self.current_turn.errors} )Add an error to the current turn.
def add_function_tool_call(self,
tool_name: str,
tool_params: Optional[Union[Dict[str, Any], list]] = None,
tool_response: Optional[Dict[str, Any]] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None) ‑> None-
Expand source code
def add_function_tool_call( self, tool_name: str, tool_params: Optional[Union[Dict[str, Any], list]] = None, tool_response: Optional[Dict[str, Any]] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, ) -> None: """Track a function tool call in the current turn.""" if self.current_turn: self.current_turn.function_tools_called.append(tool_name) tool_metric = FunctionToolMetrics( tool_name=tool_name, tool_params=tool_params or {}, tool_response=tool_response or {}, start_time=start_time or time.perf_counter(), end_time=end_time, ) if tool_metric.start_time and tool_metric.end_time: tool_metric.latency = self._round_latency(tool_metric.end_time - tool_metric.start_time) self.current_turn.function_tool_metrics.append(tool_metric) # Also track in function_tool_timestamps for backward compat tool_timestamp = { "tool_name": tool_name, "timestamp": start_time or time.perf_counter(), "readable_time": time.strftime("%H:%M:%S", time.localtime()), } self.current_turn.function_tool_timestamps.append(tool_timestamp) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"function_tool_timestamps": self.current_turn.function_tool_timestamps} )Track a function tool call in the current turn.
def add_mcp_tool_call(self,
tool_type: str = 'local',
tool_url: Optional[str] = None,
tool_params: Optional[Union[Dict[str, Any], list]] = None,
tool_response: Optional[Dict[str, Any]] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None) ‑> None-
Expand source code
def add_mcp_tool_call( self, tool_type: str = "local", tool_url: Optional[str] = None, tool_params: Optional[Union[Dict[str, Any], list]] = None, tool_response: Optional[Dict[str, Any]] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, ) -> None: """Track an MCP tool call in the current turn.""" if self.current_turn: tool_metric = McpToolMetrics( type=tool_type, tool_url=tool_url, tool_params=tool_params or {}, tool_response=tool_response or {}, start_time=start_time or time.perf_counter(), end_time=end_time, ) if tool_metric.start_time and tool_metric.end_time: tool_metric.latency = self._round_latency(tool_metric.end_time - tool_metric.start_time) self.current_turn.mcp_tool_metrics.append(tool_metric)Track an MCP tool call in the current turn.
def add_participant_metrics(self,
participant_id: str,
kind: Optional[str] = None,
sip_user: Optional[bool] = None,
join_time: Optional[float] = None,
meta: Optional[Dict[str, Any]] = None) ‑> None-
Expand source code
def add_participant_metrics( self, participant_id: str, kind: Optional[str] = None, sip_user: Optional[bool] = None, join_time: Optional[float] = None, meta: Optional[Dict[str, Any]] = None, ) -> None: """Append a participant entry (agent or user) into session.participant_metrics.""" self.session.participant_metrics.append( ParticipantMetrics( participant_id=participant_id, kind=kind, sip_user=sip_user, join_time=join_time, meta=meta, ) ) self.traces_flow_manager.participant_metrics.append( ParticipantMetrics( participant_id=participant_id, kind=kind, sip_user=sip_user, join_time=join_time, ) )Append a participant entry (agent or user) into session.participant_metrics.
def add_tts_characters(self, count: int) ‑> None-
Expand source code
def add_tts_characters(self, count: int) -> None: """Add to the total character count for the current turn.""" if self.current_turn: if not self.current_turn.tts_metrics: self.current_turn.tts_metrics.append(TtsMetrics()) tts = self.current_turn.tts_metrics[-1] tts.tts_characters = (tts.tts_characters or 0) + count if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={"tts_characters": tts.tts_characters})Add to the total character count for the current turn.
def complete_turn(self) ‑> None-
Expand source code
def complete_turn(self) -> None: """Complete the current turn, calculate E2E, validate, serialize and send.""" logger.info(f"[metrics] complete_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}") if not self.current_turn: logger.info(f"[metrics] complete_turn() early return — no current_turn") return self.current_turn.session_metrics = self.session # Calculate E2E latency self.current_turn.compute_e2e_latency() # Validate turn has meaningful data has_stt = bool(self.current_turn.stt_metrics and any(s.stt_latency is not None for s in self.current_turn.stt_metrics)) has_llm = bool(self.current_turn.llm_metrics and any(l.llm_ttft is not None for l in self.current_turn.llm_metrics)) has_tts = bool(self.current_turn.tts_metrics and any(t.tts_latency is not None or t.ttfb is not None for t in self.current_turn.tts_metrics)) logger.info(f"[metrics] complete_turn() validation | stt={has_stt} llm={has_llm} tts={has_tts} | user_speech_start={self.current_turn.user_speech_start_time is not None} | e2e={self.current_turn.e2e_latency}") if not self._validate_turn(self.current_turn) and self._total_turns > 1: logger.warning(f"[metrics] complete_turn() DISCARDING turn — validation failed | total_turns={self._total_turns}") # Cache user start time for next turn if self.current_turn.user_speech_start_time is not None: if (self._pending_user_start_time is None or self.current_turn.user_speech_start_time < self._pending_user_start_time): self._pending_user_start_time = self.current_turn.user_speech_start_time logger.info(f"[metrics] Caching earliest user start: {self._pending_user_start_time}") self.current_turn = None return # If this turn was interrupted, buffer the interrupting user's metrics # (STT, EOU, VAD, timeline) for carry-over to the next turn. 
if self.current_turn.is_interrupted: # Buffer STT metrics from interrupting speech if len(self.current_turn.stt_metrics) >= 2: self._pending_interrupt_stt = self.current_turn.stt_metrics.pop() logger.info(f"[metrics] complete_turn() buffering interrupt STT for next turn: {self._pending_interrupt_stt.stt_transcript}") # Buffer EOU metrics from interrupting speech if len(self.current_turn.eou_metrics) >= 2: self._pending_interrupt_eou = self.current_turn.eou_metrics.pop() logger.info(f"[metrics] complete_turn() buffering interrupt EOU for next turn: {self._pending_interrupt_eou.eou_latency}") # Buffer VAD metrics from interrupting speech and construct timeline from it if len(self.current_turn.vad_metrics) >= 2: self._pending_interrupt_vad = self.current_turn.vad_metrics.pop() # Build the pending timeline from the buffered VAD data self._pending_interrupt_timeline = { "start_time": self._pending_interrupt_vad.user_speech_start_time, "end_time": self._pending_interrupt_vad.user_speech_end_time, "duration_ms": ( round((self._pending_interrupt_vad.user_speech_end_time - self._pending_interrupt_vad.user_speech_start_time) * 1000, 4) if self._pending_interrupt_vad.user_speech_start_time and self._pending_interrupt_vad.user_speech_end_time else None ), "text": None, # Will be set by set_user_transcript in the next turn } logger.info(f"[metrics] complete_turn() buffering interrupt VAD + timeline for next turn") # Send trace if self.traces_flow_manager: self.traces_flow_manager.create_unified_turn_trace(self.current_turn, self.session) self.turns.append(self.current_turn) # Send to playground if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics=self.current_turn, full_turn_data=True) # Serialize and send analytics interaction_data = self._serialize_turn(self.current_turn) transformed_data = self._transform_to_camel_case(interaction_data) transformed_data = self._remove_internal_fields(transformed_data) if len(self.turns) > 1: 
self._remove_provider_fields(transformed_data) transformed_data = self._remove_negatives(transformed_data) global_event_emitter.emit("TURN_METRICS_ADDED", {"metrics": transformed_data}) self.analytics_client.send_interaction_analytics_safe(transformed_data) self.current_turn = None self._pending_user_start_time = None # Reset transient timing state so stale values don't leak into the next turn self._stt_start_time = None self._is_user_speaking = FalseComplete the current turn, calculate E2E, validate, serialize and send.
def configure_pipeline(self,
pipeline_mode: Any,
realtime_mode: Any = None,
active_components: Any = None) ‑> None-
Expand source code
def configure_pipeline(
    self,
    pipeline_mode: Any,
    realtime_mode: Any = None,
    active_components: Any = None,
) -> None:
    """Record pipeline configuration on the collector and its session.

    Args:
        pipeline_mode: PipelineMode enum value
        realtime_mode: RealtimeMode enum value (optional)
        active_components: frozenset of PipelineComponent (optional)
    """
    # Enum members carry .value; fall back to str() for plain objects.
    self.pipeline_mode = (
        pipeline_mode.value if hasattr(pipeline_mode, "value") else str(pipeline_mode)
    )
    if realtime_mode:
        self.realtime_mode = (
            realtime_mode.value if hasattr(realtime_mode, "value") else str(realtime_mode)
        )
    else:
        self.realtime_mode = None
    self.active_components = active_components
    self.session.pipeline_mode = self.pipeline_mode
    self.session.realtime_mode = self.realtime_mode
    if active_components:
        self.session.components = sorted(
            comp.value if hasattr(comp, "value") else str(comp)
            for comp in active_components
        )
    logger.info(f"[metrics] Pipeline configured: mode={self.pipeline_mode}, realtime={self.realtime_mode}")
Args
pipeline_mode- PipelineMode enum value
realtime_mode- RealtimeMode enum value (optional)
active_components- frozenset of PipelineComponent (optional)
def finalize_session(self) ‑> None-
Expand source code
def finalize_session(self) -> None:
    """Close out the session, first completing any turn still in progress."""
    if self.current_turn:
        self.complete_turn()
    # Wall-clock end stamp for the whole session.
    self.session.session_end_time = time.time()
def on_agent_speech_end(self) ‑> None-
Expand source code
def on_agent_speech_end(self) -> None:
    """Called when agent stops speaking.

    Finalizes the agent_speech timeline event, stamps the turn's
    agent_speech_end_time, derives total TTS latency (first byte minus
    request start) and the spoken duration, then clears the transient
    TTS timing state so it cannot leak into the next turn.
    """
    logger.info(f"[metrics] on_agent_speech_end() called | current_turn={'exists' if self.current_turn else 'None'} | _tts_start={self._tts_start_time is not None}")
    self._is_agent_speaking = False
    agent_speech_end_time = time.perf_counter()
    if self.current_turn:
        self._end_timeline_event("agent_speech", agent_speech_end_time)
        self.current_turn.agent_speech_end_time = agent_speech_end_time
    # Calculate TTS latency from first_byte - start
    if self._tts_start_time and self._tts_first_byte_time:
        total_tts_latency = self._tts_first_byte_time - self._tts_start_time
        # Latency/duration only make sense if the agent actually started speaking.
        if self.current_turn and self.current_turn.agent_speech_start_time:
            if self.current_turn.tts_metrics:
                tts = self.current_turn.tts_metrics[-1]
                tts.tts_latency = self._round_latency(total_tts_latency)
            self.current_turn.agent_speech_duration = self._round_latency(
                agent_speech_end_time - self.current_turn.agent_speech_start_time
            )
            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(
                    metrics={"tts_latency": self.current_turn.tts_metrics[-1].tts_latency if self.current_turn.tts_metrics else None}
                )
                self.playground_manager.send_cascading_metrics(
                    metrics={"agent_speech_duration": self.current_turn.agent_speech_duration}
                )
        # Reset transient TTS timing either way.
        self._tts_start_time = None
        self._tts_first_byte_time = None
    elif self._tts_start_time:
        # TTS started but never produced a byte; still clear the state.
        self._tts_start_time = None
        self._tts_first_byte_time = None
def on_agent_speech_start(self) ‑> None-
Expand source code
def on_agent_speech_start(self) -> None:
    """Called when agent starts speaking (actual audio output)."""
    if not self.current_turn:
        self.start_turn()
    self._is_agent_speaking = True
    started_at = time.perf_counter()
    self._agent_speech_start_time = started_at
    turn = self.current_turn
    if turn:
        turn.agent_speech_start_time = started_at
        # Only open a new timeline event when no agent_speech event is still open.
        has_open_event = any(
            ev.event_type == "agent_speech" and ev.end_time is None
            for ev in turn.timeline_event_metrics
        )
        if not has_open_event:
            self._start_timeline_event("agent_speech", started_at)
def on_background_audio_start(self, file_path: Optional[str] = None, looping: bool = False) ‑> None-
Expand source code
def on_background_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None:
    """Called when background audio playback starts."""
    started_at = time.perf_counter()
    self._background_audio_start_time = started_at
    # Only keep the basename; full paths are not useful in metrics.
    base_name = os.path.basename(file_path) if file_path else None
    turn = self.current_turn
    if turn:
        turn.background_audio_file_path = base_name
        turn.background_audio_looping = looping
        self._start_timeline_event("background_audio", started_at)
    if self.traces_flow_manager:
        self.traces_flow_manager.create_background_audio_start_span(
            file_path=base_name, looping=looping, start_time=started_at
        )
    logger.info(f"[metrics] background audio started | file={base_name} | looping={looping}")
def on_background_audio_stop(self) ‑> None-
Expand source code
def on_background_audio_stop(self) -> None:
    """Called when background audio playback stops."""
    stopped_at = time.perf_counter()
    self._background_audio_start_time = None
    if self.current_turn:
        self._end_timeline_event("background_audio", stopped_at)
    if self.traces_flow_manager:
        self.traces_flow_manager.create_background_audio_stop_span(end_time=stopped_at)
    logger.info("[metrics] background audio stopped")
def on_eou_complete(self) ‑> None-
Expand source code
def on_eou_complete(self) -> None:
    """Called when EOU processing completes."""
    # Nothing to do if no EOU run is in flight.
    if not self._eou_start_time:
        return
    ended_at = time.perf_counter()
    eou_latency = self._round_latency(ended_at - self._eou_start_time)
    turn = self.current_turn
    if turn:
        if not turn.eou_metrics:
            turn.eou_metrics.append(EouMetrics())
        entry = turn.eou_metrics[-1]
        entry.eou_end_time = ended_at
        entry.eou_latency = eou_latency
        logger.info(f"eou latency: {eou_latency}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"eou_latency": eou_latency})
    self._eou_start_time = None
def on_eou_start(self) ‑> None-
Expand source code
def on_eou_start(self) -> None:
    """Called when EOU processing starts."""
    if not self.current_turn:
        self.start_turn()
    started_at = time.perf_counter()
    self._eou_start_time = started_at
    turn = self.current_turn
    if turn:
        # Append a fresh entry when there is none yet, or when the last
        # one already finished (its latency has been filled in).
        needs_new_entry = (
            not turn.eou_metrics or turn.eou_metrics[-1].eou_latency is not None
        )
        if needs_new_entry:
            turn.eou_metrics.append(EouMetrics())
        turn.eou_metrics[-1].eou_start_time = started_at
def on_fallback_event(self, event_data: Dict[str, Any]) ‑> None-
Expand source code
def on_fallback_event(self, event_data: Dict[str, Any]) -> None:
    """Record a fallback event for the current turn.

    Args:
        event_data: flat dict describing the provider fallback; every
            field is copied verbatim onto a FallbackEvent record.
    """
    # A turn is required to attach the event to; create one lazily.
    if not self.current_turn:
        self.start_turn()
    if self.current_turn:
        fallback_event = FallbackEvent(
            component_type=event_data.get("component_type", ""),
            temporary_disable_sec=event_data.get("temporary_disable_sec", 0),
            permanent_disable_after_attempts=event_data.get("permanent_disable_after_attempts", 0),
            recovery_attempt=event_data.get("recovery_attempt", 0),
            message=event_data.get("message", ""),
            start_time=event_data.get("start_time"),
            end_time=event_data.get("end_time"),
            duration_ms=event_data.get("duration_ms"),
            # Connection timing for the provider that failed ...
            original_provider_label=event_data.get("original_provider_label"),
            original_connection_start=event_data.get("original_connection_start"),
            original_connection_end=event_data.get("original_connection_end"),
            original_connection_duration_ms=event_data.get("original_connection_duration_ms"),
            # ... and for the provider switched to.
            new_provider_label=event_data.get("new_provider_label"),
            new_connection_start=event_data.get("new_connection_start"),
            new_connection_end=event_data.get("new_connection_end"),
            new_connection_duration_ms=event_data.get("new_connection_duration_ms"),
            is_recovery=event_data.get("is_recovery", False),
        )
        self.current_turn.fallback_events.append(fallback_event)
        logger.info(f"Fallback event recorded: {event_data.get('component_type')} - {event_data.get('message')}")
def on_false_interrupt_escalated(self, word_count: Optional[int] = None) ‑> None-
Expand source code
def on_false_interrupt_escalated(self, word_count: Optional[int] = None) -> None:
    """Called when a false interrupt escalates to a real interrupt."""
    turn = self.current_turn
    if not (turn and turn.interruption_metrics):
        return
    metrics = turn.interruption_metrics
    metrics.is_false_interrupt = False
    if word_count is not None:
        metrics.false_interrupt_words = word_count
def on_false_interrupt_resume(self) ‑> None-
Expand source code
def on_false_interrupt_resume(self) -> None:
    """Called when agent resumes after a false interrupt."""
    turn = self.current_turn
    if not (turn and turn.interruption_metrics):
        return
    metrics = turn.interruption_metrics
    metrics.resumed_after_false_interrupt = True
    metrics.resume_on_false_interrupt = True
    metrics.false_interrupt_end_time = time.perf_counter()
    # Duration requires the matching start stamp.
    if metrics.false_interrupt_start_time:
        elapsed = metrics.false_interrupt_end_time - metrics.false_interrupt_start_time
        metrics.false_interrupt_duration = self._round_latency(elapsed)
def on_false_interrupt_start(self, pause_duration: Optional[float] = None) ‑> None-
Expand source code
def on_false_interrupt_start(self, pause_duration: Optional[float] = None) -> None:
    """Called when a false interrupt is detected."""
    turn = self.current_turn
    if not turn:
        return
    if not turn.interruption_metrics:
        turn.interruption_metrics = InterruptionMetrics()
    metrics = turn.interruption_metrics
    metrics.is_false_interrupt = True
    metrics.false_interrupt_start_time = time.perf_counter()
    if pause_duration is not None:
        metrics.false_interrupt_pause_duration = pause_duration
def on_interrupted(self) ‑> None-
Expand source code
def on_interrupted(self) -> None:
    """Called when user interrupts the agent."""
    # Only count as an interruption if the agent was actually speaking.
    if self._is_agent_speaking:
        self._total_interruptions += 1
        if self.current_turn:
            self.current_turn.is_interrupted = True
        logger.info(f"User interrupted the agent. Total interruptions: {self._total_interruptions}")
    if self.playground and self.playground_manager and self.current_turn:
        self.playground_manager.send_cascading_metrics(
            metrics={"interrupted": self.current_turn.is_interrupted}
        )
def on_knowledge_base_complete(self, documents: List[str], scores: List[float]) ‑> None-
Expand source code
def on_knowledge_base_complete(self, documents: List[str], scores: List[float]) -> None:
    """Called when knowledge base processing completes."""
    turn = self.current_turn
    if not (turn and turn.kb_metrics):
        return
    kb = turn.kb_metrics[-1]
    kb.kb_documents = documents
    kb.kb_scores = scores
    kb.kb_end_time = time.perf_counter()
    # Latency is only computable when the start stamp exists.
    if kb.kb_start_time:
        kb.kb_retrieval_latency = self._round_latency(kb.kb_end_time - kb.kb_start_time)
        logger.info(f"kb retrieval latency: {kb.kb_retrieval_latency}ms")
    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={
                "kb_id": kb.kb_id,
                "kb_retrieval_latency": kb.kb_retrieval_latency,
                "kb_documents": kb.kb_documents,
                "kb_scores": kb.kb_scores,
            }
        )
def on_knowledge_base_start(self, kb_id: Optional[str] = None) ‑> None-
Expand source code
def on_knowledge_base_start(self, kb_id: Optional[str] = None) -> None:
    """Called when knowledge base processing starts."""
    if not self.current_turn:
        return
    self.current_turn.kb_metrics.append(
        KbMetrics(kb_id=kb_id, kb_start_time=time.perf_counter())
    )
def on_llm_complete(self) ‑> None-
Expand source code
def on_llm_complete(self) -> None:
    """Called when LLM processing completes."""
    if not self._llm_start_time:
        return
    ended_at = time.perf_counter()
    llm_duration = self._round_latency(ended_at - self._llm_start_time)
    turn = self.current_turn
    if turn and turn.llm_metrics:
        entry = turn.llm_metrics[-1]
        entry.llm_end_time = ended_at
        entry.llm_duration = llm_duration
        logger.info(f"llm duration: {llm_duration}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"llm_duration": llm_duration})
    self._llm_start_time = None
def on_llm_first_token(self) ‑> None-
Expand source code
def on_llm_first_token(self) -> None:
    """Called when first LLM token is received."""
    turn = self.current_turn
    if not (turn and turn.llm_metrics):
        return
    entry = turn.llm_metrics[-1]
    # TTFT is relative to the recorded LLM start.
    if entry.llm_start_time:
        entry.llm_first_token_time = time.perf_counter()
        entry.llm_ttft = self._round_latency(entry.llm_first_token_time - entry.llm_start_time)
        logger.info(f"llm ttft: {entry.llm_ttft}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"llm_ttft": entry.llm_ttft})
def on_llm_start(self) ‑> None-
Expand source code
def on_llm_start(self) -> None:
    """Called when LLM processing starts."""
    started_at = time.perf_counter()
    self._llm_start_time = started_at
    turn = self.current_turn
    if turn:
        if not turn.llm_metrics:
            turn.llm_metrics.append(LlmMetrics())
        turn.llm_metrics[-1].llm_start_time = started_at
def on_stt_complete(self, transcript: str = '', duration: float = 0.0, confidence: float = 0.0) ‑> None-
Expand source code
def on_stt_complete(self, transcript: str = "", duration: float = 0.0, confidence: float = 0.0) -> None:
    """Called when STT processing completes.

    Args:
        transcript: final transcript text (empty string leaves the field untouched)
        duration: audio duration reported by the provider (ms — TODO confirm unit)
        confidence: provider-reported confidence score
    """
    # When preemptive generation already consumed this STT result, the
    # completion event is a duplicate and must be ignored.
    if self.current_turn and self.current_turn.stt_metrics:
        stt = self.current_turn.stt_metrics[-1]
        if stt.stt_preemptive_generation_enabled and stt.stt_preemptive_generation_occurred:
            logger.info("STT preemptive generation occurred, skipping stt complete")
            return
    if not self.current_turn:
        return
    if not self.current_turn.stt_metrics:
        self.current_turn.stt_metrics.append(SttMetrics())
    stt = self.current_turn.stt_metrics[-1]
    stt_end_time = time.perf_counter()
    stt.stt_end_time = stt_end_time
    # Latency is only computable when on_stt_start() recorded a start stamp.
    if self._stt_start_time:
        stt_latency = self._round_latency(stt_end_time - self._stt_start_time)
        stt.stt_latency = stt_latency
        stt.stt_confidence = confidence
        stt.stt_duration = duration
        logger.info(f"stt latency: {stt_latency}ms | stt confidence: {confidence} | stt duration: {duration}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"stt_latency": stt_latency})
    if transcript:
        stt.stt_transcript = transcript
    # Clear transient state so the next turn starts clean.
    self._stt_start_time = None
def on_stt_interim_end(self) ‑> None-
Expand source code
def on_stt_interim_end(self) -> None:
    """Called when STT interim event is received."""
    turn = self.current_turn
    if not (turn and turn.stt_metrics):
        return
    stt = turn.stt_metrics[-1]
    # Record only the first interim latency per entry.
    if stt.stt_start_time and not stt.stt_interim_latency:
        stt.stt_interim_end_time = time.perf_counter()
        stt.stt_interim_latency = self._round_latency(
            stt.stt_interim_end_time - stt.stt_start_time
        )
        logger.info(f"stt interim latency: {stt.stt_interim_latency}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"stt_interim_latency": stt.stt_interim_latency}
            )
def on_stt_preflight_end(self, transcript: str = '') ‑> None-
Expand source code
def on_stt_preflight_end(self, transcript: str = "") -> None:
    """Called when STT preflight event is received."""
    turn = self.current_turn
    if not (turn and turn.stt_metrics):
        return
    stt = turn.stt_metrics[-1]
    if stt.stt_start_time:
        stt.stt_preflight_end_time = time.perf_counter()
        stt.stt_preflight_latency = self._round_latency(
            stt.stt_preflight_end_time - stt.stt_start_time
        )
        stt.stt_preflight_transcript = transcript
        logger.info(f"stt preflight latency: {stt.stt_preflight_latency}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"stt_preflight_latency": stt.stt_preflight_latency}
            )
def on_stt_start(self) ‑> None-
Expand source code
def on_stt_start(self) -> None:
    """Called when STT processing starts."""
    started_at = time.perf_counter()
    self._stt_start_time = started_at
    turn = self.current_turn
    if turn:
        logger.info(f"[metrics] on_stt_start() called | current_turn={'exists' if self.current_turn else 'None'}")
        # Create new entry if none exists or last entry is already complete
        if not turn.stt_metrics or turn.stt_metrics[-1].stt_latency is not None:
            turn.stt_metrics.append(SttMetrics())
        turn.stt_metrics[-1].stt_start_time = started_at
def on_thinking_audio_start(self, file_path: Optional[str] = None, looping: bool = False) ‑> None-
Expand source code
def on_thinking_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None:
    """Called when thinking audio playback starts."""
    started_at = time.perf_counter()
    self._thinking_audio_start_time = started_at
    # Only keep the basename; full paths are not useful in metrics.
    base_name = os.path.basename(file_path) if file_path else None
    turn = self.current_turn
    if turn:
        turn.thinking_audio_file_path = base_name
        turn.thinking_audio_looping = looping
        self._start_timeline_event("thinking_audio", started_at)
    logger.info(f"[metrics] thinking audio started | file={base_name} | looping={looping}")
def on_thinking_audio_stop(self) ‑> None-
Expand source code
def on_thinking_audio_stop(self) -> None:
    """Called when thinking audio playback stops."""
    stopped_at = time.perf_counter()
    self._thinking_audio_start_time = None
    if self.current_turn:
        self._end_timeline_event("thinking_audio", stopped_at)
    logger.info("[metrics] thinking audio stopped")
def on_tts_first_byte(self) ‑> None-
Expand source code
def on_tts_first_byte(self) -> None:
    """Called when TTS produces first audio byte.

    Records time-to-first-byte (TTFB) relative to the TTS start stamp
    and remembers the first-byte time for later latency calculations.
    """
    # Only meaningful if a TTS request is actually in flight.
    if self._tts_start_time:
        now = time.perf_counter()
        if self.current_turn and self.current_turn.tts_metrics:
            tts = self.current_turn.tts_metrics[-1]
            # NOTE(review): tts_end_time is stamped here at the FIRST byte,
            # not at the end of synthesis — confirm this is intentional.
            tts.tts_end_time = now
            tts.ttfb = self._round_latency(now - self._tts_start_time)
            tts.tts_first_byte_time = now
            logger.info(f"tts ttfb: {tts.ttfb}ms")
            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(metrics={"ttfb": tts.ttfb})
        # Kept even without a turn: on_agent_speech_end() reads it.
        self._tts_first_byte_time = now
def on_tts_start(self) ‑> None-
Expand source code
def on_tts_start(self) -> None:
    """Called when TTS processing starts."""
    if not self.current_turn:
        self.start_turn()
    started_at = time.perf_counter()
    self._tts_start_time = started_at
    # A new synthesis invalidates any previous first-byte stamp.
    self._tts_first_byte_time = None
    turn = self.current_turn
    if turn:
        if not turn.tts_metrics:
            turn.tts_metrics.append(TtsMetrics())
        turn.tts_metrics[-1].tts_start_time = started_at
def on_user_speech_end(self) ‑> None-
Expand source code
def on_user_speech_end(self) -> None:
    """Called when user stops speaking (VAD end).

    Stamps the end time, derives the user speech duration, closes the
    user_speech timeline event, and mirrors the end time onto the most
    recent VAD metrics entry.
    """
    logger.info(f"[metrics] on_user_speech_end() called | current_turn={'exists' if self.current_turn else 'None'}")
    self._is_user_speaking = False
    self._user_speech_end_time = time.perf_counter()
    # Duration is only computable when the matching start was captured.
    if self.current_turn and self.current_turn.user_speech_start_time:
        self.current_turn.user_speech_end_time = self._user_speech_end_time
        logger.info(f"[DEBUG_END] on_user_speech_end() | setting user_speech_end_time={self._user_speech_end_time}")
        self.current_turn.user_speech_duration = self._round_latency(
            self.current_turn.user_speech_end_time - self.current_turn.user_speech_start_time
        )
        self._end_timeline_event("user_speech", self._user_speech_end_time)
        if not self.current_turn.vad_metrics:
            self.current_turn.vad_metrics.append(VadMetrics())
        self.current_turn.vad_metrics[-1].user_speech_end_time = self._user_speech_end_time
        logger.info(f"user speech duration: {self.current_turn.user_speech_duration}ms")
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"user_speech_duration": self.current_turn.user_speech_duration}
            )
def on_user_speech_start(self) ‑> None-
Expand source code
def on_user_speech_start(self) -> None:
    """Called when user starts speaking (VAD start).

    Idempotent while speech is ongoing: repeated VAD starts are ignored
    until on_user_speech_end() clears the speaking flag.
    """
    if not self.current_turn:
        self.start_turn()
    logger.info(f"[metrics] on_user_speech_start() called | _is_user_speaking={self._is_user_speaking} | current_turn={'exists' if self.current_turn else 'None'}")
    if self._is_user_speaking:
        logger.info(f"[metrics] on_user_speech_start() early return — already speaking")
        return
    self._is_user_speaking = True
    self._user_input_start_time = time.perf_counter()
    if self.current_turn:
        # Only the first speech start of the turn sets the turn-level stamp.
        if self.current_turn.user_speech_start_time is None:
            logger.info(f"[DEBUG_START] on_user_speech_start() | setting user_speech_start_time={self._user_input_start_time}")
            self.current_turn.user_speech_start_time = self._user_input_start_time
        if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics):
            self._start_timeline_event("user_speech", self._user_input_start_time)
        # Create new entry if none exists or last entry is already complete
        if not self.current_turn.vad_metrics or self.current_turn.vad_metrics[-1].user_speech_end_time is not None:
            self.current_turn.vad_metrics.append(VadMetrics())
        self.current_turn.vad_metrics[-1].user_speech_start_time = self._user_input_start_time
def on_wait_for_additional_speech(self, duration: float, eou_probability: float)-
Expand source code
def on_wait_for_additional_speech(self, duration: float, eou_probability: float):
    """Record that the agent waited for additional user speech before EOU."""
    turn = self.current_turn
    if not turn:
        return
    if not turn.eou_metrics:
        turn.eou_metrics.append(EouMetrics())
    entry = turn.eou_metrics[-1]
    entry.wait_for_additional_speech_duration = duration
    entry.eou_probability = eou_probability
    entry.waited_for_additional_speech = True
    logger.info(f"wait for additional speech duration: {duration}ms")
    logger.info(f"wait for additional speech eou probability: {eou_probability}")
Expand source code
def schedule_turn_complete(self, timeout: float = 1.0) -> None:
    """Schedule turn completion after a timeout (for realtime modes).

    Args:
        timeout: Seconds to wait before finalizing the turn.
    """
    if self._agent_speech_end_timer:
        self._agent_speech_end_timer.cancel()
    try:
        # Bug fix: asyncio.get_event_loop() is deprecated for this use and,
        # when called outside a running loop (pre-3.12), silently CREATES a
        # new loop that never runs — the timer callback would never fire and
        # the RuntimeError fallback below was unreachable.
        # get_running_loop() raises RuntimeError deterministically instead.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop available, complete immediately
        self.complete_turn()
        return
    self._agent_speech_end_timer = loop.call_later(timeout, self._finalize_realtime_turn)
def set_a2a_handoff(self) ‑> None-
Expand source code
def set_a2a_handoff(self) -> None:
    """Set the A2A enabled and handoff occurred flags."""
    turn = self.current_turn
    if not turn:
        return
    turn.is_a2a_enabled = True
    turn.handoff_occurred = True
    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"handoff_occurred": turn.handoff_occurred}
        )
def set_agent_response(self, response: str) ‑> None-
Expand source code
def set_agent_response(self, response: str) -> None:
    """Set the agent response for the current turn.

    Also emits AGENT_TRANSCRIPT_ADDED and attaches the text to the
    turn's agent_speech timeline event (creating one if absent).
    """
    if not self.current_turn:
        self.start_turn()
    self.current_turn.agent_speech = response
    logger.info(f"agent output speech: {response}")
    global_event_emitter.emit("AGENT_TRANSCRIPT_ADDED", {"text": response})
    # Ensure an agent_speech timeline event exists before attaching text.
    if not any(ev.event_type == "agent_speech" for ev in self.current_turn.timeline_event_metrics):
        current_time = time.perf_counter()
        self._start_timeline_event("agent_speech", current_time)
    self._update_timeline_event_text("agent_speech", response)
    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"agent_speech": self.current_turn.agent_speech}
        )
def set_eou_config(self, eou_config: Any) ‑> None-
Expand source code
def set_eou_config(self, eou_config: Any) -> None:
    """Store EOU config on session for later use (internal tracking, not sent)."""
    config_dict = self._eou_config_to_dict(eou_config)
    self.session.eou_config = config_dict
def set_interrupt_config(self, interrupt_config: Any) ‑> None-
Expand source code
def set_interrupt_config(self, interrupt_config: Any) -> None:
    """Store Interrupt config on session for later use (internal tracking, not sent)."""
    config_dict = self._interrupt_config_to_dict(interrupt_config)
    self.session.interrupt_config = config_dict
def set_llm_input(self, text: str) ‑> None-
Expand source code
def set_llm_input(self, text: str) -> None: """Record the actual text sent to LLM.""" if self.current_turn: if not self.current_turn.llm_metrics: self.current_turn.llm_metrics.append(LlmMetrics()) self.current_turn.llm_metrics[-1].llm_input = textRecord the actual text sent to LLM.
def set_llm_usage(self, usage: Dict[str, Any]) ‑> None-
Expand source code
def set_llm_usage(self, usage: Dict[str, Any]) -> None: """Set LLM token usage and calculate TPS.""" if not self.current_turn or not usage: return if not self.current_turn.llm_metrics: self.current_turn.llm_metrics.append(LlmMetrics()) llm = self.current_turn.llm_metrics[-1] llm.prompt_tokens = usage.get("prompt_tokens") llm.completion_tokens = usage.get("completion_tokens") llm.total_tokens = usage.get("total_tokens") llm.prompt_cached_tokens = usage.get("prompt_cached_tokens") if llm.llm_duration and llm.llm_duration > 0 and llm.completion_tokens and llm.completion_tokens > 0: latency_seconds = llm.llm_duration / 1000 llm.tokens_per_second = round(llm.completion_tokens / latency_seconds, 2) if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics(metrics={ "prompt_tokens": llm.prompt_tokens, "completion_tokens": llm.completion_tokens, "total_tokens": llm.total_tokens, "tokens_per_second": llm.tokens_per_second, })Set LLM token usage and calculate TPS.
def set_playground_manager(self, manager: Optional[PlaygroundManager]) ‑> None-
Expand source code
def set_playground_manager(self, manager: Optional[PlaygroundManager]) -> None: """Set the PlaygroundManager instance.""" self.playground = True self.playground_manager = managerSet the PlaygroundManager instance.
def set_preemptive_generation_enabled(self, enabled: bool) ‑> None-
Expand source code
def set_preemptive_generation_enabled(self, enabled: bool) -> None: """Mark preemptive generation as enabled for current STT.""" self.preemtive_generation_enabled = enabledMark preemptive generation as enabled for current STT.
def set_provider_info(self, component_type: str, provider_class: str, model_name: str) ‑> None-
Expand source code
def set_provider_info(self, component_type: str, provider_class: str, model_name: str) -> None: """Set provider info for a specific component. Args: component_type: e.g. "stt", "llm", "tts", "vad", "eou", "realtime" provider_class: class name of the provider model_name: model identifier """ self.session.provider_per_component[component_type] = { "provider_class": provider_class, "model_name": model_name, }Set provider info for a specific component.
Args
component_type- e.g. "stt", "llm", "tts", "vad", "eou", "realtime"
provider_class- class name of the provider
model_name- model identifier
def set_realtime_model_error(self, error: Dict[str, Any]) ‑> None-
Expand source code
def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
    """Track a realtime model error.

    Args:
        error: provider error dict; only its "message" is persisted.
    """
    # Errors are only recorded against an active turn.
    if self.current_turn:
        logger.error(f"realtime model error: {error}")
        now = time.perf_counter()
        # NOTE(review): both fields receive the same perf_counter() value;
        # the key "timestamp" suggests wall-clock (time.time()) was
        # intended there — confirm against consumers of this record.
        self.current_turn.errors.append({
            "source": "REALTIME_MODEL",
            "message": str(error.get("message", "Unknown error")),
            "timestamp": now,
            "timestamp_perf": now,
        })
def set_realtime_usage(self, usage: Dict[str, Any]) ‑> None-
Expand source code
def set_realtime_usage(self, usage: Dict[str, Any]) -> None: """Set realtime model token usage from a flat dict (input_tokens, total_tokens, cached_*, thoughts_tokens, etc.).""" if not self.current_turn or not usage: return if not self.current_turn.realtime_metrics: self.current_turn.realtime_metrics.append(RealtimeMetrics()) rt = self.current_turn.realtime_metrics[-1] rt.realtime_input_tokens = usage.get("input_tokens") rt.realtime_output_tokens = usage.get("output_tokens") rt.realtime_total_tokens = usage.get("total_tokens") rt.realtime_input_text_tokens = usage.get("input_text_tokens") rt.realtime_input_audio_tokens = usage.get("input_audio_tokens") rt.realtime_input_image_tokens = usage.get("input_image_tokens") rt.realtime_input_cached_tokens = usage.get("input_cached_tokens") rt.realtime_thoughts_tokens = usage.get("thoughts_tokens") rt.realtime_cached_text_tokens = usage.get("cached_text_tokens") rt.realtime_cached_audio_tokens = usage.get("cached_audio_tokens") rt.realtime_cached_image_tokens = usage.get("cached_image_tokens") rt.realtime_output_text_tokens = usage.get("output_text_tokens") rt.realtime_output_audio_tokens = usage.get("output_audio_tokens") rt.realtime_output_image_tokens = usage.get("output_image_tokens")Set realtime model token usage from a flat dict (input_tokens, total_tokens, cached_*, thoughts_tokens, etc.).
def set_session_id(self, session_id: str) ‑> None-
Expand source code
def set_session_id(self, session_id: str) -> None: """Set the session ID for metrics tracking.""" self.session.session_id = session_id self.analytics_client.set_session_id(session_id)Set the session ID for metrics tracking.
def set_stt_usage(self, input_tokens: int = 0, output_tokens: int = 0, total_tokens: int = 0) ‑> None-
Expand source code
def set_stt_usage(self, input_tokens: int = 0, output_tokens: int = 0, total_tokens: int = 0) -> None: """Set STT token usage.""" if self.current_turn: if not self.current_turn.stt_metrics: self.current_turn.stt_metrics.append(SttMetrics()) stt = self.current_turn.stt_metrics[-1] stt.stt_input_tokens = input_tokens stt.stt_output_tokens = output_tokens stt.stt_total_tokens = total_tokensSet STT token usage.
def set_system_instructions(self, instructions: str) ‑> None-
Expand source code
def set_system_instructions(self, instructions: str) -> None: """Set the system instructions for this session.""" self.session.system_instruction = instructionsSet the system instructions for this session.
def set_traces_flow_manager(self,
manager: TracesFlowManager) ‑> None-
Expand source code
def set_traces_flow_manager(self, manager: TracesFlowManager) -> None:
    """Set the TracesFlowManager instance."""
    # Simple injection point used by the integration layer.
    self.traces_flow_manager = manager
def set_user_transcript(self, transcript: str) ‑> None-
Expand source code
def set_user_transcript(self, transcript: str) -> None: """Set the user transcript for the current turn.""" if self.current_turn: # Guard: skip if the turn already has user_speech AND the pipeline # is actively processing (agent speaking OR LLM/TTS already started). # This prevents the interrupting speech's transcript from overwriting # the original transcript of the current turn. pipeline_active = ( self._is_agent_speaking or bool(self.current_turn.llm_metrics) or bool(self.current_turn.tts_metrics) ) if pipeline_active and self.current_turn.user_speech: logger.info( f"[metrics] Skipping set_user_transcript — pipeline active " f"and turn already has user_speech, new transcript " f"belongs to the next turn: {transcript}" ) return self.current_turn.user_speech = transcript logger.info(f"user input speech: {transcript}") global_event_emitter.emit("USER_TRANSCRIPT_ADDED", {"text": transcript}) # Update timeline user_events = [ ev for ev in self.current_turn.timeline_event_metrics if ev.event_type == "user_speech" ] if user_events: user_events[-1].text = transcript else: current_time = time.perf_counter() self._start_timeline_event("user_speech", current_time) if self.current_turn.timeline_event_metrics: self.current_turn.timeline_event_metrics[-1].text = transcript if self.playground and self.playground_manager: self.playground_manager.send_cascading_metrics( metrics={"user_speech": self.current_turn.user_speech} )Set the user transcript for the current turn.
def set_vad_config(self, vad_config: Dict[str, Any]) -> None:
    """Set VAD configuration on the current turn's latest VAD metrics entry.

    Reads the ``threshold``, ``min_speech_duration`` and
    ``min_silence_duration`` keys from *vad_config* (missing keys become
    ``None``). A fresh ``VadMetrics`` entry is appended when the turn has
    none yet.

    Args:
        vad_config: VAD provider configuration mapping.
    """
    # Guard against no active turn, consistent with the other setters on
    # this collector (e.g. set_user_transcript); previously this would
    # raise AttributeError on self.current_turn.vad_metrics.
    if not self.current_turn:
        return
    if not self.current_turn.vad_metrics:
        self.current_turn.vad_metrics.append(VadMetrics())
    vad = self.current_turn.vad_metrics[-1]
    vad.vad_threshold = vad_config.get("threshold")
    vad.vad_min_speech_duration = vad_config.get("min_speech_duration")
    vad.vad_min_silence_duration = vad_config.get("min_silence_duration")
def start_turn(self) -> None
Expand source code
def start_turn(self) -> None:
    """Start a new turn. Completes any existing turn first.

    Creates a fresh ``TurnMetrics`` and carries over any data buffered from
    a discarded or interrupted previous turn (user-speech start time,
    timeline event, STT/EOU/VAD metrics) so it is attributed to this turn.
    """
    logger.info(f"[metrics] start_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}")
    if self.current_turn:
        logger.info(f"[metrics] start_turn() completing existing turn before creating new one")
        self.complete_turn()
    self._total_turns += 1
    # NOTE: 'preemtive' spelling matches the TurnMetrics field name.
    self.current_turn = TurnMetrics(turn_id=self._generate_turn_id(), preemtive_generation_enabled=self.preemtive_generation_enabled)
    # Carry over pending user start time from a previously discarded turn
    # NOTE(review): _pending_user_start_time is not cleared here, unlike the
    # _pending_interrupt_* buffers below — presumably reset elsewhere; verify.
    if self._pending_user_start_time is not None:
        self.current_turn.user_speech_start_time = self._pending_user_start_time
        self._start_timeline_event("user_speech", self._pending_user_start_time)
    # Carry over buffered data from an interrupted turn
    if self._pending_interrupt_timeline is not None:
        tl = self._pending_interrupt_timeline
        event = TimelineEvent(
            event_type="user_speech",
            start_time=tl["start_time"],
            end_time=tl.get("end_time"),
            duration_ms=tl.get("duration_ms"),
            text=tl.get("text")
        )
        self.current_turn.timeline_event_metrics.append(event)
        logger.info(f"[metrics] start_turn() carried over interrupt timeline: {tl.get('text')}")
        self._pending_interrupt_timeline = None
    if self._pending_interrupt_stt is not None:
        self.current_turn.stt_metrics.append(self._pending_interrupt_stt)
        logger.info(f"[metrics] start_turn() carried over interrupt STT: {self._pending_interrupt_stt.stt_transcript}")
        self._pending_interrupt_stt = None
    if self._pending_interrupt_eou is not None:
        self.current_turn.eou_metrics.append(self._pending_interrupt_eou)
        logger.info(f"[metrics] start_turn() carried over interrupt EOU: {self._pending_interrupt_eou.eou_latency}")
        self._pending_interrupt_eou = None
    if self._pending_interrupt_vad is not None:
        self.current_turn.vad_metrics.append(self._pending_interrupt_vad)
        logger.info(f"[metrics] start_turn() carried over interrupt VAD")
        self._pending_interrupt_vad = None
    # If user is currently speaking, capture the start time so the new turn
    # records the speech that is already in progress.
    if self._is_user_speaking and self._user_input_start_time:
        if self.current_turn.user_speech_start_time is None:
            self.current_turn.user_speech_start_time = self._user_input_start_time
        if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics):
            self._start_timeline_event("user_speech", self._user_input_start_time)
def update_provider_class(self, component_type: str, provider_class: str, model_name: str = '') -> None
Expand source code
def update_provider_class(self, component_type: str, provider_class: str, model_name: str = "") -> None:
    """Update the provider class and model name for a specific component when fallback occurs.

    No-op when the component is not registered in
    ``session.provider_per_component``.

    Args:
        component_type: "STT", "LLM", "TTS", etc.
        provider_class: The new provider class name (e.g., "GoogleLLM")
        model_name: The new model name
    """
    # Component keys in provider_per_component are stored lowercase.
    target_type = component_type.lower()
    if target_type in self.session.provider_per_component:
        self.session.provider_per_component[target_type]["provider_class"] = provider_class
        # Keep the existing model name when no replacement is supplied.
        if model_name:
            self.session.provider_per_component[target_type]["model_name"] = model_name
        logger.info(f"Updated {target_type} provider class to: {provider_class}, model: {model_name}")
Args
component_type- "STT", "LLM", "TTS", etc.
provider_class- The new provider class name (e.g., "GoogleLLM")
model_name- The new model name
class SessionMetrics (session_id: str | None = None,
room_id: str | None = None,
system_instruction: str = '',
components: List[str] = <factory>,
pipeline_type: str | None = None,
pipeline_mode: str | None = None,
realtime_mode: str | None = None,
session_start_time: float = <factory>,
session_end_time: float | None = None,
participant_metrics: List[ParticipantMetrics] = <factory>,
errors: List[Dict[str, Any]] = <factory>,
provider_per_component: Dict[str, Dict[str, str]] = <factory>,
eou_config: Dict[str, Any] | None = None,
interrupt_config: Dict[str, Any] | None = None)-
Expand source code
@dataclass
class SessionMetrics:
    """Session-level metrics.

    Aggregates identity, configuration, errors and per-participant metrics
    for one agent session.
    """
    session_id: Optional[str] = None    # unique identifier of this session
    room_id: Optional[str] = None       # room the session belongs to
    system_instruction: str = ""        # system prompt configured for the agent
    components: List[str] = field(default_factory=list)  # pipeline component names
    pipeline_type: Optional[str] = None
    pipeline_mode: Optional[str] = None
    realtime_mode: Optional[str] = None
    session_start_time: float = field(default_factory=time.time)  # wall-clock epoch seconds
    session_end_time: Optional[float] = None  # set when the session closes
    participant_metrics: List[ParticipantMetrics] = field(default_factory=list)
    errors: List[Dict[str, Any]] = field(default_factory=list)  # error records collected during the session
    # Per-component provider info, keyed by lowercase component name,
    # e.g. {"llm": {"provider_class": ..., "model_name": ...}}.
    provider_per_component: Dict[str, Dict[str, str]] = field(default_factory=dict)
    eou_config: Optional[Dict[str, Any]] = None        # end-of-utterance detector settings
    interrupt_config: Optional[Dict[str, Any]] = None  # interruption handling settings
Instance variables
var components : List[str]var eou_config : Dict[str, Any] | Nonevar errors : List[Dict[str, Any]]var interrupt_config : Dict[str, Any] | Nonevar participant_metrics : List[ParticipantMetrics]var pipeline_mode : str | Nonevar pipeline_type : str | Nonevar provider_per_component : Dict[str, Dict[str, str]]var realtime_mode : str | Nonevar room_id : str | Nonevar session_end_time : float | Nonevar session_id : str | Nonevar session_start_time : floatvar system_instruction : str
class TracesFlowManager (room_id: str, session_id: str | None = None)-
Expand source code
class TracesFlowManager: """ Manages the flow of OpenTelemetry traces for agent Turns, ensuring correct parent-child relationships between spans. """ def __init__(self, room_id: str, session_id: Optional[str] = None): self.room_id = room_id self.session_id = session_id self.root_span: Optional[Span] = None self.agent_session_span: Optional[Span] = None self.main_turn_span: Optional[Span] = None self.agent_session_config_span: Optional[Span] = None self.agent_session_closed_span: Optional[Span] = None self._turn_count = 0 self.root_span_ready = asyncio.Event() self.a2a_span: Optional[Span] = None self._a2a_turn_count = 0 self.session_metrics: Optional[SessionMetrics] = None self.participant_metrics: Optional[List[ParticipantMetrics]] = [] def set_session_metrics(self, session_metrics: SessionMetrics): """Set the session metrics for the trace manager.""" self.session_metrics = session_metrics def set_session_id(self, session_id: str): """Set the session ID for the trace manager.""" self.session_id = session_id def start_agent_joined_meeting(self, attributes: Dict[str, Any]): """Starts the root span for the agent joining a meeting.""" if self.root_span: return agent_name = attributes.get('agent_name', 'UnknownAgent') agent_id = attributes.get('peerId', 'UnknownID') span_name = f"Agent Session: agentName_{agent_name}_agentId_{agent_id}" start_time = attributes.get('start_time', time.perf_counter()) self.root_span = create_span(span_name, attributes, start_time=start_time) # Always set root_span_ready so downstream awaits don't hang # when telemetry is not initialized (create_span returns None) self.root_span_ready.set() async def start_agent_session_config(self, attributes: Dict[str, Any]): """Starts the span for the agent's session configuration, child of the root span.""" await self.root_span_ready.wait() if not self.root_span: return if self.agent_session_config_span: return start_time = attributes.get('start_time', time.perf_counter()) 
self.agent_session_config_span = create_span("Session Configuration", attributes, parent_span=self.root_span, start_time=start_time) if self.agent_session_config_span: with trace.use_span(self.agent_session_config_span): self.end_agent_session_config() def end_agent_session_config(self): """Completes the agent session config span.""" end_time = time.perf_counter() self.end_span(self.agent_session_config_span, "Agent session config ended", end_time=end_time) self.agent_session_config_span = None async def start_agent_session_closed(self, attributes: Dict[str, Any]): """Starts the span for agent session closed.""" await self.root_span_ready.wait() if not self.root_span: return if self.agent_session_closed_span: return start_time = attributes.get('start_time', time.perf_counter()) self.agent_session_closed_span = create_span("Agent Session Closed", attributes, parent_span=self.root_span, start_time=start_time) def end_agent_session_closed(self): """Completes the agent session closed span.""" end_time = time.perf_counter() self.end_span(self.agent_session_closed_span, "Agent session closed", end_time=end_time) self.agent_session_closed_span = None async def start_agent_session(self, attributes: Dict[str, Any]): """Starts the span for the agent's session, child of the root span.""" await self.root_span_ready.wait() if not self.root_span: return if self.agent_session_span: return start_time = attributes.get('start_time', time.perf_counter()) p_m =[] a_p_m = [] for p in self.participant_metrics: if p.kind == "user": p_m.append(asdict(p)) else: a_p_m.append(asdict(p)) attributes["participant_metrics"] = p_m attributes["agent_participant_metrics"] = a_p_m self.agent_session_span = create_span("Session Started", attributes, parent_span=self.root_span, start_time=start_time) self.start_main_turn() def start_main_turn(self): """Starts a parent span for all user-agent turn.""" if not self.agent_session_span: return if self.main_turn_span: return start_time = time.perf_counter() 
self.main_turn_span = create_span("User & Agent Turns", parent_span=self.agent_session_span, start_time=start_time) def create_unified_turn_trace(self, turn: TurnMetrics, session: Any = None) -> None: """ Creates a full trace for a single turn from the unified TurnMetrics schema. Handles both cascading and realtime component spans based on what data is present. """ if not self.main_turn_span: return self._turn_count += 1 turn_name = f"Turn #{self._turn_count}" # Determine turn start time dynamically to encompass all child spans start_times = [] if turn.user_speech_start_time: start_times.append(turn.user_speech_start_time) if turn.stt_metrics and turn.stt_metrics[0].stt_start_time: start_times.append(turn.stt_metrics[0].stt_start_time) if turn.llm_metrics and turn.llm_metrics[0].llm_start_time: start_times.append(turn.llm_metrics[0].llm_start_time) if turn.tts_metrics and turn.tts_metrics[0].tts_start_time: start_times.append(turn.tts_metrics[0].tts_start_time) if turn.eou_metrics and turn.eou_metrics[0].eou_start_time: start_times.append(turn.eou_metrics[0].eou_start_time) if turn.timeline_event_metrics: for ev in turn.timeline_event_metrics: if ev.start_time: start_times.append(ev.start_time) turn_span_start_time = min(start_times) if start_times else None turn_span = create_span(turn_name, parent_span=self.main_turn_span, start_time=turn_span_start_time) if not turn_span: return with trace.use_span(turn_span, end_on_exit=False): # --- VAD errors --- def create_vad_span(vad: VadMetrics): vad_errors = [e for e in turn.errors if e.get("source") == "VAD"] if vad_errors or turn.vad_metrics: vad_class = turn.session_metrics.provider_per_component.get("vad", {}).get("provider_class") vad_model = turn.session_metrics.provider_per_component.get("vad", {}).get("model_name") vad_span_name = f"{vad_class}: VAD Processing" vad_attrs = {} if not vad_class: return if vad_class: vad_attrs["provider_class"] = vad_class if vad_model: vad_attrs["model_name"] = vad_model if 
vad.vad_min_silence_duration: vad_attrs["min_silence_duration"] = vad.vad_min_silence_duration if vad.vad_min_speech_duration: vad_attrs["min_speech_duration"] = vad.vad_min_speech_duration if vad.vad_threshold: vad_attrs["threshold"] = vad.vad_threshold # Calculate span start time: end_of_speech_time - min_silence_duration if vad.user_speech_start_time is None and vad.user_speech_end_time is not None: vad.user_speech_start_time = vad.user_speech_end_time - vad.vad_min_silence_duration elif vad.user_speech_start_time is not None and vad.user_speech_end_time is None: vad.user_speech_end_time = vad.user_speech_start_time + vad.vad_min_silence_duration vad_start_time = vad.user_speech_start_time vad_end_time = vad.user_speech_end_time vad_span = create_span(vad_span_name, vad_attrs, parent_span=turn_span, start_time=vad_start_time) if vad_span: for error in vad_errors: vad_span.add_event("error", attributes={ "message": error["message"], "timestamp": error["timestamp"], "source": error["source"] }) with trace.use_span(vad_span): vad_error_span = create_span("VAD Error", {"message": error["message"]}, parent_span=vad_span, start_time=error["timestamp"]) self.end_span(vad_error_span, end_time=error["timestamp"]+0.100) vad_errors.remove(error) vad_status = StatusCode.ERROR if vad_errors else StatusCode.OK self.end_span(vad_span, status_code=vad_status, end_time=vad_end_time) vad_list = turn.vad_metrics if turn.vad_metrics else None if vad_list: for vad in vad_list: try: create_vad_span(vad) except Exception as e: logger.error(f"Error creating VAD span: {e}") # --- Interruption span --- def create_interruption_span(interruption: InterruptionMetrics): if interruption.false_interrupt_start_time and interruption.false_interrupt_end_time: false_interrupt_attrs = {} if interruption.interrupt_mode: false_interrupt_attrs["interrupt_mode"] = interruption.interrupt_mode if interruption.false_interrupt_pause_duration: false_interrupt_attrs["pause_duration_config"] = 
interruption.false_interrupt_pause_duration if interruption.false_interrupt_duration: false_interrupt_attrs["false_interrupt_duration"] = interruption.false_interrupt_duration if interruption.resumed_after_false_interrupt: false_interrupt_attrs["resumed_after_false_interrupt"] = True if interruption.false_interrupt_duration: false_interrupt_attrs["actual_duration"] = interruption.false_interrupt_duration false_interrupt_end = interruption.false_interrupt_end_time if false_interrupt_end is None: # False interrupt was followed by true interrupt false_interrupt_end = interruption.interrupt_start_time false_interrupt_span_name = "False Interruption (Resumed)" if interruption.resumed_after_false_interrupt else "False Interruption (Escalated)" false_interrupt_span = create_span(false_interrupt_span_name, false_interrupt_attrs, parent_span=turn_span, start_time=interruption.false_interrupt_start_time) self.end_span(false_interrupt_span, message="False interruption detected", end_time=false_interrupt_end) if turn.is_interrupted: interrupted_attrs = {} if interruption.interrupt_mode: interrupted_attrs["interrupt_mode"] = interruption.interrupt_mode if interruption.interrupt_min_duration: interrupted_attrs["interrupt_min_duration"] = interruption.interrupt_min_duration if interruption.interrupt_min_words: interrupted_attrs["interrupt_min_words"] = interruption.interrupt_min_words if interruption.false_interrupt_pause_duration: interrupted_attrs["false_interrupt_pause_duration"] = interruption.false_interrupt_pause_duration if interruption.resume_on_false_interrupt: interrupted_attrs["resume_on_false_interrupt"] = interruption.resume_on_false_interrupt if interruption.interrupt_reason: interrupted_attrs["interrupt_reason"] = interruption.interrupt_reason if interruption.interrupt_words: interrupted_attrs["interrupt_words"] = interruption.interrupt_words if interruption.interrupt_duration: interrupted_attrs["interrupt_duration"] = interruption.interrupt_duration # Mark if this 
was preceded by a false interrupt if interruption.false_interrupt_start_time is not None: interrupted_attrs["preceded_by_false_interrupt"] = True interrupted_span = create_span("Turn Interrupted", interrupted_attrs, parent_span=turn_span, start_time=interruption.interrupt_start_time) # Calculate interrupt end time with proper None checks if interruption.interrupt_start_time is not None: if interruption.interrupt_duration is not None: interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_duration elif interruption.interrupt_min_duration is not None: interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_min_duration else: interruption.interrupt_end_time = interruption.interrupt_start_time self.end_span(interrupted_span, message="Agent was interrupted", end_time=interruption.interrupt_end_time) if turn.interruption_metrics: try: create_interruption_span(turn.interruption_metrics) except Exception as e: logger.error(f"Error creating interruption span: {e}") # --- Fallback spans --- def create_fallback_span(fallback: FallbackEvent): is_recovery = fallback.is_recovery if is_recovery: fallback_span_name = f"Recovery: {fallback.component_type}" fallback_attrs = { "temporary_disable_sec": fallback.temporary_disable_sec, "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts, "recovery_attempt": fallback.recovery_attempt, "message": fallback.message, "restored_provider": fallback.new_provider_label, "previous_provider": fallback.original_provider_label, } span_time = fallback.start_time recovery_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time) if recovery_span: self.end_span(recovery_span, message="Recovery completed", end_time=fallback.end_time) return fallback_span_name = f"Fallback: {fallback.component_type}" fallback_attrs = { "temporary_disable_sec": fallback.temporary_disable_sec, "permanent_disable_after_attempts": 
fallback.permanent_disable_after_attempts, "recovery_attempt": fallback.recovery_attempt, "message": fallback.message, } # Use same start_time for all spans (instant spans) span_time = fallback.start_time fallback_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time) if fallback_span: # Child trace for original connection attempt (if exists) if fallback.original_provider_label: original_conn_attrs = { "provider": fallback.original_provider_label, "status": "failed" } original_conn_span = create_span( f"Connection: {fallback.original_provider_label}", original_conn_attrs, parent_span=fallback_span, start_time=span_time ) self.end_span(original_conn_span, status_code=StatusCode.ERROR, end_time=span_time) # Child trace for new connection attempt (if switched successfully) if fallback.new_provider_label: new_conn_attrs = { "provider": fallback.new_provider_label, "status": "success" } new_conn_span = create_span( f"Connection: {fallback.new_provider_label}", new_conn_attrs, parent_span=fallback_span, start_time=span_time ) self.end_span(new_conn_span, status_code=StatusCode.OK, end_time=span_time) # End the fallback span - status depends on whether we successfully switched fallback_status = StatusCode.OK if fallback.new_provider_label else StatusCode.ERROR self.end_span(fallback_span, status_code=fallback_status, end_time=span_time) if turn.fallback_events: for fallback in turn.fallback_events: try: create_fallback_span(fallback) except Exception as e: logger.error(f"Error creating fallback span: {e}") # --- STT spans --- def create_stt_span(stt: SttMetrics): stt_errors = [e for e in turn.errors if e.get("source") == "STT"] if stt or stt_errors: stt_attrs = {} if stt: stt_class = turn.session_metrics.provider_per_component.get("stt", {}).get("provider_class") if stt_class: stt_attrs["provider_class"] = stt_class stt_model = turn.session_metrics.provider_per_component.get("stt", {}).get("model_name") stt_attrs["input"] = 
"N/A" if stt_model: stt_attrs["model_name"] = stt_model if stt.stt_latency is not None: stt_attrs["duration_ms"] = stt.stt_latency if stt.stt_start_time: stt_attrs["start_timestamp"] = stt.stt_start_time if stt.stt_end_time: stt_attrs["end_timestamp"] = stt.stt_end_time if stt.stt_transcript: stt_attrs["output"] = stt.stt_transcript if stt_class =="DeepgramSTTV2" and turn.preemtive_generation_enabled: stt_attrs["stt_preemptive_generation_enabled"] = turn.preemtive_generation_enabled stt_span_name = f"{stt_class}: Speech to Text Processing" stt_span = create_span( stt_span_name, stt_attrs, parent_span=turn_span, start_time=stt.stt_start_time if stt else None, ) if stt.stt_preemptive_generation_enabled: with trace.use_span(stt_span): preemptive_attributes = { "preemptive_generation_occurred": stt.stt_preemptive_generation_occurred, "partial_text": stt.stt_preflight_transcript, "final_text": stt.stt_transcript, } if stt.stt_preemptive_generation_occurred: preemptive_attributes["preemptive_generation_latency"] = stt.stt_preflight_latency preemptive_span = create_span("Preemptive Generation", preemptive_attributes, parent_span=stt_span, start_time=stt.stt_start_time) preemptive_end_time = stt.stt_preflight_end_time or stt.stt_end_time self.end_span(preemptive_span, end_time=preemptive_end_time) if stt_span: for error in stt_errors: stt_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) if stt.stt_start_time <= error.get("timestamp") <= stt.stt_end_time: with trace.use_span(stt_span): stt_error_span = create_span("STT Error", {"message": error.get("message", "")}, parent_span=stt_span, start_time=error.get("timestamp")) self.end_span(stt_error_span, end_time=error.get("timestamp")+0.100) stt_errors.remove(error) status = StatusCode.ERROR if stt_errors else StatusCode.OK self.end_span(stt_span, status_code=status, end_time=stt.stt_end_time if stt else None) stt_list = turn.stt_metrics if turn.stt_metrics 
else None if stt_list: for stt in stt_list: try: create_stt_span(stt) except Exception as e: logger.error(f"Error creating STT span: {e}") # --- EOU spans --- def create_eou_span(eou: EouMetrics): eou_errors = [e for e in turn.errors if e.get("source") == "TURN-D"] eou_attrs = {} if eou: eou_class = turn.session_metrics.provider_per_component.get("eou", {}).get("provider_class") eou_model = turn.session_metrics.provider_per_component.get("eou", {}).get("model_name") if eou_class: eou_attrs["provider_class"] = eou_class if eou_model: eou_attrs["model_name"] = eou_model if turn.user_speech: eou_attrs["input"] = turn.user_speech if eou.eou_latency is not None: eou_attrs["duration_ms"] = eou.eou_latency if eou.eou_start_time: eou_attrs["start_timestamp"] = eou.eou_start_time if eou.eou_end_time: eou_attrs["end_timestamp"] = eou.eou_end_time if eou.waited_for_additional_speech: eou_attrs["waited_for_additional_speech"] = eou.waited_for_additional_speech if eou.eou_probability: eou_attrs["eou_probability"] = round(eou.eou_probability, 4) if turn.session_metrics.eou_config.get("min_speech_wait_timeout"): eou_attrs["min_speech_wait_timeout"] = turn.session_metrics.eou_config.get("min_speech_wait_timeout") if turn.session_metrics.eou_config.get("max_speech_wait_timeout"): eou_attrs["max_speech_wait_timeout"] = turn.session_metrics.eou_config.get("max_speech_wait_timeout") eou_span_name = f"{eou_class}: End-Of-Utterance Detection" eou_span = create_span( eou_span_name, eou_attrs, parent_span=turn_span, start_time=eou.eou_start_time if eou else None, ) if eou.waited_for_additional_speech: delay = round(eou.wait_for_additional_speech_duration, 4) with trace.use_span(eou_span): wait_for_additional_speech_span =create_span ( "Wait for Additional Speech", { "wait_for_additional_speech_duration":delay, "eou_probability": round(eou.eou_probability, 4), }, start_time=eou.eou_end_time, ) self.end_span(wait_for_additional_speech_span, status_code=StatusCode.OK, 
end_time=eou.eou_end_time + delay) if eou_span: for error in eou_errors: eou_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) with trace.use_span(eou_span): eou_error_span = create_span("EOU Error", {"message": error.get("message", "")}, parent_span=eou_span, start_time=error.get("timestamp")) self.end_span(eou_error_span, end_time=error.get("timestamp")+0.100) eou_errors.remove(error) eou_status = StatusCode.ERROR if eou_errors else StatusCode.OK self.end_span(eou_span, status_code=eou_status, end_time=eou.eou_end_time if eou else None) eou_list = turn.eou_metrics if turn.eou_metrics else None if eou_list: for eou in eou_list: try: create_eou_span(eou) except Exception as e: logger.error(f"Error creating EOU span: {e}") # --- LLM spans --- def create_llm_span(llm: LlmMetrics): llm_errors = [e for e in turn.errors if e.get("source") == "LLM"] llm_attrs = {} if llm: llm_class = turn.session_metrics.provider_per_component.get("llm", {}).get("provider_class") if llm_class: llm_attrs["provider_class"] = llm_class llm_model = turn.session_metrics.provider_per_component.get("llm", {}).get("model_name") if llm_model: llm_attrs["model_name"] = llm_model if llm.llm_input: llm_attrs["input"] = llm.llm_input if llm.llm_duration: llm_attrs["duration_ms"] = llm.llm_duration if llm.llm_start_time: llm_attrs["start_timestamp"] = llm.llm_start_time if llm.llm_end_time: llm_attrs["end_timestamp"] = llm.llm_end_time if turn.agent_speech: llm_attrs["output"] = turn.agent_speech if llm.prompt_tokens: llm_attrs["input_tokens"] = llm.prompt_tokens if llm.completion_tokens: llm_attrs["output_tokens"] = llm.completion_tokens if llm.prompt_cached_tokens: llm_attrs["cached_input_tokens"] = llm.prompt_cached_tokens if llm.total_tokens: llm_attrs["total_tokens"] = llm.total_tokens llm_span_name = f"{llm_class}: LLM Processing" llm_span = create_span( llm_span_name, llm_attrs, parent_span=turn_span, 
start_time=llm.llm_start_time if llm else None, ) if llm_span: # Tool call sub-spans if turn.function_tool_timestamps: for tool_data in turn.function_tool_timestamps: tool_timestamp = tool_data.get("timestamp") tool_span = create_span( f"Invoked Tool: {tool_data.get('tool_name', 'unknown')}", parent_span=llm_span, start_time=tool_timestamp, ) self.end_span(tool_span, end_time=tool_timestamp) for error in llm_errors: llm_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) with trace.use_span(llm_span): llm_error_span = create_span("LLM Error", {"message": error.get("message", "")}, parent_span=llm_span, start_time=error.get("timestamp")) self.end_span(llm_error_span, end_time=error.get("timestamp")+0.100) llm_errors.remove(error) # TTFT sub-span if llm and llm.llm_ttft is not None and llm.llm_start_time is not None: ttft_span = create_span( "Time to First Token", attributes={"llm_ttft": llm.llm_ttft}, parent_span=llm_span, start_time=llm.llm_start_time, ) ttft_end = llm.llm_start_time + (llm.llm_ttft / 1000) self.end_span(ttft_span, end_time=ttft_end) llm_status = StatusCode.ERROR if llm_errors else StatusCode.OK self.end_span(llm_span, status_code=llm_status, end_time=llm.llm_end_time if llm else None) llm_list = turn.llm_metrics if turn.llm_metrics else None if llm_list: for llm in llm_list: try: create_llm_span(llm) except Exception as e: logger.error(f"Error creating LLM span: {e}") # --- TTS spans --- def create_tts_span(tts: TtsMetrics): tts_errors = [e for e in turn.errors if e.get("source") == "TTS"] tts_attrs = {} if tts: tts_class = turn.session_metrics.provider_per_component.get("tts", {}).get("provider_class") tts_model = turn.session_metrics.provider_per_component.get("tts", {}).get("model_name") if tts_class: tts_attrs["provider_class"] = tts_class if tts_model: tts_attrs["model_name"] = tts_model if turn.agent_speech: tts_attrs["input"] = turn.agent_speech if tts.tts_duration: 
tts_attrs["duration_ms"] = tts.tts_duration if tts.tts_start_time: tts_attrs["start_timestamp"] = tts.tts_start_time if tts.tts_end_time: tts_attrs["end_timestamp"] = tts.tts_end_time if tts.tts_characters: tts_attrs["characters"] = tts.tts_characters if turn.agent_speech_duration: tts_attrs["audio_duration_ms"] = turn.agent_speech_duration tts_attrs["output"] = "N/A" tts_span_name = f"{tts_class}: Text to Speech Processing" tts_span = create_span( tts_span_name, tts_attrs, parent_span=turn_span, start_time=tts.tts_start_time if tts else None, ) if tts_span: # TTFB sub-span if tts and tts.tts_first_byte_time is not None: ttfb_span = create_span( "Time to First Byte", parent_span=tts_span, start_time=tts.tts_start_time, ) self.end_span(ttfb_span, end_time=tts.tts_first_byte_time) for error in tts_errors: tts_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) with trace.use_span(tts_span): tts_error_span = create_span("TTS Error", {"message": error.get("message", "")}, parent_span=tts_span, start_time=error.get("timestamp")) self.end_span(tts_error_span, end_time=error.get("timestamp")+0.100) tts_errors.remove(error) tts_status = StatusCode.ERROR if tts_errors else StatusCode.OK self.end_span(tts_span, status_code=tts_status, end_time=tts.tts_end_time if tts else None) tts_list = turn.tts_metrics if turn.tts_metrics else None if tts_list: for tts in tts_list: try: create_tts_span(tts) except Exception as e: logger.error(f"Error creating TTS span: {e}") # --- KB spans --- def create_kb_span(kb: KbMetrics): kb_span_name = "Knowledge Base: Retrieval" kb_attrs = {} if turn.user_speech: kb_attrs["input"] = turn.user_speech if kb.kb_retrieval_latency: kb_attrs["retrieval_latency_ms"] = kb.kb_retrieval_latency if kb.kb_start_time: kb_attrs["start_timestamp"] = kb.kb_start_time if kb.kb_end_time: kb_attrs["end_timestamp"] = kb.kb_end_time if kb.kb_documents: # Join documents as comma-separated string for 
readability kb_attrs["documents"] = ", ".join(kb.kb_documents) if len(kb.kb_documents) <= 5 else f"{len(kb.kb_documents)} documents" kb_attrs["document_count"] = len(kb.kb_documents) if kb.kb_scores: # Include scores as comma-separated string kb_attrs["scores"] = ", ".join([str(round(s, 4)) for s in kb.kb_scores[:5]]) kb_span = create_span(kb_span_name, kb_attrs, parent_span=turn_span, start_time=kb.kb_start_time) if kb_span: self.end_span(kb_span, status_code=StatusCode.OK, end_time=kb.kb_end_time) if turn.kb_metrics: for kb in turn.kb_metrics: try: create_kb_span(kb) except Exception as e: logger.error(f"Error creating KB span: {e}") # --- Realtime spans (for S2S modes) --- def create_rt_span(rt: RealtimeMetrics): rt_errors = [e for e in turn.errors if e.get("source") == "REALTIME"] rt_attrs = {} if rt: rt_class = turn.session_metrics.provider_per_component.get("realtime", {}).get("provider_class") if rt_class: rt_attrs["provider_class"] = rt_class rt_model = turn.session_metrics.provider_per_component.get("realtime", {}).get("model_name") if rt_model: rt_attrs["model_name"] = rt_model rt_start_time = turn.user_speech_end_time if turn.user_speech_end_time else turn.agent_speech_start_time rt_end_time = turn.agent_speech_start_time # if turn.timeline_event_metrics: # for event in turn.timeline_event_metrics: # if event.event_type == "user_speech": # rt_start_time = event.end_time # break # for event in turn.timeline_event_metrics: # if event.event_type == "agent_speech": # rt_end_time = event.start_time # break rt_span_name = f"{rt_class}: Realtime Processing" rt_span = create_span( rt_span_name, rt_attrs, parent_span=turn_span, start_time=rt_start_time, ) if rt_span: # Realtime tool calls if turn.function_tools_called: for tool_name in turn.function_tools_called: tool_span = create_span( f"Invoked Tool: {tool_name}", parent_span=turn_span, start_time=time.perf_counter(), ) self.end_span(tool_span, end_time=time.perf_counter()) # TTFB span for realtime if 
turn.e2e_latency is not None: ttfb_span = create_span( "Time to First Word", {"duration_ms": turn.e2e_latency}, parent_span=rt_span, start_time=rt_start_time, ) self.end_span(ttfb_span, end_time=rt_end_time) # --- Realtime model errors --- rt_errors = [e for e in turn.errors if e.get("source") == "REALTIME"] if rt_errors: for error in rt_errors: turn_span.add_event("Errors", attributes={ "message": error.get("message", "Unknown error"), "timestamp": error.get("timestamp", "N/A"), }) with trace.use_span(turn_span): rt_error_span = create_span("Realtime Error", {"message": error.get("message", "")}, parent_span=turn_span, start_time=error.get("timestamp")) self.end_span(rt_error_span, end_time=error.get("timestamp")+0.100) rt_errors.remove(error) self.end_span(rt_span, status_code=StatusCode.ERROR if rt_errors else StatusCode.OK, end_time=rt_end_time) rt_list = turn.realtime_metrics if turn.realtime_metrics else None if rt_list: for rt in rt_list: try: create_rt_span(rt) except Exception as e: logger.error(f"Error creating RT span: {e}") def create_error_spans(errors:Dict[str, Any]): error_span_name = f"{errors.get('source', 'Unknown')} Error span" attr={} if errors.get('message'): attr['error message'] = errors.get('message') span_start_time = errors.get('timestamp_perf') error_span = create_span(error_span_name, attributes=attr, parent_span=turn_span, start_time=span_start_time) self.end_span(error_span, status_code=StatusCode.ERROR, end_time=span_start_time + 0.001) for e in turn.errors: try: create_error_spans(e) except Exception as e: logger.error(f"Error creating error span: {e}") # Determine turn end time first for unbounded children spans end_times = [] if turn.tts_metrics and turn.tts_metrics[-1].tts_end_time: end_times.append(turn.tts_metrics[-1].tts_end_time) if turn.llm_metrics and turn.llm_metrics[-1].llm_end_time: end_times.append(turn.llm_metrics[-1].llm_end_time) if turn.agent_speech_end_time: end_times.append(turn.agent_speech_end_time) if 
turn.eou_metrics and turn.eou_metrics[-1].eou_end_time: end_times.append(turn.eou_metrics[-1].eou_end_time) if turn.stt_metrics and turn.stt_metrics[-1].stt_end_time: end_times.append(turn.stt_metrics[-1].stt_end_time) if turn.user_speech_end_time: end_times.append(turn.user_speech_end_time) if turn.interruption_metrics and turn.interruption_metrics.false_interrupt_end_time: end_times.append(turn.interruption_metrics.false_interrupt_end_time) turn_end_time = max(end_times) if end_times else None if turn.is_interrupted or turn_end_time is None: turn_end_time = time.perf_counter() # --- Timeline events --- if turn.timeline_event_metrics: for event in turn.timeline_event_metrics: if event.event_type == "user_speech": user_speech_span = create_span( "User Input Speech", {"Transcript": event.text, "duration_ms": event.duration_ms}, parent_span=turn_span, start_time=event.start_time, ) self.end_span(user_speech_span, end_time=event.end_time if event.end_time else turn_end_time) elif event.event_type == "agent_speech": agent_speech_span = create_span( "Agent Output Speech", {"Transcript": event.text, "duration_ms": event.duration_ms}, parent_span=turn_span, start_time=event.start_time, ) self.end_span(agent_speech_span, end_time=event.end_time if event.end_time else turn_end_time) elif event.event_type == "thinking_audio": thinking_attrs = {} if turn.thinking_audio_file_path: thinking_attrs["file_path"] = turn.thinking_audio_file_path if turn.thinking_audio_looping is not None: thinking_attrs["looping"] = turn.thinking_audio_looping if turn.thinking_audio_override_thinking is not None: thinking_attrs["override_thinking"] = turn.thinking_audio_override_thinking if event.duration_ms is not None: thinking_attrs["duration_ms"] = event.duration_ms thinking_span = create_span( "Thinking Audio", thinking_attrs, parent_span=turn_span, start_time=event.start_time, ) self.end_span(thinking_span, end_time=event.end_time if event.end_time else turn_end_time) elif event.event_type == 
"background_audio": bg_attrs = {} if turn.background_audio_file_path: bg_attrs["file_path"] = turn.background_audio_file_path if turn.background_audio_looping is not None: bg_attrs["looping"] = turn.background_audio_looping if event.duration_ms is not None: bg_attrs["duration_ms"] = event.duration_ms bg_span = create_span( "Background Audio", bg_attrs, parent_span=turn_span, start_time=event.start_time, ) self.end_span(bg_span, end_time=event.end_time if event.end_time else turn_end_time) self.end_span(turn_span, message="End of turn trace.", end_time=turn_end_time) def end_main_turn(self): """Completes the main turn span.""" self.end_span(self.main_turn_span, "All turns processed", end_time=time.perf_counter()) self.main_turn_span = None def agent_say_called(self, message: str): """Creates a span for the agent's say method.""" if not self.agent_session_span: return current_span = trace.get_current_span() agent_say_span = create_span( "Agent Say", {"Agent Say Message": message}, parent_span=current_span if current_span else self.agent_session_span, start_time=time.perf_counter() ) self.end_span(agent_say_span, "Agent say span created", end_time=time.perf_counter()) def agent_reply_called(self, instructions: str): """Creates a span for an agent reply invocation.""" if not self.agent_session_span: return current_span = trace.get_current_span() agent_reply_span = create_span( "Agent Reply", {"Agent Reply Instructions": instructions}, parent_span=current_span if current_span else self.agent_session_span, start_time=time.perf_counter() ) self.end_span(agent_reply_span, "Agent reply span created", end_time=time.perf_counter()) def create_components_change_trace(self, components_change_status: Dict[str, Any], components_change_data: Dict[str, Any], time_data: Dict[str, Any]) -> None: """ Creates a span for the agent's components change. Args: components_change_status: Status of the components change. components_change_data: Data of the components change. 
time_data: Time data of the components change. """ if not self.main_turn_span: return attr = {} if components_change_data.get("new_stt") is not None: attr["new_stt"] = components_change_data["new_stt"] if components_change_data.get("new_tts") is not None: attr["new_tts"] = components_change_data["new_tts"] if components_change_data.get("new_llm") is not None: attr["new_llm"] = components_change_data["new_llm"] if components_change_data.get("new_vad") is not None: attr["new_vad"] = components_change_data["new_vad"] if components_change_data.get("new_turn_detector") is not None: attr["new_turn_detector"] = components_change_data["new_turn_detector"] if components_change_data.get("new_denoise") is not None: attr["new_denoise"] = components_change_data["new_denoise"] if components_change_status: attr["components_change_status"] = components_change_status self.components_change_span = create_span( "Components Change", attr, parent_span=self.main_turn_span, start_time=time_data.get("start_time", time.perf_counter()) ) self.end_span(self.components_change_span, "Components change span created", end_time=time_data.get("end_time", time.perf_counter())) self.components_change_span = None def create_pipeline_change_trace(self, time_data: Dict[str, Any], original_pipeline_config: Dict[str, Any], new_pipeline_config: Dict[str, Any]) -> None: """ Creates a span for the agent's pipeline change. Args: time_data: Time data of the pipeline change. original_pipeline_config: Original pipeline configuration. new_pipeline_config: New pipeline configuration. 
""" if not self.main_turn_span: return attr = { "original_pipeline_config": original_pipeline_config, "new_pipeline_config": new_pipeline_config } pipeline_change_span = create_span( "Pipeline Change", attr, parent_span=self.main_turn_span, start_time=time_data.get("start_time", time.perf_counter()) ) self.end_span(pipeline_change_span, "Pipeline change span created", end_time=time_data.get("end_time", time.perf_counter())) def create_a2a_trace(self, name: str, attributes: Dict[str, Any]) -> Optional[Span]: """Creates an A2A trace under the main turn span.""" if not self.main_turn_span: return None if not self.a2a_span: self.a2a_span = create_span( "Agent-to-Agent Communications", {"total_a2a_turns": self._a2a_turn_count}, parent_span=self.main_turn_span ) if not self.a2a_span: return None self._a2a_turn_count += 1 span_name = f"A2A {self._a2a_turn_count}: {name}" a2a_span = create_span( span_name, { **attributes, "a2a_turn_number": self._a2a_turn_count, "parent_span": "Agent-to-Agent Communications" }, parent_span=self.a2a_span, start_time=time.perf_counter() ) return a2a_span def end_a2a_trace(self, span: Optional[Span], message: str = ""): """Ends an A2A trace span.""" complete_span(span, StatusCode.OK, end_time=time.perf_counter()) def end_a2a_communication(self): """Ends the A2A communication parent span.""" complete_span(self.a2a_span, StatusCode.OK, end_time=time.perf_counter()) self.a2a_span = None self._a2a_turn_count = 0 def create_thinking_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None): """Creates a 'Playing Thinking Audio' point-in-time span at session level.""" if not self.main_turn_span: return None attrs = {"event": "start", "looping": looping} if file_path: attrs["file_path"] = file_path t = start_time or time.perf_counter() span = create_span("Playing Thinking Audio", attrs, parent_span=self.main_turn_span, start_time=t) self.end_span(span, message="Thinking audio started", end_time=t) return span def 
create_thinking_audio_stop_span(self, end_time: float = None): """Creates a 'Stopped Thinking Audio' point-in-time span at session level.""" if not self.main_turn_span: return None t = end_time or time.perf_counter() span = create_span("Stopped Thinking Audio", {"event": "stop"}, parent_span=self.main_turn_span, start_time=t) self.end_span(span, message="Thinking audio stopped", end_time=t) return span def create_background_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None): """Creates a 'Playing Background Audio' span at session level (same level as turn spans).""" if not self.main_turn_span: return None bg_audio_attrs = {} if file_path: bg_audio_attrs["file_path"] = file_path bg_audio_attrs["looping"] = looping bg_audio_attrs["event"] = "start" start_span = create_span("Playing Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=start_time or time.perf_counter()) # End immediately as a point-in-time event self.end_span(start_span, message="Background audio started", end_time=start_time or time.perf_counter()) return start_span def create_background_audio_stop_span(self, file_path: str = None, looping: bool = False, end_time: float = None): """Creates a 'Stopped Background Audio' span at session level (same level as turn spans).""" if not self.main_turn_span: return None bg_audio_attrs = {} if file_path: bg_audio_attrs["file_path"] = file_path bg_audio_attrs["looping"] = looping bg_audio_attrs["event"] = "stop" stop_span = create_span("Stopped Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=end_time or time.perf_counter()) # End immediately as a point-in-time event self.end_span(stop_span, message="Background audio stopped", end_time=end_time or time.perf_counter()) return stop_span def end_agent_session(self): """Completes the agent session span.""" if self.main_turn_span: self.end_main_turn() self.end_span(self.agent_session_span, "Agent session ended", 
end_time=time.perf_counter()) self.agent_session_span = None def agent_meeting_end(self): """Completes the root span.""" if self.agent_session_span: self.end_agent_session() if self.agent_session_config_span: self.end_agent_session_config() if self.agent_session_closed_span: self.end_agent_session_closed() self.end_span(self.root_span, "Agent left meeting", end_time=time.perf_counter()) self.root_span = None def end_span(self, span: Optional[Span], message: str = "", status_code: StatusCode = StatusCode.OK, end_time: Optional[float] = None): """Completes a given span with a status.""" if span: if end_time is None: end_time = time.perf_counter() desc = message if status_code == StatusCode.ERROR else "" complete_span(span, status_code, desc, end_time)
Manages the flow of OpenTelemetry traces for agent Turns, ensuring correct parent-child relationships between spans.
Methods
def agent_meeting_end(self)-
Expand source code
def agent_meeting_end(self): """Completes the root span.""" if self.agent_session_span: self.end_agent_session() if self.agent_session_config_span: self.end_agent_session_config() if self.agent_session_closed_span: self.end_agent_session_closed() self.end_span(self.root_span, "Agent left meeting", end_time=time.perf_counter()) self.root_span = None
Completes the root span.
def agent_reply_called(self, instructions: str)-
Expand source code
def agent_reply_called(self, instructions: str): """Creates a span for an agent reply invocation.""" if not self.agent_session_span: return current_span = trace.get_current_span() agent_reply_span = create_span( "Agent Reply", {"Agent Reply Instructions": instructions}, parent_span=current_span if current_span else self.agent_session_span, start_time=time.perf_counter() ) self.end_span(agent_reply_span, "Agent reply span created", end_time=time.perf_counter())
Creates a span for an agent reply invocation.
def agent_say_called(self, message: str)-
Expand source code
def agent_say_called(self, message: str): """Creates a span for the agent's say method.""" if not self.agent_session_span: return current_span = trace.get_current_span() agent_say_span = create_span( "Agent Say", {"Agent Say Message": message}, parent_span=current_span if current_span else self.agent_session_span, start_time=time.perf_counter() ) self.end_span(agent_say_span, "Agent say span created", end_time=time.perf_counter())
Creates a span for the agent's say method.
def create_a2a_trace(self, name: str, attributes: Dict[str, Any]) -> opentelemetry.trace.span.Span | None-
Expand source code
def create_a2a_trace(self, name: str, attributes: Dict[str, Any]) -> Optional[Span]: """Creates an A2A trace under the main turn span.""" if not self.main_turn_span: return None if not self.a2a_span: self.a2a_span = create_span( "Agent-to-Agent Communications", {"total_a2a_turns": self._a2a_turn_count}, parent_span=self.main_turn_span ) if not self.a2a_span: return None self._a2a_turn_count += 1 span_name = f"A2A {self._a2a_turn_count}: {name}" a2a_span = create_span( span_name, { **attributes, "a2a_turn_number": self._a2a_turn_count, "parent_span": "Agent-to-Agent Communications" }, parent_span=self.a2a_span, start_time=time.perf_counter() ) return a2a_span
Creates an A2A trace under the main turn span.
def create_background_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None)-
Expand source code
def create_background_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None): """Creates a 'Playing Background Audio' span at session level (same level as turn spans).""" if not self.main_turn_span: return None bg_audio_attrs = {} if file_path: bg_audio_attrs["file_path"] = file_path bg_audio_attrs["looping"] = looping bg_audio_attrs["event"] = "start" start_span = create_span("Playing Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=start_time or time.perf_counter()) # End immediately as a point-in-time event self.end_span(start_span, message="Background audio started", end_time=start_time or time.perf_counter()) return start_span
Creates a 'Playing Background Audio' span at session level (same level as turn spans).
def create_background_audio_stop_span(self, file_path: str = None, looping: bool = False, end_time: float = None)-
Expand source code
def create_background_audio_stop_span(self, file_path: str = None, looping: bool = False, end_time: float = None): """Creates a 'Stopped Background Audio' span at session level (same level as turn spans).""" if not self.main_turn_span: return None bg_audio_attrs = {} if file_path: bg_audio_attrs["file_path"] = file_path bg_audio_attrs["looping"] = looping bg_audio_attrs["event"] = "stop" stop_span = create_span("Stopped Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=end_time or time.perf_counter()) # End immediately as a point-in-time event self.end_span(stop_span, message="Background audio stopped", end_time=end_time or time.perf_counter()) return stop_span
Creates a 'Stopped Background Audio' span at session level (same level as turn spans).
def create_components_change_trace(self,
components_change_status: Dict[str, Any],
components_change_data: Dict[str, Any],
time_data: Dict[str, Any]) -> None-
Expand source code
def create_components_change_trace(self, components_change_status: Dict[str, Any], components_change_data: Dict[str, Any], time_data: Dict[str, Any]) -> None: """ Creates a span for the agent's components change. Args: components_change_status: Status of the components change. components_change_data: Data of the components change. time_data: Time data of the components change. """ if not self.main_turn_span: return attr = {} if components_change_data.get("new_stt") is not None: attr["new_stt"] = components_change_data["new_stt"] if components_change_data.get("new_tts") is not None: attr["new_tts"] = components_change_data["new_tts"] if components_change_data.get("new_llm") is not None: attr["new_llm"] = components_change_data["new_llm"] if components_change_data.get("new_vad") is not None: attr["new_vad"] = components_change_data["new_vad"] if components_change_data.get("new_turn_detector") is not None: attr["new_turn_detector"] = components_change_data["new_turn_detector"] if components_change_data.get("new_denoise") is not None: attr["new_denoise"] = components_change_data["new_denoise"] if components_change_status: attr["components_change_status"] = components_change_status self.components_change_span = create_span( "Components Change", attr, parent_span=self.main_turn_span, start_time=time_data.get("start_time", time.perf_counter()) ) self.end_span(self.components_change_span, "Components change span created", end_time=time_data.get("end_time", time.perf_counter())) self.components_change_span = None
Creates a span for the agent's components change.
Args
components_change_status- Status of the components change.
components_change_data- Data of the components change.
time_data- Time data of the components change.
def create_pipeline_change_trace(self,
time_data: Dict[str, Any],
original_pipeline_config: Dict[str, Any],
new_pipeline_config: Dict[str, Any]) -> None-
Expand source code
def create_pipeline_change_trace(self, time_data: Dict[str, Any], original_pipeline_config: Dict[str, Any], new_pipeline_config: Dict[str, Any]) -> None: """ Creates a span for the agent's pipeline change. Args: time_data: Time data of the pipeline change. original_pipeline_config: Original pipeline configuration. new_pipeline_config: New pipeline configuration. """ if not self.main_turn_span: return attr = { "original_pipeline_config": original_pipeline_config, "new_pipeline_config": new_pipeline_config } pipeline_change_span = create_span( "Pipeline Change", attr, parent_span=self.main_turn_span, start_time=time_data.get("start_time", time.perf_counter()) ) self.end_span(pipeline_change_span, "Pipeline change span created", end_time=time_data.get("end_time", time.perf_counter()))
Creates a span for the agent's pipeline change.
Args
time_data- Time data of the pipeline change.
original_pipeline_config- Original pipeline configuration.
new_pipeline_config- New pipeline configuration.
def create_thinking_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None)-
Expand source code
def create_thinking_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None): """Creates a 'Playing Thinking Audio' point-in-time span at session level.""" if not self.main_turn_span: return None attrs = {"event": "start", "looping": looping} if file_path: attrs["file_path"] = file_path t = start_time or time.perf_counter() span = create_span("Playing Thinking Audio", attrs, parent_span=self.main_turn_span, start_time=t) self.end_span(span, message="Thinking audio started", end_time=t) return span
Creates a 'Playing Thinking Audio' point-in-time span at session level.
def create_thinking_audio_stop_span(self, end_time: float = None)-
Expand source code
def create_thinking_audio_stop_span(self, end_time: float = None): """Creates a 'Stopped Thinking Audio' point-in-time span at session level.""" if not self.main_turn_span: return None t = end_time or time.perf_counter() span = create_span("Stopped Thinking Audio", {"event": "stop"}, parent_span=self.main_turn_span, start_time=t) self.end_span(span, message="Thinking audio stopped", end_time=t) return span
Creates a 'Stopped Thinking Audio' point-in-time span at session level.
def create_unified_turn_trace(self,
turn: TurnMetrics,
session: Any = None) -> None-
Expand source code
def create_unified_turn_trace(self, turn: TurnMetrics, session: Any = None) -> None: """ Creates a full trace for a single turn from the unified TurnMetrics schema. Handles both cascading and realtime component spans based on what data is present. """ if not self.main_turn_span: return self._turn_count += 1 turn_name = f"Turn #{self._turn_count}" # Determine turn start time dynamically to encompass all child spans start_times = [] if turn.user_speech_start_time: start_times.append(turn.user_speech_start_time) if turn.stt_metrics and turn.stt_metrics[0].stt_start_time: start_times.append(turn.stt_metrics[0].stt_start_time) if turn.llm_metrics and turn.llm_metrics[0].llm_start_time: start_times.append(turn.llm_metrics[0].llm_start_time) if turn.tts_metrics and turn.tts_metrics[0].tts_start_time: start_times.append(turn.tts_metrics[0].tts_start_time) if turn.eou_metrics and turn.eou_metrics[0].eou_start_time: start_times.append(turn.eou_metrics[0].eou_start_time) if turn.timeline_event_metrics: for ev in turn.timeline_event_metrics: if ev.start_time: start_times.append(ev.start_time) turn_span_start_time = min(start_times) if start_times else None turn_span = create_span(turn_name, parent_span=self.main_turn_span, start_time=turn_span_start_time) if not turn_span: return with trace.use_span(turn_span, end_on_exit=False): # --- VAD errors --- def create_vad_span(vad: VadMetrics): vad_errors = [e for e in turn.errors if e.get("source") == "VAD"] if vad_errors or turn.vad_metrics: vad_class = turn.session_metrics.provider_per_component.get("vad", {}).get("provider_class") vad_model = turn.session_metrics.provider_per_component.get("vad", {}).get("model_name") vad_span_name = f"{vad_class}: VAD Processing" vad_attrs = {} if not vad_class: return if vad_class: vad_attrs["provider_class"] = vad_class if vad_model: vad_attrs["model_name"] = vad_model if vad.vad_min_silence_duration: vad_attrs["min_silence_duration"] = vad.vad_min_silence_duration if 
vad.vad_min_speech_duration: vad_attrs["min_speech_duration"] = vad.vad_min_speech_duration if vad.vad_threshold: vad_attrs["threshold"] = vad.vad_threshold # Calculate span start time: end_of_speech_time - min_silence_duration if vad.user_speech_start_time is None and vad.user_speech_end_time is not None: vad.user_speech_start_time = vad.user_speech_end_time - vad.vad_min_silence_duration elif vad.user_speech_start_time is not None and vad.user_speech_end_time is None: vad.user_speech_end_time = vad.user_speech_start_time + vad.vad_min_silence_duration vad_start_time = vad.user_speech_start_time vad_end_time = vad.user_speech_end_time vad_span = create_span(vad_span_name, vad_attrs, parent_span=turn_span, start_time=vad_start_time) if vad_span: for error in vad_errors: vad_span.add_event("error", attributes={ "message": error["message"], "timestamp": error["timestamp"], "source": error["source"] }) with trace.use_span(vad_span): vad_error_span = create_span("VAD Error", {"message": error["message"]}, parent_span=vad_span, start_time=error["timestamp"]) self.end_span(vad_error_span, end_time=error["timestamp"]+0.100) vad_errors.remove(error) vad_status = StatusCode.ERROR if vad_errors else StatusCode.OK self.end_span(vad_span, status_code=vad_status, end_time=vad_end_time) vad_list = turn.vad_metrics if turn.vad_metrics else None if vad_list: for vad in vad_list: try: create_vad_span(vad) except Exception as e: logger.error(f"Error creating VAD span: {e}") # --- Interruption span --- def create_interruption_span(interruption: InterruptionMetrics): if interruption.false_interrupt_start_time and interruption.false_interrupt_end_time: false_interrupt_attrs = {} if interruption.interrupt_mode: false_interrupt_attrs["interrupt_mode"] = interruption.interrupt_mode if interruption.false_interrupt_pause_duration: false_interrupt_attrs["pause_duration_config"] = interruption.false_interrupt_pause_duration if interruption.false_interrupt_duration: 
false_interrupt_attrs["false_interrupt_duration"] = interruption.false_interrupt_duration if interruption.resumed_after_false_interrupt: false_interrupt_attrs["resumed_after_false_interrupt"] = True if interruption.false_interrupt_duration: false_interrupt_attrs["actual_duration"] = interruption.false_interrupt_duration false_interrupt_end = interruption.false_interrupt_end_time if false_interrupt_end is None: # False interrupt was followed by true interrupt false_interrupt_end = interruption.interrupt_start_time false_interrupt_span_name = "False Interruption (Resumed)" if interruption.resumed_after_false_interrupt else "False Interruption (Escalated)" false_interrupt_span = create_span(false_interrupt_span_name, false_interrupt_attrs, parent_span=turn_span, start_time=interruption.false_interrupt_start_time) self.end_span(false_interrupt_span, message="False interruption detected", end_time=false_interrupt_end) if turn.is_interrupted: interrupted_attrs = {} if interruption.interrupt_mode: interrupted_attrs["interrupt_mode"] = interruption.interrupt_mode if interruption.interrupt_min_duration: interrupted_attrs["interrupt_min_duration"] = interruption.interrupt_min_duration if interruption.interrupt_min_words: interrupted_attrs["interrupt_min_words"] = interruption.interrupt_min_words if interruption.false_interrupt_pause_duration: interrupted_attrs["false_interrupt_pause_duration"] = interruption.false_interrupt_pause_duration if interruption.resume_on_false_interrupt: interrupted_attrs["resume_on_false_interrupt"] = interruption.resume_on_false_interrupt if interruption.interrupt_reason: interrupted_attrs["interrupt_reason"] = interruption.interrupt_reason if interruption.interrupt_words: interrupted_attrs["interrupt_words"] = interruption.interrupt_words if interruption.interrupt_duration: interrupted_attrs["interrupt_duration"] = interruption.interrupt_duration # Mark if this was preceded by a false interrupt if interruption.false_interrupt_start_time is not 
None: interrupted_attrs["preceded_by_false_interrupt"] = True interrupted_span = create_span("Turn Interrupted", interrupted_attrs, parent_span=turn_span, start_time=interruption.interrupt_start_time) # Calculate interrupt end time with proper None checks if interruption.interrupt_start_time is not None: if interruption.interrupt_duration is not None: interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_duration elif interruption.interrupt_min_duration is not None: interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_min_duration else: interruption.interrupt_end_time = interruption.interrupt_start_time self.end_span(interrupted_span, message="Agent was interrupted", end_time=interruption.interrupt_end_time) if turn.interruption_metrics: try: create_interruption_span(turn.interruption_metrics) except Exception as e: logger.error(f"Error creating interruption span: {e}") # --- Fallback spans --- def create_fallback_span(fallback: FallbackEvent): is_recovery = fallback.is_recovery if is_recovery: fallback_span_name = f"Recovery: {fallback.component_type}" fallback_attrs = { "temporary_disable_sec": fallback.temporary_disable_sec, "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts, "recovery_attempt": fallback.recovery_attempt, "message": fallback.message, "restored_provider": fallback.new_provider_label, "previous_provider": fallback.original_provider_label, } span_time = fallback.start_time recovery_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time) if recovery_span: self.end_span(recovery_span, message="Recovery completed", end_time=fallback.end_time) return fallback_span_name = f"Fallback: {fallback.component_type}" fallback_attrs = { "temporary_disable_sec": fallback.temporary_disable_sec, "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts, "recovery_attempt": 
fallback.recovery_attempt, "message": fallback.message, } # Use same start_time for all spans (instant spans) span_time = fallback.start_time fallback_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time) if fallback_span: # Child trace for original connection attempt (if exists) if fallback.original_provider_label: original_conn_attrs = { "provider": fallback.original_provider_label, "status": "failed" } original_conn_span = create_span( f"Connection: {fallback.original_provider_label}", original_conn_attrs, parent_span=fallback_span, start_time=span_time ) self.end_span(original_conn_span, status_code=StatusCode.ERROR, end_time=span_time) # Child trace for new connection attempt (if switched successfully) if fallback.new_provider_label: new_conn_attrs = { "provider": fallback.new_provider_label, "status": "success" } new_conn_span = create_span( f"Connection: {fallback.new_provider_label}", new_conn_attrs, parent_span=fallback_span, start_time=span_time ) self.end_span(new_conn_span, status_code=StatusCode.OK, end_time=span_time) # End the fallback span - status depends on whether we successfully switched fallback_status = StatusCode.OK if fallback.new_provider_label else StatusCode.ERROR self.end_span(fallback_span, status_code=fallback_status, end_time=span_time) if turn.fallback_events: for fallback in turn.fallback_events: try: create_fallback_span(fallback) except Exception as e: logger.error(f"Error creating fallback span: {e}") # --- STT spans --- def create_stt_span(stt: SttMetrics): stt_errors = [e for e in turn.errors if e.get("source") == "STT"] if stt or stt_errors: stt_attrs = {} if stt: stt_class = turn.session_metrics.provider_per_component.get("stt", {}).get("provider_class") if stt_class: stt_attrs["provider_class"] = stt_class stt_model = turn.session_metrics.provider_per_component.get("stt", {}).get("model_name") stt_attrs["input"] = "N/A" if stt_model: stt_attrs["model_name"] = stt_model if 
stt.stt_latency is not None: stt_attrs["duration_ms"] = stt.stt_latency if stt.stt_start_time: stt_attrs["start_timestamp"] = stt.stt_start_time if stt.stt_end_time: stt_attrs["end_timestamp"] = stt.stt_end_time if stt.stt_transcript: stt_attrs["output"] = stt.stt_transcript if stt_class =="DeepgramSTTV2" and turn.preemtive_generation_enabled: stt_attrs["stt_preemptive_generation_enabled"] = turn.preemtive_generation_enabled stt_span_name = f"{stt_class}: Speech to Text Processing" stt_span = create_span( stt_span_name, stt_attrs, parent_span=turn_span, start_time=stt.stt_start_time if stt else None, ) if stt.stt_preemptive_generation_enabled: with trace.use_span(stt_span): preemptive_attributes = { "preemptive_generation_occurred": stt.stt_preemptive_generation_occurred, "partial_text": stt.stt_preflight_transcript, "final_text": stt.stt_transcript, } if stt.stt_preemptive_generation_occurred: preemptive_attributes["preemptive_generation_latency"] = stt.stt_preflight_latency preemptive_span = create_span("Preemptive Generation", preemptive_attributes, parent_span=stt_span, start_time=stt.stt_start_time) preemptive_end_time = stt.stt_preflight_end_time or stt.stt_end_time self.end_span(preemptive_span, end_time=preemptive_end_time) if stt_span: for error in stt_errors: stt_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) if stt.stt_start_time <= error.get("timestamp") <= stt.stt_end_time: with trace.use_span(stt_span): stt_error_span = create_span("STT Error", {"message": error.get("message", "")}, parent_span=stt_span, start_time=error.get("timestamp")) self.end_span(stt_error_span, end_time=error.get("timestamp")+0.100) stt_errors.remove(error) status = StatusCode.ERROR if stt_errors else StatusCode.OK self.end_span(stt_span, status_code=status, end_time=stt.stt_end_time if stt else None) stt_list = turn.stt_metrics if turn.stt_metrics else None if stt_list: for stt in stt_list: try: 
create_stt_span(stt) except Exception as e: logger.error(f"Error creating STT span: {e}") # --- EOU spans --- def create_eou_span(eou: EouMetrics): eou_errors = [e for e in turn.errors if e.get("source") == "TURN-D"] eou_attrs = {} if eou: eou_class = turn.session_metrics.provider_per_component.get("eou", {}).get("provider_class") eou_model = turn.session_metrics.provider_per_component.get("eou", {}).get("model_name") if eou_class: eou_attrs["provider_class"] = eou_class if eou_model: eou_attrs["model_name"] = eou_model if turn.user_speech: eou_attrs["input"] = turn.user_speech if eou.eou_latency is not None: eou_attrs["duration_ms"] = eou.eou_latency if eou.eou_start_time: eou_attrs["start_timestamp"] = eou.eou_start_time if eou.eou_end_time: eou_attrs["end_timestamp"] = eou.eou_end_time if eou.waited_for_additional_speech: eou_attrs["waited_for_additional_speech"] = eou.waited_for_additional_speech if eou.eou_probability: eou_attrs["eou_probability"] = round(eou.eou_probability, 4) if turn.session_metrics.eou_config.get("min_speech_wait_timeout"): eou_attrs["min_speech_wait_timeout"] = turn.session_metrics.eou_config.get("min_speech_wait_timeout") if turn.session_metrics.eou_config.get("max_speech_wait_timeout"): eou_attrs["max_speech_wait_timeout"] = turn.session_metrics.eou_config.get("max_speech_wait_timeout") eou_span_name = f"{eou_class}: End-Of-Utterance Detection" eou_span = create_span( eou_span_name, eou_attrs, parent_span=turn_span, start_time=eou.eou_start_time if eou else None, ) if eou.waited_for_additional_speech: delay = round(eou.wait_for_additional_speech_duration, 4) with trace.use_span(eou_span): wait_for_additional_speech_span =create_span ( "Wait for Additional Speech", { "wait_for_additional_speech_duration":delay, "eou_probability": round(eou.eou_probability, 4), }, start_time=eou.eou_end_time, ) self.end_span(wait_for_additional_speech_span, status_code=StatusCode.OK, end_time=eou.eou_end_time + delay) if eou_span: for error in eou_errors: 
eou_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) with trace.use_span(eou_span): eou_error_span = create_span("EOU Error", {"message": error.get("message", "")}, parent_span=eou_span, start_time=error.get("timestamp")) self.end_span(eou_error_span, end_time=error.get("timestamp")+0.100) eou_errors.remove(error) eou_status = StatusCode.ERROR if eou_errors else StatusCode.OK self.end_span(eou_span, status_code=eou_status, end_time=eou.eou_end_time if eou else None) eou_list = turn.eou_metrics if turn.eou_metrics else None if eou_list: for eou in eou_list: try: create_eou_span(eou) except Exception as e: logger.error(f"Error creating EOU span: {e}") # --- LLM spans --- def create_llm_span(llm: LlmMetrics): llm_errors = [e for e in turn.errors if e.get("source") == "LLM"] llm_attrs = {} if llm: llm_class = turn.session_metrics.provider_per_component.get("llm", {}).get("provider_class") if llm_class: llm_attrs["provider_class"] = llm_class llm_model = turn.session_metrics.provider_per_component.get("llm", {}).get("model_name") if llm_model: llm_attrs["model_name"] = llm_model if llm.llm_input: llm_attrs["input"] = llm.llm_input if llm.llm_duration: llm_attrs["duration_ms"] = llm.llm_duration if llm.llm_start_time: llm_attrs["start_timestamp"] = llm.llm_start_time if llm.llm_end_time: llm_attrs["end_timestamp"] = llm.llm_end_time if turn.agent_speech: llm_attrs["output"] = turn.agent_speech if llm.prompt_tokens: llm_attrs["input_tokens"] = llm.prompt_tokens if llm.completion_tokens: llm_attrs["output_tokens"] = llm.completion_tokens if llm.prompt_cached_tokens: llm_attrs["cached_input_tokens"] = llm.prompt_cached_tokens if llm.total_tokens: llm_attrs["total_tokens"] = llm.total_tokens llm_span_name = f"{llm_class}: LLM Processing" llm_span = create_span( llm_span_name, llm_attrs, parent_span=turn_span, start_time=llm.llm_start_time if llm else None, ) if llm_span: # Tool call sub-spans if 
turn.function_tool_timestamps: for tool_data in turn.function_tool_timestamps: tool_timestamp = tool_data.get("timestamp") tool_span = create_span( f"Invoked Tool: {tool_data.get('tool_name', 'unknown')}", parent_span=llm_span, start_time=tool_timestamp, ) self.end_span(tool_span, end_time=tool_timestamp) for error in llm_errors: llm_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) with trace.use_span(llm_span): llm_error_span = create_span("LLM Error", {"message": error.get("message", "")}, parent_span=llm_span, start_time=error.get("timestamp")) self.end_span(llm_error_span, end_time=error.get("timestamp")+0.100) llm_errors.remove(error) # TTFT sub-span if llm and llm.llm_ttft is not None and llm.llm_start_time is not None: ttft_span = create_span( "Time to First Token", attributes={"llm_ttft": llm.llm_ttft}, parent_span=llm_span, start_time=llm.llm_start_time, ) ttft_end = llm.llm_start_time + (llm.llm_ttft / 1000) self.end_span(ttft_span, end_time=ttft_end) llm_status = StatusCode.ERROR if llm_errors else StatusCode.OK self.end_span(llm_span, status_code=llm_status, end_time=llm.llm_end_time if llm else None) llm_list = turn.llm_metrics if turn.llm_metrics else None if llm_list: for llm in llm_list: try: create_llm_span(llm) except Exception as e: logger.error(f"Error creating LLM span: {e}") # --- TTS spans --- def create_tts_span(tts: TtsMetrics): tts_errors = [e for e in turn.errors if e.get("source") == "TTS"] tts_attrs = {} if tts: tts_class = turn.session_metrics.provider_per_component.get("tts", {}).get("provider_class") tts_model = turn.session_metrics.provider_per_component.get("tts", {}).get("model_name") if tts_class: tts_attrs["provider_class"] = tts_class if tts_model: tts_attrs["model_name"] = tts_model if turn.agent_speech: tts_attrs["input"] = turn.agent_speech if tts.tts_duration: tts_attrs["duration_ms"] = tts.tts_duration if tts.tts_start_time: tts_attrs["start_timestamp"] = 
tts.tts_start_time if tts.tts_end_time: tts_attrs["end_timestamp"] = tts.tts_end_time if tts.tts_characters: tts_attrs["characters"] = tts.tts_characters if turn.agent_speech_duration: tts_attrs["audio_duration_ms"] = turn.agent_speech_duration tts_attrs["output"] = "N/A" tts_span_name = f"{tts_class}: Text to Speech Processing" tts_span = create_span( tts_span_name, tts_attrs, parent_span=turn_span, start_time=tts.tts_start_time if tts else None, ) if tts_span: # TTFB sub-span if tts and tts.tts_first_byte_time is not None: ttfb_span = create_span( "Time to First Byte", parent_span=tts_span, start_time=tts.tts_start_time, ) self.end_span(ttfb_span, end_time=tts.tts_first_byte_time) for error in tts_errors: tts_span.add_event("error", attributes={ "message": error.get("message", ""), "timestamp": error.get("timestamp", ""), }) with trace.use_span(tts_span): tts_error_span = create_span("TTS Error", {"message": error.get("message", "")}, parent_span=tts_span, start_time=error.get("timestamp")) self.end_span(tts_error_span, end_time=error.get("timestamp")+0.100) tts_errors.remove(error) tts_status = StatusCode.ERROR if tts_errors else StatusCode.OK self.end_span(tts_span, status_code=tts_status, end_time=tts.tts_end_time if tts else None) tts_list = turn.tts_metrics if turn.tts_metrics else None if tts_list: for tts in tts_list: try: create_tts_span(tts) except Exception as e: logger.error(f"Error creating TTS span: {e}") # --- KB spans --- def create_kb_span(kb: KbMetrics): kb_span_name = "Knowledge Base: Retrieval" kb_attrs = {} if turn.user_speech: kb_attrs["input"] = turn.user_speech if kb.kb_retrieval_latency: kb_attrs["retrieval_latency_ms"] = kb.kb_retrieval_latency if kb.kb_start_time: kb_attrs["start_timestamp"] = kb.kb_start_time if kb.kb_end_time: kb_attrs["end_timestamp"] = kb.kb_end_time if kb.kb_documents: # Join documents as comma-separated string for readability kb_attrs["documents"] = ", ".join(kb.kb_documents) if len(kb.kb_documents) <= 5 else 
f"{len(kb.kb_documents)} documents" kb_attrs["document_count"] = len(kb.kb_documents) if kb.kb_scores: # Include scores as comma-separated string kb_attrs["scores"] = ", ".join([str(round(s, 4)) for s in kb.kb_scores[:5]]) kb_span = create_span(kb_span_name, kb_attrs, parent_span=turn_span, start_time=kb.kb_start_time) if kb_span: self.end_span(kb_span, status_code=StatusCode.OK, end_time=kb.kb_end_time) if turn.kb_metrics: for kb in turn.kb_metrics: try: create_kb_span(kb) except Exception as e: logger.error(f"Error creating KB span: {e}") # --- Realtime spans (for S2S modes) --- def create_rt_span(rt: RealtimeMetrics): rt_errors = [e for e in turn.errors if e.get("source") == "REALTIME"] rt_attrs = {} if rt: rt_class = turn.session_metrics.provider_per_component.get("realtime", {}).get("provider_class") if rt_class: rt_attrs["provider_class"] = rt_class rt_model = turn.session_metrics.provider_per_component.get("realtime", {}).get("model_name") if rt_model: rt_attrs["model_name"] = rt_model rt_start_time = turn.user_speech_end_time if turn.user_speech_end_time else turn.agent_speech_start_time rt_end_time = turn.agent_speech_start_time # if turn.timeline_event_metrics: # for event in turn.timeline_event_metrics: # if event.event_type == "user_speech": # rt_start_time = event.end_time # break # for event in turn.timeline_event_metrics: # if event.event_type == "agent_speech": # rt_end_time = event.start_time # break rt_span_name = f"{rt_class}: Realtime Processing" rt_span = create_span( rt_span_name, rt_attrs, parent_span=turn_span, start_time=rt_start_time, ) if rt_span: # Realtime tool calls if turn.function_tools_called: for tool_name in turn.function_tools_called: tool_span = create_span( f"Invoked Tool: {tool_name}", parent_span=turn_span, start_time=time.perf_counter(), ) self.end_span(tool_span, end_time=time.perf_counter()) # TTFB span for realtime if turn.e2e_latency is not None: ttfb_span = create_span( "Time to First Word", {"duration_ms": 
turn.e2e_latency}, parent_span=rt_span, start_time=rt_start_time, ) self.end_span(ttfb_span, end_time=rt_end_time) # --- Realtime model errors --- rt_errors = [e for e in turn.errors if e.get("source") == "REALTIME"] if rt_errors: for error in rt_errors: turn_span.add_event("Errors", attributes={ "message": error.get("message", "Unknown error"), "timestamp": error.get("timestamp", "N/A"), }) with trace.use_span(turn_span): rt_error_span = create_span("Realtime Error", {"message": error.get("message", "")}, parent_span=turn_span, start_time=error.get("timestamp")) self.end_span(rt_error_span, end_time=error.get("timestamp")+0.100) rt_errors.remove(error) self.end_span(rt_span, status_code=StatusCode.ERROR if rt_errors else StatusCode.OK, end_time=rt_end_time) rt_list = turn.realtime_metrics if turn.realtime_metrics else None if rt_list: for rt in rt_list: try: create_rt_span(rt) except Exception as e: logger.error(f"Error creating RT span: {e}") def create_error_spans(errors:Dict[str, Any]): error_span_name = f"{errors.get('source', 'Unknown')} Error span" attr={} if errors.get('message'): attr['error message'] = errors.get('message') span_start_time = errors.get('timestamp_perf') error_span = create_span(error_span_name, attributes=attr, parent_span=turn_span, start_time=span_start_time) self.end_span(error_span, status_code=StatusCode.ERROR, end_time=span_start_time + 0.001) for e in turn.errors: try: create_error_spans(e) except Exception as e: logger.error(f"Error creating error span: {e}") # Determine turn end time first for unbounded children spans end_times = [] if turn.tts_metrics and turn.tts_metrics[-1].tts_end_time: end_times.append(turn.tts_metrics[-1].tts_end_time) if turn.llm_metrics and turn.llm_metrics[-1].llm_end_time: end_times.append(turn.llm_metrics[-1].llm_end_time) if turn.agent_speech_end_time: end_times.append(turn.agent_speech_end_time) if turn.eou_metrics and turn.eou_metrics[-1].eou_end_time: 
end_times.append(turn.eou_metrics[-1].eou_end_time) if turn.stt_metrics and turn.stt_metrics[-1].stt_end_time: end_times.append(turn.stt_metrics[-1].stt_end_time) if turn.user_speech_end_time: end_times.append(turn.user_speech_end_time) if turn.interruption_metrics and turn.interruption_metrics.false_interrupt_end_time: end_times.append(turn.interruption_metrics.false_interrupt_end_time) turn_end_time = max(end_times) if end_times else None if turn.is_interrupted or turn_end_time is None: turn_end_time = time.perf_counter() # --- Timeline events --- if turn.timeline_event_metrics: for event in turn.timeline_event_metrics: if event.event_type == "user_speech": user_speech_span = create_span( "User Input Speech", {"Transcript": event.text, "duration_ms": event.duration_ms}, parent_span=turn_span, start_time=event.start_time, ) self.end_span(user_speech_span, end_time=event.end_time if event.end_time else turn_end_time) elif event.event_type == "agent_speech": agent_speech_span = create_span( "Agent Output Speech", {"Transcript": event.text, "duration_ms": event.duration_ms}, parent_span=turn_span, start_time=event.start_time, ) self.end_span(agent_speech_span, end_time=event.end_time if event.end_time else turn_end_time) elif event.event_type == "thinking_audio": thinking_attrs = {} if turn.thinking_audio_file_path: thinking_attrs["file_path"] = turn.thinking_audio_file_path if turn.thinking_audio_looping is not None: thinking_attrs["looping"] = turn.thinking_audio_looping if turn.thinking_audio_override_thinking is not None: thinking_attrs["override_thinking"] = turn.thinking_audio_override_thinking if event.duration_ms is not None: thinking_attrs["duration_ms"] = event.duration_ms thinking_span = create_span( "Thinking Audio", thinking_attrs, parent_span=turn_span, start_time=event.start_time, ) self.end_span(thinking_span, end_time=event.end_time if event.end_time else turn_end_time) elif event.event_type == "background_audio": bg_attrs = {} if 
turn.background_audio_file_path: bg_attrs["file_path"] = turn.background_audio_file_path if turn.background_audio_looping is not None: bg_attrs["looping"] = turn.background_audio_looping if event.duration_ms is not None: bg_attrs["duration_ms"] = event.duration_ms bg_span = create_span( "Background Audio", bg_attrs, parent_span=turn_span, start_time=event.start_time, ) self.end_span(bg_span, end_time=event.end_time if event.end_time else turn_end_time) self.end_span(turn_span, message="End of turn trace.", end_time=turn_end_time)Creates a full trace for a single turn from the unified TurnMetrics schema. Handles both cascading and realtime component spans based on what data is present.
def end_a2a_communication(self):
    """Close the parent span grouping A2A communication and reset the A2A turn counter."""
    span, self.a2a_span = self.a2a_span, None
    self._a2a_turn_count = 0
    complete_span(span, StatusCode.OK, end_time=time.perf_counter())
def end_a2a_trace(self, span: Optional[Span], message: str = ""):
    """Ends an A2A trace span with OK status.

    Note: ``message`` is accepted but not forwarded to complete_span.
    """
    complete_span(span, StatusCode.OK, end_time=time.perf_counter())
def end_agent_session(self):
    """Close the agent-session span, finishing the open turn container first."""
    if self.main_turn_span:
        self.end_main_turn()
    session_span, self.agent_session_span = self.agent_session_span, None
    self.end_span(session_span, "Agent session ended", end_time=time.perf_counter())
def end_agent_session_closed(self):
    """Finish the 'Agent Session Closed' span and clear the reference."""
    closed_span, self.agent_session_closed_span = self.agent_session_closed_span, None
    self.end_span(closed_span, "Agent session closed", end_time=time.perf_counter())
def end_agent_session_config(self):
    """Finish the 'Session Configuration' span and clear the reference."""
    config_span, self.agent_session_config_span = self.agent_session_config_span, None
    self.end_span(config_span, "Agent session config ended", end_time=time.perf_counter())
def end_main_turn(self):
    """Finish the umbrella span that groups all user/agent turns."""
    turns_span, self.main_turn_span = self.main_turn_span, None
    self.end_span(turns_span, "All turns processed", end_time=time.perf_counter())
def end_span(self, span: Optional[Span], message: str = "", status_code: StatusCode = StatusCode.OK, end_time: Optional[float] = None):
    """Finish *span* with *status_code*; the message is forwarded only for error statuses."""
    if not span:
        return
    when = time.perf_counter() if end_time is None else end_time
    complete_span(span, status_code, message if status_code == StatusCode.ERROR else "", when)
def set_session_id(self, session_id: str):
    """Set the session ID used by the trace manager for subsequent traces."""
    self.session_id = session_id
def set_session_metrics(self, session_metrics: SessionMetrics):
    """Set the session-level metrics snapshot consumed when building turn traces."""
    self.session_metrics = session_metrics
def start_agent_joined_meeting(self, attributes: Dict[str, Any]):
    """Create the root span for this agent's meeting session (no-op if already created)."""
    if self.root_span:
        return
    name = attributes.get('agent_name', 'UnknownAgent')
    peer = attributes.get('peerId', 'UnknownID')
    self.root_span = create_span(
        f"Agent Session: agentName_{name}_agentId_{peer}",
        attributes,
        start_time=attributes.get('start_time', time.perf_counter()),
    )
    # Always set root_span_ready so downstream awaits don't hang
    # when telemetry is not initialized (create_span returns None)
    self.root_span_ready.set()
async def start_agent_session(self, attributes: Dict[str, Any]):
    """Open the 'Session Started' span under the root span, then the turn container.

    Waits for the root span to be ready; no-op if there is no root span or a
    session span already exists. Participant metrics are attached split by kind.
    """
    await self.root_span_ready.wait()
    if not self.root_span or self.agent_session_span:
        return
    start_time = attributes.get('start_time', time.perf_counter())
    user_parts, agent_parts = [], []
    for participant in self.participant_metrics:
        bucket = user_parts if participant.kind == "user" else agent_parts
        bucket.append(asdict(participant))
    attributes["participant_metrics"] = user_parts
    attributes["agent_participant_metrics"] = agent_parts
    self.agent_session_span = create_span("Session Started", attributes, parent_span=self.root_span, start_time=start_time)
    self.start_main_turn()
async def start_agent_session_closed(self, attributes: Dict[str, Any]):
    """Open the 'Agent Session Closed' span once the root span is ready (idempotent)."""
    await self.root_span_ready.wait()
    if not self.root_span or self.agent_session_closed_span:
        return
    self.agent_session_closed_span = create_span(
        "Agent Session Closed",
        attributes,
        parent_span=self.root_span,
        start_time=attributes.get('start_time', time.perf_counter()),
    )
async def start_agent_session_config(self, attributes: Dict[str, Any]):
    """Open the 'Session Configuration' span under the root span and close it immediately."""
    await self.root_span_ready.wait()
    if not self.root_span or self.agent_session_config_span:
        return
    self.agent_session_config_span = create_span(
        "Session Configuration",
        attributes,
        parent_span=self.root_span,
        start_time=attributes.get('start_time', time.perf_counter()),
    )
    if self.agent_session_config_span:
        with trace.use_span(self.agent_session_config_span):
            self.end_agent_session_config()
def start_main_turn(self)-
Expand source code
def start_main_turn(self):
    """Create the umbrella span that parents all user-agent turns.

    Requires the session span to exist; does nothing if the umbrella
    span was already started.
    """
    if not self.agent_session_span or self.main_turn_span:
        return
    self.main_turn_span = create_span(
        "User & Agent Turns",
        parent_span=self.agent_session_span,
        start_time=time.perf_counter(),
    )
class TurnMetrics (turn_id: str = '',
is_interrupted: bool = False,
user_speech_start_time: float | None = None,
user_speech_end_time: float | None = None,
user_speech_duration: float | None = None,
user_speech: str | None = None,
agent_speech_start_time: float | None = None,
agent_speech_end_time: float | None = None,
agent_speech_duration: float | None = None,
agent_speech: str | None = None,
e2e_latency: float | None = None,
function_tools_called: List[str] = <factory>,
function_tool_timestamps: List[Dict[str, Any]] = <factory>,
is_a2a_enabled: bool = False,
handoff_occurred: bool = False,
errors: List[Dict[str, Any]] = <factory>,
timestamp: float = <factory>,
preemtive_generation_enabled: bool = False,
vad_metrics: List[VadMetrics] = <factory>,
stt_metrics: List[SttMetrics] = <factory>,
eou_metrics: List[EouMetrics] = <factory>,
kb_metrics: List[KbMetrics] = <factory>,
llm_metrics: List[LlmMetrics] = <factory>,
tts_metrics: List[TtsMetrics] = <factory>,
interruption_metrics: InterruptionMetrics | None = None,
realtime_metrics: List[RealtimeMetrics] = <factory>,
timeline_event_metrics: List[TimelineEvent] = <factory>,
fallback_events: List[FallbackEvent] = <factory>,
function_tool_metrics: List[FunctionToolMetrics] = <factory>,
mcp_tool_metrics: List[McpToolMetrics] = <factory>,
session_metrics: SessionMetrics | None = <factory>,
background_audio_file_path: str | None = None,
background_audio_looping: bool | None = None,
thinking_audio_file_path: str | None = None,
thinking_audio_looping: bool | None = None,
thinking_audio_override_thinking: bool | None = None)-
Expand source code
@dataclass
class TurnMetrics:
    """Metrics for a single user-agent turn.

    Holds per-component metric lists (VAD, STT, EOU, KB, LLM, TTS,
    realtime) plus turn-level bookkeeping (speech text/timings, tool
    calls, errors, interruption state). ``to_dict`` recomputes the
    derived ``e2e_latency`` before serializing.
    """
    turn_id: str = ""
    is_interrupted: bool = False
    # User speech window (seconds; clock source set by the caller)
    user_speech_start_time: Optional[float] = None
    user_speech_end_time: Optional[float] = None
    user_speech_duration: Optional[float] = None
    user_speech: Optional[str] = None
    # Agent speech window
    agent_speech_start_time: Optional[float] = None
    agent_speech_end_time: Optional[float] = None
    agent_speech_duration: Optional[float] = None
    agent_speech: Optional[str] = None
    # Derived: sum of latest component latencies (see compute_e2e_latency)
    e2e_latency: Optional[float] = None
    function_tools_called: List[str] = field(default_factory=list)
    function_tool_timestamps: List[Dict[str, Any]] = field(default_factory=list)
    is_a2a_enabled: bool = False
    handoff_occurred: bool = False
    errors: List[Dict[str, Any]] = field(default_factory=list)
    timestamp: float = field(default_factory=time.time)
    # NOTE(review): misspelling of "preemptive" kept as-is — renaming would
    # change the asdict()/serialized schema consumers may depend on.
    preemtive_generation_enabled: bool = False
    vad_metrics: List[VadMetrics] = field(default_factory=list)
    stt_metrics: List[SttMetrics] = field(default_factory=list)
    eou_metrics: List[EouMetrics] = field(default_factory=list)
    kb_metrics: List[KbMetrics] = field(default_factory=list)
    llm_metrics: List[LlmMetrics] = field(default_factory=list)
    tts_metrics: List[TtsMetrics] = field(default_factory=list)
    interruption_metrics: Optional[InterruptionMetrics] = None
    realtime_metrics: List[RealtimeMetrics] = field(default_factory=list)
    timeline_event_metrics: List[TimelineEvent] = field(default_factory=list)
    fallback_events: List[FallbackEvent] = field(default_factory=list)
    function_tool_metrics: List[FunctionToolMetrics] = field(default_factory=list)
    mcp_tool_metrics: List[McpToolMetrics] = field(default_factory=list)
    session_metrics: Optional[SessionMetrics] = field(default_factory=SessionMetrics)
    # Background audio
    background_audio_file_path: Optional[str] = None
    background_audio_looping: Optional[bool] = None
    # Thinking audio
    thinking_audio_file_path: Optional[str] = None
    thinking_audio_looping: Optional[bool] = None
    thinking_audio_override_thinking: Optional[bool] = None

    def compute_e2e_latency(self) -> None:
        """Sum the latest component latencies into ``e2e_latency``.

        Components considered: STT latency, EOU latency, LLM TTFT,
        TTS latency, and realtime TTFB (the original docstring omitted
        the realtime component). Only the most recent entry of each
        metric list contributes; missing or None values are skipped.
        Leaves ``e2e_latency`` untouched when no component is available.
        """
        # (metric list, attribute holding that component's latency)
        components = (
            (self.stt_metrics, "stt_latency"),
            (self.eou_metrics, "eou_latency"),
            (self.llm_metrics, "llm_ttft"),
            (self.tts_metrics, "tts_latency"),
            (self.realtime_metrics, "realtime_ttfb"),
        )
        latencies = []
        for series, attr_name in components:
            if series:
                value = getattr(series[-1], attr_name)
                if value is not None:
                    latencies.append(value)
        if latencies:
            self.e2e_latency = round(sum(latencies), 4)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize turn metrics to a dictionary.

        Side effect: refreshes ``e2e_latency`` first so the exported
        value is current.
        """
        self.compute_e2e_latency()
        return asdict(self)
Instance variables
var agent_speech : str | Nonevar agent_speech_duration : float | Nonevar agent_speech_end_time : float | Nonevar agent_speech_start_time : float | Nonevar background_audio_file_path : str | Nonevar background_audio_looping : bool | Nonevar e2e_latency : float | Nonevar eou_metrics : List[EouMetrics]var errors : List[Dict[str, Any]]var fallback_events : List[FallbackEvent]var function_tool_metrics : List[FunctionToolMetrics]var function_tool_timestamps : List[Dict[str, Any]]var function_tools_called : List[str]var handoff_occurred : boolvar interruption_metrics : InterruptionMetrics | Nonevar is_a2a_enabled : boolvar is_interrupted : boolvar kb_metrics : List[KbMetrics]var llm_metrics : List[LlmMetrics]var mcp_tool_metrics : List[McpToolMetrics]var preemtive_generation_enabled : boolvar realtime_metrics : List[RealtimeMetrics]var session_metrics : SessionMetrics | Nonevar stt_metrics : List[SttMetrics]var thinking_audio_file_path : str | Nonevar thinking_audio_looping : bool | Nonevar thinking_audio_override_thinking : bool | Nonevar timeline_event_metrics : List[TimelineEvent]var timestamp : floatvar tts_metrics : List[TtsMetrics]var turn_id : strvar user_speech : str | Nonevar user_speech_duration : float | Nonevar user_speech_end_time : float | Nonevar user_speech_start_time : float | Nonevar vad_metrics : List[VadMetrics]
Methods
def compute_e2e_latency(self) ‑> None-
Expand source code
def compute_e2e_latency(self) -> None:
    """Accumulate the most recent STT, EOU, LLM TTFT, TTS and realtime
    TTFB latencies into ``e2e_latency``.

    Each component contributes only if its metric list is non-empty and
    the latest entry's latency is not None. When nothing contributes,
    ``e2e_latency`` is left unchanged.
    """
    component_table = (
        (self.stt_metrics, "stt_latency"),
        (self.eou_metrics, "eou_latency"),
        (self.llm_metrics, "llm_ttft"),
        (self.tts_metrics, "tts_latency"),
        (self.realtime_metrics, "realtime_ttfb"),
    )
    collected = []
    for series, attr_name in component_table:
        if not series:
            continue
        latency = getattr(series[-1], attr_name)
        if latency is not None:
            collected.append(latency)
    if collected:
        self.e2e_latency = round(sum(collected), 4)
def to_dict(self) ‑> Dict[str, Any]-
Expand source code
def to_dict(self) -> Dict[str, Any]:
    """Export the turn as a plain dictionary.

    Refreshes the derived ``e2e_latency`` first so the serialized
    snapshot is current, then deep-converts all dataclass fields.
    """
    self.compute_e2e_latency()  # keep derived latency current before export
    serialized = asdict(self)
    return serialized