Module agents.metrics

Sub-modules

agents.metrics.analytics
agents.metrics.integration
agents.metrics.logger_handler
agents.metrics.metrics_collector
agents.metrics.metrics_schema
agents.metrics.telemetry
agents.metrics.traces_flow

Functions

def auto_initialize_telemetry_and_logs(room_id: str,
peer_id: str,
room_attributes: Dict[str, Any] = None,
session_id: str = None,
sdk_metadata: Dict[str, Any] = None,
custom_traces_config: Dict[str, Any] = None)
Expand source code
def auto_initialize_telemetry_and_logs(room_id: str, peer_id: str,
                                      room_attributes: Optional[Dict[str, Any]] = None,
                                      session_id: Optional[str] = None,
                                      sdk_metadata: Optional[Dict[str, Any]] = None,
                                      custom_traces_config: Optional[Dict[str, Any]] = None):
    """
    Auto-initialize telemetry and logs from room attributes.

    Args:
        room_id: Room identifier forwarded to ``initialize_telemetry``.
        peer_id: Peer identifier forwarded to ``initialize_telemetry``.
        room_attributes: Room attributes; must be truthy for anything to
            happen. The ``'observability'`` key (a JWT string) and the
            ``'traces'`` config dict are read from it.
        session_id: Optional session identifier (not used by this function's
            visible body).
        sdk_metadata: Optional SDK metadata forwarded to ``initialize_telemetry``.
        custom_traces_config: When given, overrides ``room_attributes['traces']``.

    Fix: the optional parameters previously used implicit-Optional annotations
    (``Dict[str, Any] = None``), which PEP 484 disallows; they are now spelled
    ``Optional[...]``. Runtime behavior is unchanged.
    """
    # Nothing to do without room attributes — even a custom traces config is
    # ignored in that case (preserved behavior; NOTE(review): confirm that
    # skipping custom_traces_config here is intentional).
    if not room_attributes:
        return

    observability_jwt = room_attributes.get('observability', '')

    # An explicitly supplied traces config wins over the room-provided one.
    traces_config = custom_traces_config or room_attributes.get('traces', {})
    if traces_config.get('enabled'):
        # Placeholder for future span metadata; passed through as-is.
        metadata = {
        }

        initialize_telemetry(
            room_id=room_id,
            peer_id=peer_id,
            sdk_name="agents",
            observability_jwt=observability_jwt,
            traces_config=traces_config,
            metadata=metadata,
            sdk_metadata=sdk_metadata
        )

Auto-initialize telemetry and logs from room attributes

def complete_span(span: opentelemetry.trace.span.Span | None,
status_code,
message: str = '',
end_time: float | None = None)
Expand source code
def complete_span(span: Optional[Span], status_code, message: str = "", end_time: Optional[float] = None):
    """
    Complete a trace span (convenience method).

    Silently does nothing when telemetry has not been initialized or when
    no span is supplied.

    Args:
        span: Span to complete
        status_code: Status code
        message: Status message
        end_time: End time in seconds since epoch (optional)
    """
    telemetry = get_telemetry()
    # Guard clause instead of a nested conditional: bail out unless both the
    # telemetry singleton and a span are available.
    if not telemetry or not span:
        return
    telemetry.complete_span(span, status_code, message, end_time)

Complete a trace span (convenience method)

Args

span
Span to complete
status_code
Status code
message
Status message
end_time
End time in seconds since epoch (optional)
def create_span(span_name: str,
attributes: Dict[str, Any] = None,
parent_span: opentelemetry.trace.span.Span | None = None,
start_time: float | None = None)
Expand source code
def create_span(span_name: str, attributes: Dict[str, Any] = None, parent_span: Optional[Span] = None, start_time: Optional[float] = None):
    """
    Create a trace span (convenience method).

    Args:
        span_name: Name of the span
        attributes: Span attributes
        parent_span: Parent span (optional)
        start_time: Start time in seconds since epoch (optional)

    Returns:
        Span object, or None when telemetry has not been initialized
    """
    telemetry = get_telemetry()
    # Early return keeps the happy path unindented.
    if not telemetry:
        return None
    return telemetry.trace(span_name, attributes, parent_span, start_time)

Create a trace span (convenience method)

Args

span_name
Name of the span
attributes
Span attributes
parent_span
Parent span (optional)
start_time
Start time in seconds since epoch (optional)

Returns

Span object or None

Classes

class JobLogger (queue: queue.Queue,
room_id: str,
peer_id: str,
auth_token: str,
session_id: str | None = None,
dashboard_log_level: str = 'INFO',
sdk_metadata: Dict[str, Any] | None = None,
send_logs_to_dashboard: bool = False)
Expand source code
class JobLogger:
    """
    Intercepts ALL existing ``logger.info()`` / ``.debug()`` / ``.warning()``
    etc. calls under the ``videosdk.agents`` namespace by attaching a
    ``QueueHandler`` to the parent logger.

    * Does ZERO I/O – records are written to a ``queue.Queue``.
    * Attaches job context (roomId, sessionId, peerId, …) via a Filter.
    * Thread-safe: queue.Queue is thread-safe; multiple jobs each own their
      own queue, so no cross-agent interference.
    """

    def __init__(
        self,
        queue: "queue.Queue",
        room_id: str,
        peer_id: str,
        auth_token: str,
        session_id: Optional[str] = None,
        dashboard_log_level: str = "INFO",
        sdk_metadata: Optional[Dict[str, Any]] = None,
        send_logs_to_dashboard: bool = False,
    ):
        # Producer-side queue; the matching consumer thread is owned elsewhere
        # (presumably by a LogManager — confirm wiring in the caller).
        self._queue = queue 
        # Job context attached to every record by the filter installed below.
        self._context: Dict[str, Any] = {
            "roomId": room_id,
            "peerId": peer_id,
            "authToken": auth_token,
            "sessionId": session_id or "",
            "sdk_name": "",
            "sdk_version": "",
            "service_name": "videosdk-otel-telemetry-agents",
            "send_logs_to_dashboard": send_logs_to_dashboard,
        }

        if sdk_metadata:
            # NOTE(review): "AGENTS" / "0.0.62" look like fallbacks for missing
            # metadata keys — confirm "0.0.62" tracks the released SDK version.
            self._context["sdk_name"] = sdk_metadata.get("sdk", "AGENTS").upper()
            self._context["sdk_version"] = sdk_metadata.get("sdk_version", "0.0.62")

        self._parent_logger = logging.getLogger("videosdk.agents")

        # Lower the parent logger's threshold only if it is currently MORE
        # restrictive than the requested dashboard level; never raise it.
        target_level = getattr(logging, dashboard_log_level.upper(), logging.INFO)
        if self._parent_logger.getEffectiveLevel() > target_level:
            self._parent_logger.setLevel(target_level)

        self._queue_handler = QueueHandler(queue)
        self._queue_handler.setLevel(target_level)

        # Filter stamps the job context onto every record passing through.
        self._context_filter = _JobContextFilter(self._context)
        self._queue_handler.addFilter(self._context_filter)

        # Attach last, after level and filter are fully configured.
        self._parent_logger.addHandler(self._queue_handler)

    def update_context(self, **kwargs) -> None:
        """Update context fields (e.g. sessionId once available)."""
        # Mutates the dict in place; the filter was given this same dict,
        # so new fields presumably apply to subsequent records — verify
        # against _JobContextFilter.
        self._context.update(kwargs)

    def set_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
        """
        Send the log endpoint and optional custom headers to the consumer thread
        via the queue so the LogConsumerHandler can start posting logs.

        Called from room.py when room attributes become available.
        """
        # Config travels through the same queue as ordinary log records: a
        # sentinel LogRecord carrying private _config_* attributes that the
        # consumer is expected to recognize.
        record = logging.LogRecord(
            name="videosdk.agents._config",
            level=logging.INFO,
            pathname="", lineno=0,
            msg="", args=(), exc_info=None,
        )
        record._config_endpoint = endpoint
        record._config_jwt_key = jwt_key
        record._config_custom_headers = custom_headers
        # put_nowait never blocks; it can raise queue.Full only for a bounded
        # queue (the queue here is assumed unbounded — confirm in the owner).
        self._queue.put_nowait(record)

    def cleanup(self) -> None:
        """Remove the QueueHandler from the parent logger (called on job end)."""
        self._parent_logger.removeHandler(self._queue_handler)

Intercepts ALL existing logger.info() / .debug() / .warning() etc. calls under the videosdk.agents namespace by attaching a QueueHandler to the parent logger.

  • Does ZERO I/O – records are written to a queue.Queue.
  • Attaches job context (roomId, sessionId, peerId, …) via a Filter.
  • Thread-safe: queue.Queue is thread-safe; multiple jobs each own their own queue, so no cross-agent interference.

Methods

def cleanup(self) ‑> None
Expand source code
def cleanup(self) -> None:
    """Remove the QueueHandler from the parent logger (called on job end)."""
    # Detaching stops further videosdk.agents records reaching this job's queue.
    self._parent_logger.removeHandler(self._queue_handler)

Remove the QueueHandler from the parent logger (called on job end).

def set_endpoint(self,
endpoint: str,
jwt_key: str = '',
custom_headers: Dict[str, str] | None = None) ‑> None
Expand source code
def set_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
    """
    Send the log endpoint and optional custom headers to the consumer thread
    via the queue so the LogConsumerHandler can start posting logs.

    Called from room.py when room attributes become available.
    """
    # Sentinel LogRecord: config rides through the normal log queue via
    # private _config_* attributes the consumer is expected to recognize.
    record = logging.LogRecord(
        name="videosdk.agents._config",
        level=logging.INFO,
        pathname="", lineno=0,
        msg="", args=(), exc_info=None,
    )
    record._config_endpoint = endpoint
    record._config_jwt_key = jwt_key
    record._config_custom_headers = custom_headers
    # Non-blocking put; raises queue.Full only if the queue is bounded.
    self._queue.put_nowait(record)

Send the log endpoint and optional custom headers to the consumer thread via the queue so the LogConsumerHandler can start posting logs.

Called from room.py when room attributes become available.

def update_context(self, **kwargs) ‑> None
Expand source code
def update_context(self, **kwargs) -> None:
    """Update context fields (e.g. sessionId once available)."""
    # In-place update of the shared context dict; presumably the attached
    # filter holds a reference to it — verify against _JobContextFilter.
    self._context.update(kwargs)

Update context fields (e.g. sessionId once available).

class LogManager (batch_size: int = 50, flush_interval_seconds: float = 5.0)
Expand source code
class LogManager:
    """
    Per-**Job** owner of the log pipeline (one instance per Job, not per
    Worker).

    Owns:
    * A ``queue.Queue`` (thread-safe, so no extra manager process is needed).
    * A ``QueueListener`` that drains that queue and hands each record to a
      ``LogConsumerHandler``.

    Lifecycle::

        mgr = LogManager()
        mgr.start(auth_token="...")       # starts background listener thread
        # … job runs, logs are captured …
        mgr.stop()                        # flushes & tears down

    The ``queue`` attribute is handed to ``JobLogger`` (producer side).
    """

    def __init__(
        self,
        batch_size: int = 50,
        flush_interval_seconds: float = 5.0,
    ):
        # Shared producer/consumer queue: JobLogger pushes, the listener drains.
        self.queue: queue.Queue = queue.Queue()
        self._listener: Optional[QueueListener] = None
        self._consumer_handler: Optional[LogConsumerHandler] = None
        self._batch_size = batch_size
        self._flush_interval = flush_interval_seconds

    def start(self, auth_token: str = "") -> None:
        """Begin draining the queue on a background listener thread."""
        handler = LogConsumerHandler(
            auth_token=auth_token,
            batch_size=self._batch_size,
            flush_interval_seconds=self._flush_interval,
        )
        listener = QueueListener(
            self.queue, handler, respect_handler_level=True
        )
        self._consumer_handler = handler
        self._listener = listener
        listener.start()

    def update_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
        """Set/update the endpoint and flush buffered records."""
        handler = self._consumer_handler
        if handler:
            handler.update_endpoint(endpoint, jwt_key, custom_headers)

    def stop(self) -> None:
        """Stop the listener and flush remaining logs."""
        listener, self._listener = self._listener, None
        if listener:
            listener.stop()
        handler, self._consumer_handler = self._consumer_handler, None
        if handler:
            handler.close()

    def get_queue(self) -> queue.Queue:
        """Return the shared queue for the JobLogger (producer)."""
        return self.queue

One instance per Job (not per Worker).

Owns: * A queue.Queue (thread-safe, no extra manager process needed). * A QueueListener that consumes from the queue and dispatches to LogConsumerHandler.

Lifecycle::

mgr = LogManager()
mgr.start(auth_token="...")       # starts background listener thread
# … job runs, logs are captured …
mgr.stop()                        # flushes & tears down

The queue attribute is passed to JobLogger (producer side).

Methods

def get_queue(self) ‑> queue.Queue
Expand source code
def get_queue(self) -> queue.Queue:
    """Return the shared queue for the JobLogger (producer)."""
    # Same object as self.queue; exposed as a method for the producer wiring.
    return self.queue

Return the shared queue for the JobLogger (producer).

def start(self, auth_token: str = '') ‑> None
Expand source code
def start(self, auth_token: str = "") -> None:
    """Start consuming from the queue."""
    self._consumer_handler = LogConsumerHandler(
        auth_token=auth_token,
        batch_size=self._batch_size,
        flush_interval_seconds=self._flush_interval,
    )
    self._listener = QueueListener(
        self.queue, self._consumer_handler, respect_handler_level=True
    )
    self._listener.start()

Start consuming from the queue.

def stop(self) ‑> None
Expand source code
def stop(self) -> None:
    """Stop the listener and flush remaining logs."""
    # Stop the drain thread before closing the handler so no record is
    # dispatched to a closed handler.
    if self._listener:
        self._listener.stop()
        self._listener = None
    if self._consumer_handler:
        self._consumer_handler.close()
        self._consumer_handler = None

Stop the listener and flush remaining logs.

def update_endpoint(self,
endpoint: str,
jwt_key: str = '',
custom_headers: Dict[str, str] | None = None) ‑> None
Expand source code
def update_endpoint(self, endpoint: str, jwt_key: str = "", custom_headers: Optional[Dict[str, str]] = None) -> None:
    """Set/update the endpoint and flush buffered records."""
    # No-op until start() has created the consumer handler.
    if self._consumer_handler:
        self._consumer_handler.update_endpoint(endpoint, jwt_key, custom_headers)

Set/update the endpoint and flush buffered records.

class MetricsCollector
Expand source code
class MetricsCollector:
    """Single metrics collector for all pipeline modes.

    Replaces both CascadingMetricsCollector and RealtimeMetricsCollector
    with one collector that uses the component-wise metrics schema.
    """

    def __init__(self) -> None:
        """Initialize empty session/turn state for a fresh collector."""
        # Schema objects: one SessionMetrics per collector, the turn being
        # assembled (if any), and the list of completed turns.
        self.session = SessionMetrics()
        self.current_turn: Optional[TurnMetrics] = None
        self.turns: List[TurnMetrics] = []
        self.analytics_client = AnalyticsClient()
        self.traces_flow_manager: Optional[TracesFlowManager] = None
        self.playground_manager: Optional[PlaygroundManager] = None
        self.playground: bool = False

        # Pipeline configuration (filled in by configure_pipeline)
        self.pipeline_mode: Optional[str] = None
        self.realtime_mode: Optional[str] = None
        self.active_components: Optional[frozenset] = None

        # Transient timing state (not part of schema)
        self._stt_start_time: Optional[float] = None
        self._llm_start_time: Optional[float] = None
        self._tts_start_time: Optional[float] = None
        self._eou_start_time: Optional[float] = None
        self._tts_first_byte_time: Optional[float] = None

        # Speech state tracking
        self._is_agent_speaking: bool = False
        self._is_user_speaking: bool = False
        self._user_input_start_time: Optional[float] = None
        self._user_speech_end_time: Optional[float] = None
        self._agent_speech_start_time: Optional[float] = None
        # Earliest user-speech start cached from a discarded turn (see
        # complete_turn) and re-applied by start_turn.
        self._pending_user_start_time: Optional[float] = None

        # Turn counting
        self._total_turns: int = 0
        self._total_interruptions: int = 0

        # Realtime agent speech end timer (armed by schedule_turn_complete)
        self._agent_speech_end_timer: Optional[asyncio.TimerHandle] = None
        # NOTE: "preemtive" (sic) is the established attribute spelling used
        # elsewhere in the codebase — do not rename.
        self.preemtive_generation_enabled: bool = False
        self.session_set_in_trace: bool = False
        self.transport_mode: Optional[str] = None

        # Buffered data from an interrupted turn's interrupting speech,
        # carried into the next turn by start_turn().
        self._pending_interrupt_timeline: Optional[dict] = None
        self._pending_interrupt_stt: Optional['SttMetrics'] = None
        self._pending_interrupt_eou: Optional['EouMetrics'] = None
        self._pending_interrupt_vad: Optional['VadMetrics'] = None

        # Audio playback timing (session-level)
        self._background_audio_start_time: Optional[float] = None
        self._thinking_audio_start_time: Optional[float] = None
    # ──────────────────────────────────────────────
    # Session lifecycle
    # ──────────────────────────────────────────────

    def configure_pipeline(
        self,
        pipeline_mode: Any,
        realtime_mode: Any = None,
        active_components: Any = None,
    ) -> None:
        """Configure the collector with pipeline information.

        Args:
            pipeline_mode: PipelineMode enum value
            realtime_mode: RealtimeMode enum value (optional)
            active_components: frozenset of PipelineComponent (optional)
        """
        # Enums are stored by their .value; anything else is stringified.
        if hasattr(pipeline_mode, "value"):
            self.pipeline_mode = pipeline_mode.value
        else:
            self.pipeline_mode = str(pipeline_mode)

        if not realtime_mode:
            self.realtime_mode = None
        elif hasattr(realtime_mode, "value"):
            self.realtime_mode = realtime_mode.value
        else:
            self.realtime_mode = str(realtime_mode)

        self.active_components = active_components

        self.session.pipeline_mode = self.pipeline_mode
        self.session.realtime_mode = self.realtime_mode

        if active_components:
            names = []
            for component in active_components:
                names.append(component.value if hasattr(component, "value") else str(component))
            self.session.components = sorted(names)

        logger.info(f"[metrics] Pipeline configured: mode={self.pipeline_mode}, realtime={self.realtime_mode}")

    def set_session_id(self, session_id: str) -> None:
        """Set the session ID for metrics tracking."""
        # Keep schema and analytics client in sync with the same id.
        self.session.session_id = session_id
        self.analytics_client.set_session_id(session_id)

    def set_system_instructions(self, instructions: str) -> None:
        """Set the system instructions for this session."""
        self.session.system_instruction = instructions

    def set_provider_info(self, component_type: str, provider_class: str, model_name: str) -> None:
        """Record provider info for a specific component.

        Args:
            component_type: e.g. "stt", "llm", "tts", "vad", "eou", "realtime"
            provider_class: class name of the provider
            model_name: model identifier
        """
        entry = {
            "provider_class": provider_class,
            "model_name": model_name,
        }
        self.session.provider_per_component[component_type] = entry

    def update_provider_class(self, component_type: str, provider_class: str, model_name: str = "") -> None:
        """Update provider class (and optionally model name) for a component
        when a fallback provider takes over.

        Args:
            component_type: "STT", "LLM", "TTS", etc. (case-insensitive)
            provider_class: The new provider class name (e.g., "GoogleLLM")
            model_name: The new model name (kept unchanged when empty)
        """
        target_type = component_type.lower()
        # Only components that were registered via set_provider_info are updated.
        if target_type not in self.session.provider_per_component:
            return
        info = self.session.provider_per_component[target_type]
        info["provider_class"] = provider_class
        if model_name:
            info["model_name"] = model_name
        logger.info(f"Updated {target_type} provider class to: {provider_class}, model: {model_name}")

    @staticmethod
    def _eou_config_to_dict(eou_config: Any) -> Dict[str, Any]:
        """Convert EOUConfig to a serializable dict for session storage."""
        if eou_config is None:
            return {}
        # Missing attributes serialize as None rather than raising.
        fields = ("mode", "min_max_speech_wait_timeout")
        return {name: getattr(eou_config, name, None) for name in fields}

    @staticmethod
    def _interrupt_config_to_dict(interrupt_config: Any) -> Dict[str, Any]:
        """Convert InterruptConfig to a serializable dict for session storage."""
        if interrupt_config is None:
            return {}
        # Missing attributes serialize as None rather than raising.
        fields = (
            "mode",
            "interrupt_min_duration",
            "interrupt_min_words",
            "false_interrupt_pause_duration",
            "resume_on_false_interrupt",
        )
        return {name: getattr(interrupt_config, name, None) for name in fields}

    def set_eou_config(self, eou_config: Any) -> None:
        """Store EOU config on session for later use (internal tracking, not sent)."""
        self.session.eou_config = self._eou_config_to_dict(eou_config)

    def set_interrupt_config(self, interrupt_config: Any) -> None:
        """Store Interrupt config on session for later use (internal tracking, not sent)."""
        self.session.interrupt_config = self._interrupt_config_to_dict(interrupt_config)

    def add_participant_metrics(
        self,
        participant_id: str,
        kind: Optional[str] = None,
        sip_user: Optional[bool] = None,
        join_time: Optional[float] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append a participant entry (agent or user) into session.participant_metrics.

        Also mirrors the entry into the traces flow manager's participant list
        when one has been attached via set_traces_flow_manager().
        """
        self.session.participant_metrics.append(
            ParticipantMetrics(
                participant_id=participant_id,
                kind=kind,
                sip_user=sip_user,
                join_time=join_time,
                meta=meta,
            )
        )
        # Bug fix: traces_flow_manager is Optional (None until
        # set_traces_flow_manager is called), so dereferencing it
        # unconditionally raised AttributeError when participants joined
        # before tracing was set up. Guard it, matching the
        # `if self.traces_flow_manager:` checks used elsewhere in this class.
        if self.traces_flow_manager:
            # NOTE(review): the mirrored entry intentionally omits `meta`
            # (original behavior preserved) — confirm this is deliberate.
            self.traces_flow_manager.participant_metrics.append(
                ParticipantMetrics(
                    participant_id=participant_id,
                    kind=kind,
                    sip_user=sip_user,
                    join_time=join_time,
                )
            )

    def set_traces_flow_manager(self, manager: TracesFlowManager) -> None:
        """Set the TracesFlowManager instance."""
        self.traces_flow_manager = manager

    def set_playground_manager(self, manager: Optional[PlaygroundManager]) -> None:
        """Set the PlaygroundManager instance."""
        # NOTE(review): playground is flagged True even when manager is None;
        # downstream code guards with `self.playground and self.playground_manager`
        # so this is currently harmless — confirm the flag semantics.
        self.playground = True
        self.playground_manager = manager

    def finalize_session(self) -> None:
        """Finalize the session, completing any in-progress turn."""
        # Flush the open turn first so its metrics are captured before the
        # session end timestamp is stamped.
        if self.current_turn:
            self.complete_turn()
        self.session.session_end_time = time.time()

    # ──────────────────────────────────────────────
    # Turn lifecycle
    # ──────────────────────────────────────────────

    def _generate_turn_id(self) -> str:
        """Generate a hash-based turn ID (16 hex chars).

        md5 is used only as a cheap non-cryptographic fingerprint of
        timestamp + session id + turn counter.
        """
        seed = "_".join([
            str(time.time()),
            self.session.session_id or "default",
            str(self._total_turns),
        ])
        return hashlib.md5(seed.encode()).hexdigest()[:16]

    @staticmethod
    def _round_latency(latency: float) -> float:
        """Convert latency from seconds to milliseconds, rounded to 4 decimals."""
        milliseconds = latency * 1000
        return round(milliseconds, 4)

    def start_turn(self) -> None:
        """Start a new turn. Completes any existing turn first.

        Carry-over handling, in order:
        1. user-speech start time cached from a previously discarded turn;
        2. timeline/STT/EOU/VAD data buffered from the speech that
           interrupted the previous turn (see complete_turn);
        3. the live user-speech start time if the user is mid-utterance.
        """
        logger.info(f"[metrics] start_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}")
        if self.current_turn:
            logger.info(f"[metrics] start_turn() completing existing turn before creating new one")
            self.complete_turn()

        self._total_turns += 1
        self.current_turn = TurnMetrics(turn_id=self._generate_turn_id(), preemtive_generation_enabled=self.preemtive_generation_enabled)

        # Carry over pending user start time from a previously discarded turn
        if self._pending_user_start_time is not None:
            self.current_turn.user_speech_start_time = self._pending_user_start_time
            self._start_timeline_event("user_speech", self._pending_user_start_time)

        # Carry over buffered data from an interrupted turn
        if self._pending_interrupt_timeline is not None:
            tl = self._pending_interrupt_timeline
            event = TimelineEvent(
                event_type="user_speech",
                start_time=tl["start_time"],
                end_time=tl.get("end_time"),
                duration_ms=tl.get("duration_ms"),
                text=tl.get("text")
            )
            self.current_turn.timeline_event_metrics.append(event)
            logger.info(f"[metrics] start_turn() carried over interrupt timeline: {tl.get('text')}")
            self._pending_interrupt_timeline = None
        if self._pending_interrupt_stt is not None:
            self.current_turn.stt_metrics.append(self._pending_interrupt_stt)
            logger.info(f"[metrics] start_turn() carried over interrupt STT: {self._pending_interrupt_stt.stt_transcript}")
            self._pending_interrupt_stt = None
        if self._pending_interrupt_eou is not None:
            self.current_turn.eou_metrics.append(self._pending_interrupt_eou)
            logger.info(f"[metrics] start_turn() carried over interrupt EOU: {self._pending_interrupt_eou.eou_latency}")
            self._pending_interrupt_eou = None
        if self._pending_interrupt_vad is not None:
            self.current_turn.vad_metrics.append(self._pending_interrupt_vad)
            logger.info(f"[metrics] start_turn() carried over interrupt VAD")
            self._pending_interrupt_vad = None

        # If user is currently speaking, capture the start time
        if self._is_user_speaking and self._user_input_start_time:
            if self.current_turn.user_speech_start_time is None:
                self.current_turn.user_speech_start_time = self._user_input_start_time
                # Avoid a duplicate timeline entry when one was carried over above.
                if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics):
                    self._start_timeline_event("user_speech", self._user_input_start_time)

    def complete_turn(self) -> None:
        """Complete the current turn, calculate E2E, validate, serialize and send.

        Pipeline: attach session → compute E2E → validate (discarding invalid
        non-first turns but caching the user start time) → buffer the
        interrupting speech's metrics for the next turn → emit trace,
        playground update and analytics payload → reset transient state.
        """
        logger.info(f"[metrics] complete_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}")
        if not self.current_turn:
            logger.info(f"[metrics] complete_turn() early return — no current_turn")
            return
        self.current_turn.session_metrics = self.session
        # Calculate E2E latency
        self.current_turn.compute_e2e_latency()

        # Validate turn has meaningful data (these flags are logged only;
        # the discard decision is delegated to _validate_turn below).
        has_stt = bool(self.current_turn.stt_metrics and any(s.stt_latency is not None for s in self.current_turn.stt_metrics))
        has_llm = bool(self.current_turn.llm_metrics and any(l.llm_ttft is not None for l in self.current_turn.llm_metrics))
        has_tts = bool(self.current_turn.tts_metrics and any(t.tts_latency is not None or t.ttfb is not None for t in self.current_turn.tts_metrics))
        logger.info(f"[metrics] complete_turn() validation | stt={has_stt} llm={has_llm} tts={has_tts} | user_speech_start={self.current_turn.user_speech_start_time is not None} | e2e={self.current_turn.e2e_latency}")
        # The very first turn is always kept, even if validation fails.
        if not self._validate_turn(self.current_turn) and self._total_turns > 1:
            logger.warning(f"[metrics] complete_turn() DISCARDING turn — validation failed | total_turns={self._total_turns}")
            # Cache user start time for next turn (keep the earliest seen).
            if self.current_turn.user_speech_start_time is not None:
                if (self._pending_user_start_time is None or
                        self.current_turn.user_speech_start_time < self._pending_user_start_time):
                    self._pending_user_start_time = self.current_turn.user_speech_start_time
                    logger.info(f"[metrics] Caching earliest user start: {self._pending_user_start_time}")
            self.current_turn = None
            return

        # If this turn was interrupted, buffer the interrupting user's metrics
        # (STT, EOU, VAD, timeline) for carry-over to the next turn.
        # The >= 2 checks mean: only pop when a second entry exists, i.e. the
        # last entry belongs to the interrupting speech, not this turn's own.
        if self.current_turn.is_interrupted:
            # Buffer STT metrics from interrupting speech
            if len(self.current_turn.stt_metrics) >= 2:
                self._pending_interrupt_stt = self.current_turn.stt_metrics.pop()
                logger.info(f"[metrics] complete_turn() buffering interrupt STT for next turn: {self._pending_interrupt_stt.stt_transcript}")

            # Buffer EOU metrics from interrupting speech
            if len(self.current_turn.eou_metrics) >= 2:
                self._pending_interrupt_eou = self.current_turn.eou_metrics.pop()
                logger.info(f"[metrics] complete_turn() buffering interrupt EOU for next turn: {self._pending_interrupt_eou.eou_latency}")

            # Buffer VAD metrics from interrupting speech and construct timeline from it
            if len(self.current_turn.vad_metrics) >= 2:
                self._pending_interrupt_vad = self.current_turn.vad_metrics.pop()
                # Build the pending timeline from the buffered VAD data
                self._pending_interrupt_timeline = {
                    "start_time": self._pending_interrupt_vad.user_speech_start_time,
                    "end_time": self._pending_interrupt_vad.user_speech_end_time,
                    "duration_ms": (
                        round((self._pending_interrupt_vad.user_speech_end_time - self._pending_interrupt_vad.user_speech_start_time) * 1000, 4)
                        if self._pending_interrupt_vad.user_speech_start_time and self._pending_interrupt_vad.user_speech_end_time
                        else None
                    ),
                    "text": None,  # Will be set by set_user_transcript in the next turn
                }
                logger.info(f"[metrics] complete_turn() buffering interrupt VAD + timeline for next turn")

        # Send trace
        if self.traces_flow_manager:
            self.traces_flow_manager.create_unified_turn_trace(self.current_turn, self.session)

        self.turns.append(self.current_turn)

        # Send to playground
        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics=self.current_turn, full_turn_data=True)

        # Serialize and send analytics: snake_case → camelCase, then strip
        # internal-only fields and negative values before emitting.
        interaction_data = self._serialize_turn(self.current_turn)
        transformed_data = self._transform_to_camel_case(interaction_data)
        transformed_data = self._remove_internal_fields(transformed_data)

        # Provider info is only sent with the first turn's payload.
        if len(self.turns) > 1:
            self._remove_provider_fields(transformed_data)

        transformed_data = self._remove_negatives(transformed_data)

        global_event_emitter.emit("TURN_METRICS_ADDED", {"metrics": transformed_data})

        self.analytics_client.send_interaction_analytics_safe(transformed_data)

        self.current_turn = None
        self._pending_user_start_time = None

        # Reset transient timing state so stale values don't leak into the next turn
        self._stt_start_time = None
        self._is_user_speaking = False

    def schedule_turn_complete(self, timeout: float = 1.0) -> None:
        """Schedule turn completion after a timeout (for realtime modes).

        Args:
            timeout: Seconds to wait before finalizing the current turn.
        """
        # Cancel any previously armed timer so only one finalization is pending.
        if self._agent_speech_end_timer:
            self._agent_speech_end_timer.cancel()

        try:
            # Bug fix: asyncio.get_event_loop() is deprecated outside a running
            # loop and, on the main thread, could silently CREATE a new loop
            # that is never run — the timer would then never fire and the turn
            # would never complete. get_running_loop() raises RuntimeError when
            # no loop is running, which correctly routes us to the immediate
            # completion fallback below.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop available, complete immediately
            self.complete_turn()
            return
        self._agent_speech_end_timer = loop.call_later(timeout, self._finalize_realtime_turn)

    def _finalize_realtime_turn(self) -> None:
        """Finalize a realtime turn after the timeout.

        Backfills missing end timestamps (agent speech, user speech, open
        timeline events) with the current perf_counter, computes realtime
        latencies, then completes the turn.
        """
        if not self.current_turn:
            return

        # Ensure agent speech end time is set
        if self.current_turn.agent_speech_start_time and not self.current_turn.agent_speech_end_time:
            self.current_turn.agent_speech_end_time = time.perf_counter()

        # Ensure user speech end time is set
        if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time:
            self.current_turn.user_speech_end_time = time.perf_counter()

        # Close any open timeline events
        current_time = time.perf_counter()
        for event in self.current_turn.timeline_event_metrics:
            if event.end_time is None:
                event.end_time = current_time
                event.duration_ms = round((current_time - event.start_time) * 1000, 4)

        # Compute realtime latencies (must run after timestamps are backfilled)
        self._compute_realtime_latencies()

        self._agent_speech_end_timer = None
        self.complete_turn()

    def _compute_realtime_latencies(self) -> None:
        """Compute realtime TTFB (used as E2E) and agent speech duration based on user and agent speech timestamps."""
        turn = self.current_turn
        if not turn:
            return

        user_end = turn.user_speech_end_time
        agent_start = turn.agent_speech_start_time
        agent_end = turn.agent_speech_end_time

        # TTFB = gap between the user finishing and the agent starting;
        # for realtime pipelines this doubles as the E2E latency.
        if user_end and agent_start:
            ttfb = round((agent_start - user_end) * 1000, 4)
            turn.e2e_latency = ttfb
            if turn.realtime_metrics:
                turn.realtime_metrics[-1].realtime_ttfb = ttfb

        if agent_start and agent_end:
            turn.agent_speech_duration = round((agent_end - agent_start) * 1000, 4)

    # ──────────────────────────────────────────────
    # VAD metrics
    # ──────────────────────────────────────────────

    def on_user_speech_start(self) -> None:
        """Called when user starts speaking (VAD start).

        Lazily opens a turn if none exists, records the user-speech start
        timestamp (first start of the turn wins), opens a "user_speech"
        timeline event once per turn, and starts a fresh VAD metrics entry
        when the previous one has already completed.
        """
        if not self.current_turn:
            self.start_turn()

        logger.info(f"[metrics] on_user_speech_start() called | _is_user_speaking={self._is_user_speaking} | current_turn={'exists' if self.current_turn else 'None'}")
        # Debounce: VAD can emit repeated start events while speech continues.
        if self._is_user_speaking:
            logger.info(f"[metrics] on_user_speech_start() early return — already speaking")
            return

        self._is_user_speaking = True
        self._user_input_start_time = time.perf_counter()

        if self.current_turn:
            # Only the first user-speech start of the turn is recorded.
            if self.current_turn.user_speech_start_time is None:
                logger.info(f"[DEBUG_START] on_user_speech_start() | setting user_speech_start_time={self._user_input_start_time}")
                self.current_turn.user_speech_start_time = self._user_input_start_time

            # At most one "user_speech" timeline event per turn.
            if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics):
                self._start_timeline_event("user_speech", self._user_input_start_time)

            # Create new entry if none exists or last entry is already complete
            if not self.current_turn.vad_metrics or self.current_turn.vad_metrics[-1].user_speech_end_time is not None:
                self.current_turn.vad_metrics.append(VadMetrics())
            self.current_turn.vad_metrics[-1].user_speech_start_time = self._user_input_start_time

    def on_user_speech_end(self) -> None:
        """Called when user stops speaking (VAD end).

        Records the user-speech end timestamp, derives the speech duration,
        closes the user_speech timeline event and the latest VAD entry, and
        forwards the duration to the playground when enabled.
        """
        logger.info(f"[metrics] on_user_speech_end() called | current_turn={'exists' if self.current_turn else 'None'}")
        self._is_user_speaking = False
        self._user_speech_end_time = time.perf_counter()

        # Only record an end when a matching speech start exists on the turn.
        if self.current_turn and self.current_turn.user_speech_start_time:
            self.current_turn.user_speech_end_time = self._user_speech_end_time
            logger.info(f"[DEBUG_END] on_user_speech_end() | setting user_speech_end_time={self._user_speech_end_time}")
            self.current_turn.user_speech_duration = self._round_latency(
                self.current_turn.user_speech_end_time - self.current_turn.user_speech_start_time
            )
            self._end_timeline_event("user_speech", self._user_speech_end_time)

            # Ensure a VAD entry exists before stamping its end time.
            if not self.current_turn.vad_metrics:
                self.current_turn.vad_metrics.append(VadMetrics())
            self.current_turn.vad_metrics[-1].user_speech_end_time = self._user_speech_end_time

            logger.info(f"user speech duration: {self.current_turn.user_speech_duration}ms")

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(
                    metrics={"user_speech_duration": self.current_turn.user_speech_duration}
                )

    def set_vad_config(self, vad_config: Dict[str, Any]) -> None:
        """Set VAD configuration on the latest VAD metrics entry.

        Args:
            vad_config: Mapping that may contain "threshold",
                "min_speech_duration" and "min_silence_duration";
                missing keys are stored as None.
        """
        # Guard against being called before a turn exists — every other
        # metrics setter checks current_turn; this one previously raised
        # AttributeError when current_turn was None.
        if not self.current_turn:
            return

        if not self.current_turn.vad_metrics:
            self.current_turn.vad_metrics.append(VadMetrics())

        vad = self.current_turn.vad_metrics[-1]
        vad.vad_threshold = vad_config.get("threshold")
        vad.vad_min_speech_duration = vad_config.get("min_speech_duration")
        vad.vad_min_silence_duration = vad_config.get("min_silence_duration")

    # ──────────────────────────────────────────────
    # STT metrics
    # ──────────────────────────────────────────────

    def on_stt_start(self) -> None:
        """Called when STT processing starts; opens or reuses an STT metrics entry."""
        self._stt_start_time = time.perf_counter()
        turn = self.current_turn
        if not turn:
            return
        logger.info(f"[metrics] on_stt_start() called | current_turn={'exists' if self.current_turn else 'None'}")
        # Start a fresh STT entry when none exists yet, or the previous one
        # already finished (its latency has been recorded).
        entries = turn.stt_metrics
        if not entries or entries[-1].stt_latency is not None:
            entries.append(SttMetrics())
        entries[-1].stt_start_time = self._stt_start_time

    def on_stt_complete(self, transcript: str = "", duration: float = 0.0, confidence: float = 0.0) -> None:
        """Called when STT processing completes.

        Args:
            transcript: Final transcript text; stored only when non-empty.
            duration: Duration value reported by the STT provider — assumed
                to be milliseconds based on the log line; TODO confirm units.
            confidence: Provider-reported confidence score.
        """
        # If preemptive generation already fired for this STT entry, the
        # final completion is redundant — skip it entirely.
        if self.current_turn and self.current_turn.stt_metrics:
            stt = self.current_turn.stt_metrics[-1]
            if stt.stt_preemptive_generation_enabled and stt.stt_preemptive_generation_occurred:
                logger.info("STT preemptive generation occurred, skipping stt complete")
                return

        if not self.current_turn:
            return

        if not self.current_turn.stt_metrics:
            self.current_turn.stt_metrics.append(SttMetrics())
        stt = self.current_turn.stt_metrics[-1]

        stt_end_time = time.perf_counter()
        stt.stt_end_time = stt_end_time

        # Latency, confidence and duration are recorded only when a matching
        # on_stt_start() timestamp exists.
        if self._stt_start_time:
            stt_latency = self._round_latency(stt_end_time - self._stt_start_time)
            stt.stt_latency = stt_latency
            stt.stt_confidence = confidence
            stt.stt_duration = duration
            logger.info(f"stt latency: {stt_latency}ms | stt confidence: {confidence} | stt duration: {duration}ms")

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(metrics={"stt_latency": stt_latency})

        if transcript:
            stt.stt_transcript = transcript

        self._stt_start_time = None

    def on_stt_preflight_end(self, transcript: str = "") -> None:
        """Called when STT preflight event is received; records preflight latency."""
        turn = self.current_turn
        if not (turn and turn.stt_metrics):
            return
        stt = turn.stt_metrics[-1]
        if not stt.stt_start_time:
            return

        stt.stt_preflight_end_time = time.perf_counter()
        stt.stt_preflight_latency = self._round_latency(
            stt.stt_preflight_end_time - stt.stt_start_time
        )
        stt.stt_preflight_transcript = transcript
        logger.info(f"stt preflight latency: {stt.stt_preflight_latency}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"stt_preflight_latency": stt.stt_preflight_latency}
            )

    def on_stt_interim_end(self) -> None:
        """Called when STT interim event is received; records the first interim latency only."""
        turn = self.current_turn
        if not (turn and turn.stt_metrics):
            return
        stt = turn.stt_metrics[-1]
        # Only the first interim event per STT entry is measured.
        if not stt.stt_start_time or stt.stt_interim_latency:
            return

        stt.stt_interim_end_time = time.perf_counter()
        stt.stt_interim_latency = self._round_latency(
            stt.stt_interim_end_time - stt.stt_start_time
        )
        logger.info(f"stt interim latency: {stt.stt_interim_latency}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"stt_interim_latency": stt.stt_interim_latency}
            )

    def set_stt_usage(self, input_tokens: int = 0, output_tokens: int = 0, total_tokens: int = 0) -> None:
        """Set STT token usage on the latest STT metrics entry."""
        turn = self.current_turn
        if not turn:
            return
        if not turn.stt_metrics:
            turn.stt_metrics.append(SttMetrics())
        latest = turn.stt_metrics[-1]
        latest.stt_input_tokens = input_tokens
        latest.stt_output_tokens = output_tokens
        latest.stt_total_tokens = total_tokens

    def set_preemptive_generation_enabled(self, enabled: bool) -> None:
        """Mark preemptive generation as enabled for current STT."""
        # NOTE(review): attribute name contains a typo ("preemtive"); kept
        # as-is because other code may read this exact attribute name —
        # confirm all readers before renaming.
        self.preemtive_generation_enabled = enabled
            
    # ──────────────────────────────────────────────
    # EOU metrics
    # ──────────────────────────────────────────────

    def on_eou_start(self) -> None:
        """Called when EOU processing starts; opens or reuses an EOU metrics entry."""
        if not self.current_turn:
            self.start_turn()
        self._eou_start_time = time.perf_counter()
        turn = self.current_turn
        if not turn:
            return
        # Open a new EOU entry when none exists, or the previous one has
        # already completed (latency recorded).
        if not turn.eou_metrics or turn.eou_metrics[-1].eou_latency is not None:
            turn.eou_metrics.append(EouMetrics())
        turn.eou_metrics[-1].eou_start_time = self._eou_start_time

    def on_eou_complete(self) -> None:
        """Called when EOU processing completes; records end-of-utterance latency."""
        start = self._eou_start_time
        if not start:
            return

        eou_end_time = time.perf_counter()
        eou_latency = self._round_latency(eou_end_time - start)

        turn = self.current_turn
        if turn:
            if not turn.eou_metrics:
                turn.eou_metrics.append(EouMetrics())
            entry = turn.eou_metrics[-1]
            entry.eou_end_time = eou_end_time
            entry.eou_latency = eou_latency
            logger.info(f"eou latency: {eou_latency}ms")

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(metrics={"eou_latency": eou_latency})

        self._eou_start_time = None
    
    def on_wait_for_additional_speech(self, duration: float, eou_probability: float):
        """Record that the pipeline waited for additional user speech."""
        turn = self.current_turn
        if not turn:
            return
        if not turn.eou_metrics:
            turn.eou_metrics.append(EouMetrics())
        entry = turn.eou_metrics[-1]
        entry.wait_for_additional_speech_duration = duration
        entry.eou_probability = eou_probability
        entry.waited_for_additional_speech = True
        logger.info(f"wait for additional speech duration: {duration}ms")
        logger.info(f"wait for additional speech eou probability: {eou_probability}")

    # ──────────────────────────────────────────────
    # LLM metrics
    # ──────────────────────────────────────────────

    def on_llm_start(self) -> None:
        """Called when LLM processing starts; stamps the start time on the latest LLM entry."""
        self._llm_start_time = time.perf_counter()
        turn = self.current_turn
        if not turn:
            return
        if not turn.llm_metrics:
            turn.llm_metrics.append(LlmMetrics())
        turn.llm_metrics[-1].llm_start_time = self._llm_start_time

    def on_llm_first_token(self) -> None:
        """Called when first LLM token is received; computes time-to-first-token."""
        turn = self.current_turn
        if not (turn and turn.llm_metrics):
            return
        entry = turn.llm_metrics[-1]
        if not entry.llm_start_time:
            return

        entry.llm_first_token_time = time.perf_counter()
        entry.llm_ttft = self._round_latency(entry.llm_first_token_time - entry.llm_start_time)
        logger.info(f"llm ttft: {entry.llm_ttft}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"llm_ttft": entry.llm_ttft})

    def on_llm_complete(self) -> None:
        """Called when LLM processing completes; records total LLM duration."""
        start = self._llm_start_time
        if not start:
            return

        llm_end_time = time.perf_counter()
        llm_duration = self._round_latency(llm_end_time - start)

        turn = self.current_turn
        if turn and turn.llm_metrics:
            entry = turn.llm_metrics[-1]
            entry.llm_end_time = llm_end_time
            entry.llm_duration = llm_duration
            logger.info(f"llm duration: {llm_duration}ms")

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(metrics={"llm_duration": llm_duration})

        self._llm_start_time = None

    def set_llm_input(self, text: str) -> None:
        """Record the actual text sent to LLM."""
        turn = self.current_turn
        if not turn:
            return
        if not turn.llm_metrics:
            turn.llm_metrics.append(LlmMetrics())
        turn.llm_metrics[-1].llm_input = text

    def set_llm_usage(self, usage: Dict[str, Any]) -> None:
        """Set LLM token usage and calculate TPS."""
        turn = self.current_turn
        if not turn or not usage:
            return

        if not turn.llm_metrics:
            turn.llm_metrics.append(LlmMetrics())

        entry = turn.llm_metrics[-1]
        entry.prompt_tokens = usage.get("prompt_tokens")
        entry.completion_tokens = usage.get("completion_tokens")
        entry.total_tokens = usage.get("total_tokens")
        entry.prompt_cached_tokens = usage.get("prompt_cached_tokens")

        # Tokens/second requires a positive duration and completion count.
        if entry.llm_duration and entry.llm_duration > 0 and entry.completion_tokens and entry.completion_tokens > 0:
            entry.tokens_per_second = round(entry.completion_tokens / (entry.llm_duration / 1000), 2)

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={
                "prompt_tokens": entry.prompt_tokens,
                "completion_tokens": entry.completion_tokens,
                "total_tokens": entry.total_tokens,
                "tokens_per_second": entry.tokens_per_second,
            })

    # ──────────────────────────────────────────────
    # TTS metrics
    # ──────────────────────────────────────────────

    def on_tts_start(self) -> None:
        """Called when TTS processing starts; resets first-byte state and stamps start time."""
        if not self.current_turn:
            self.start_turn()
        self._tts_start_time = time.perf_counter()
        self._tts_first_byte_time = None
        turn = self.current_turn
        if not turn:
            return
        if not turn.tts_metrics:
            turn.tts_metrics.append(TtsMetrics())
        turn.tts_metrics[-1].tts_start_time = self._tts_start_time

    def on_tts_first_byte(self) -> None:
        """Called when TTS produces first audio byte; records TTFB."""
        if not self._tts_start_time:
            return

        now = time.perf_counter()
        turn = self.current_turn
        if turn and turn.tts_metrics:
            tts = turn.tts_metrics[-1]
            tts.tts_end_time = now
            tts.ttfb = self._round_latency(now - self._tts_start_time)
            tts.tts_first_byte_time = now
            logger.info(f"tts ttfb: {tts.ttfb}ms")

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(metrics={"ttfb": tts.ttfb})

        self._tts_first_byte_time = now

    def on_agent_speech_start(self) -> None:
        """Called when agent starts speaking (actual audio output)."""
        if not self.current_turn:
            self.start_turn()
        self._is_agent_speaking = True
        self._agent_speech_start_time = time.perf_counter()

        turn = self.current_turn
        if not turn:
            return
        turn.agent_speech_start_time = self._agent_speech_start_time
        # Only open a new timeline event when no agent_speech event is
        # currently in flight (end_time still unset).
        has_open_event = any(
            ev.event_type == "agent_speech" and ev.end_time is None
            for ev in turn.timeline_event_metrics
        )
        if not has_open_event:
            self._start_timeline_event("agent_speech", self._agent_speech_start_time)

    def on_agent_speech_end(self) -> None:
        """Called when agent stops speaking.

        Closes the agent_speech timeline event, records the speech end
        timestamp, derives TTS latency (first byte - start) and the agent
        speech duration, then resets the per-utterance TTS timing state.
        """
        logger.info(f"[metrics] on_agent_speech_end() called | current_turn={'exists' if self.current_turn else 'None'} | _tts_start={self._tts_start_time is not None}")
        self._is_agent_speaking = False
        agent_speech_end_time = time.perf_counter()

        if self.current_turn:
            self._end_timeline_event("agent_speech", agent_speech_end_time)
            self.current_turn.agent_speech_end_time = agent_speech_end_time

        # Calculate TTS latency from first_byte - start
        if self._tts_start_time and self._tts_first_byte_time:
            total_tts_latency = self._tts_first_byte_time - self._tts_start_time
            if self.current_turn and self.current_turn.agent_speech_start_time:
                if self.current_turn.tts_metrics:
                    tts = self.current_turn.tts_metrics[-1]
                    tts.tts_latency = self._round_latency(total_tts_latency)

                self.current_turn.agent_speech_duration = self._round_latency(
                    agent_speech_end_time - self.current_turn.agent_speech_start_time
                )

                if self.playground and self.playground_manager:
                    self.playground_manager.send_cascading_metrics(
                        metrics={"tts_latency": self.current_turn.tts_metrics[-1].tts_latency if self.current_turn.tts_metrics else None}
                    )
                    self.playground_manager.send_cascading_metrics(
                        metrics={"agent_speech_duration": self.current_turn.agent_speech_duration}
                    )

        # Reset per-utterance TTS timing state unconditionally. The previous
        # if/elif branches performed the identical reset, and a stale
        # _tts_first_byte_time without a start time was never cleared.
        self._tts_start_time = None
        self._tts_first_byte_time = None

    def add_tts_characters(self, count: int) -> None:
        """Add to the total character count for the current turn."""
        turn = self.current_turn
        if not turn:
            return
        if not turn.tts_metrics:
            turn.tts_metrics.append(TtsMetrics())
        entry = turn.tts_metrics[-1]
        # Treat a missing/None count as zero before accumulating.
        entry.tts_characters = (entry.tts_characters or 0) + count

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"tts_characters": entry.tts_characters})

    # ──────────────────────────────────────────────
    # Interruption metrics
    # ──────────────────────────────────────────────

    def on_interrupted(self) -> None:
        """Called when user interrupts the agent."""
        # Interruptions only count while the agent is actually speaking.
        if not self._is_agent_speaking:
            return

        self._total_interruptions += 1
        turn = self.current_turn
        if turn:
            turn.is_interrupted = True
            logger.info(f"User interrupted the agent. Total interruptions: {self._total_interruptions}")

        if self.playground and self.playground_manager and turn:
            self.playground_manager.send_cascading_metrics(
                metrics={"interrupted": turn.is_interrupted}
            )

    def on_false_interrupt_start(self, pause_duration: Optional[float] = None) -> None:
        """Called when a false interrupt is detected."""
        turn = self.current_turn
        if not turn:
            return
        if not turn.interruption_metrics:
            turn.interruption_metrics = InterruptionMetrics()
        metrics = turn.interruption_metrics
        metrics.is_false_interrupt = True
        metrics.false_interrupt_start_time = time.perf_counter()
        if pause_duration is not None:
            metrics.false_interrupt_pause_duration = pause_duration

    def on_false_interrupt_resume(self) -> None:
        """Called when agent resumes after a false interrupt."""
        turn = self.current_turn
        if not (turn and turn.interruption_metrics):
            return
        metrics = turn.interruption_metrics
        metrics.resumed_after_false_interrupt = True
        metrics.resume_on_false_interrupt = True
        metrics.false_interrupt_end_time = time.perf_counter()
        # Duration is derivable only when the start timestamp was recorded.
        if metrics.false_interrupt_start_time:
            metrics.false_interrupt_duration = self._round_latency(
                metrics.false_interrupt_end_time - metrics.false_interrupt_start_time
            )

    def on_false_interrupt_escalated(self, word_count: Optional[int] = None) -> None:
        """Called when a false interrupt escalates to a real interrupt."""
        turn = self.current_turn
        if not (turn and turn.interruption_metrics):
            return
        metrics = turn.interruption_metrics
        metrics.is_false_interrupt = False
        if word_count is not None:
            metrics.false_interrupt_words = word_count

    # ──────────────────────────────────────────────
    # Tool metrics
    # ──────────────────────────────────────────────

    def add_function_tool_call(
        self,
        tool_name: str,
        tool_params: Optional[Union[Dict[str, Any], list]] = None,
        tool_response: Optional[Dict[str, Any]] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ) -> None:
        """Track a function tool call in the current turn."""
        turn = self.current_turn
        if not turn:
            return

        turn.function_tools_called.append(tool_name)

        metric = FunctionToolMetrics(
            tool_name=tool_name,
            tool_params=tool_params or {},
            tool_response=tool_response or {},
            start_time=start_time or time.perf_counter(),
            end_time=end_time,
        )
        # Latency is derivable only when both endpoints are known.
        if metric.start_time and metric.end_time:
            metric.latency = self._round_latency(metric.end_time - metric.start_time)
        turn.function_tool_metrics.append(metric)

        # Also track in function_tool_timestamps for backward compat
        turn.function_tool_timestamps.append({
            "tool_name": tool_name,
            "timestamp": start_time or time.perf_counter(),
            "readable_time": time.strftime("%H:%M:%S", time.localtime()),
        })

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"function_tool_timestamps": turn.function_tool_timestamps}
            )

    def add_mcp_tool_call(
        self,
        tool_type: str = "local",
        tool_url: Optional[str] = None,
        tool_params: Optional[Union[Dict[str, Any], list]] = None,
        tool_response: Optional[Dict[str, Any]] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ) -> None:
        """Track an MCP tool call in the current turn."""
        turn = self.current_turn
        if not turn:
            return

        metric = McpToolMetrics(
            type=tool_type,
            tool_url=tool_url,
            tool_params=tool_params or {},
            tool_response=tool_response or {},
            start_time=start_time or time.perf_counter(),
            end_time=end_time,
        )
        # Latency is derivable only when both endpoints are known.
        if metric.start_time and metric.end_time:
            metric.latency = self._round_latency(metric.end_time - metric.start_time)
        turn.mcp_tool_metrics.append(metric)

    # ──────────────────────────────────────────────
    # Transcript / response
    # ──────────────────────────────────────────────

    def set_user_transcript(self, transcript: str) -> None:
        """Set the user transcript for the current turn.

        Ignores the transcript when the pipeline is already mid-response
        (agent speaking, or LLM/TTS metrics exist) AND the turn already has
        user speech — that transcript belongs to the next turn. Otherwise
        stores the text, emits USER_TRANSCRIPT_ADDED, mirrors it onto the
        latest user_speech timeline event (creating one when missing), and
        forwards it to the playground when enabled.
        """
        if self.current_turn:
            # Guard: skip if the turn already has user_speech AND the pipeline
            # is actively processing (agent speaking OR LLM/TTS already started).
            # This prevents the interrupting speech's transcript from overwriting
            # the original transcript of the current turn.
            pipeline_active = (
                self._is_agent_speaking
                or bool(self.current_turn.llm_metrics)
                or bool(self.current_turn.tts_metrics)
            )
            if pipeline_active and self.current_turn.user_speech:
                logger.info(
                    f"[metrics] Skipping set_user_transcript — pipeline active "
                    f"and turn already has user_speech, new transcript "
                    f"belongs to the next turn: {transcript}"
                )
                return

            self.current_turn.user_speech = transcript
            logger.info(f"user input speech: {transcript}")
            global_event_emitter.emit("USER_TRANSCRIPT_ADDED", {"text": transcript})

            # Update timeline
            user_events = [
                ev for ev in self.current_turn.timeline_event_metrics if ev.event_type == "user_speech"
            ]
            if user_events:
                user_events[-1].text = transcript
            else:
                # No user_speech event yet (e.g. transcript arrived without a
                # VAD start) — create one now and attach the text to it.
                current_time = time.perf_counter()
                self._start_timeline_event("user_speech", current_time)
                if self.current_turn.timeline_event_metrics:
                    self.current_turn.timeline_event_metrics[-1].text = transcript

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(
                    metrics={"user_speech": self.current_turn.user_speech}
                )

    def set_agent_response(self, response: str) -> None:
        """Set the agent response for the current turn."""
        if not self.current_turn:
            self.start_turn()
        turn = self.current_turn
        turn.agent_speech = response
        logger.info(f"agent output speech: {response}")
        global_event_emitter.emit("AGENT_TRANSCRIPT_ADDED", {"text": response})

        # Ensure an agent_speech timeline event exists before updating its text.
        has_event = any(ev.event_type == "agent_speech" for ev in turn.timeline_event_metrics)
        if not has_event:
            self._start_timeline_event("agent_speech", time.perf_counter())

        self._update_timeline_event_text("agent_speech", response)

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"agent_speech": turn.agent_speech}
            )

    # ──────────────────────────────────────────────
    # Knowledge Base
    # ──────────────────────────────────────────────

    def on_knowledge_base_start(self, kb_id: Optional[str] = None) -> None:
        """Called when knowledge base processing starts."""
        turn = self.current_turn
        if not turn:
            return
        turn.kb_metrics.append(
            KbMetrics(kb_id=kb_id, kb_start_time=time.perf_counter())
        )

    def on_knowledge_base_complete(self, documents: List[str], scores: List[float]) -> None:
        """Called when knowledge base processing completes."""
        turn = self.current_turn
        if not (turn and turn.kb_metrics):
            return
        kb = turn.kb_metrics[-1]
        kb.kb_documents = documents
        kb.kb_scores = scores
        kb.kb_end_time = time.perf_counter()

        # Retrieval latency is derivable only when the start was recorded.
        if kb.kb_start_time:
            kb.kb_retrieval_latency = self._round_latency(kb.kb_end_time - kb.kb_start_time)
            logger.info(f"kb retrieval latency: {kb.kb_retrieval_latency}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={
                    "kb_id": kb.kb_id,
                    "kb_retrieval_latency": kb.kb_retrieval_latency,
                    "kb_documents": kb.kb_documents,
                    "kb_scores": kb.kb_scores,
                }
            )

    # ──────────────────────────────────────────────
    # A2A
    # ──────────────────────────────────────────────

    def set_a2a_handoff(self) -> None:
        """Set the A2A enabled and handoff occurred flags."""
        turn = self.current_turn
        if not turn:
            return
        turn.is_a2a_enabled = True
        turn.handoff_occurred = True

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"handoff_occurred": turn.handoff_occurred}
            )

    # ──────────────────────────────────────────────
    # Error tracking
    # ──────────────────────────────────────────────

    def add_error(self, source: str, message: str) -> None:
        """Add an error to the current turn."""
        turn = self.current_turn
        if not turn:
            return
        now = time.perf_counter()
        # message is coerced to str so exception objects are stored as text.
        turn.errors.append({
            "source": source,
            "message": str(message),
            "timestamp": now,
            "timestamp_perf": now,
        })

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"errors": turn.errors}
            )

    def on_fallback_event(self, event_data: Dict[str, Any]) -> None:
        """Record a fallback event for the current turn"""
        if not self.current_turn:
            self.start_turn()
        turn = self.current_turn
        if not turn:
            return

        # Keys that need an explicit non-None fallback value.
        defaulted = {
            "component_type": "",
            "temporary_disable_sec": 0,
            "permanent_disable_after_attempts": 0,
            "recovery_attempt": 0,
            "message": "",
            "is_recovery": False,
        }
        # Keys whose absence simply maps to None.
        optional_keys = (
            "start_time", "end_time", "duration_ms",
            "original_provider_label", "original_connection_start",
            "original_connection_end", "original_connection_duration_ms",
            "new_provider_label", "new_connection_start",
            "new_connection_end", "new_connection_duration_ms",
        )
        kwargs = {key: event_data.get(key, fallback) for key, fallback in defaulted.items()}
        kwargs.update({key: event_data.get(key) for key in optional_keys})

        turn.fallback_events.append(FallbackEvent(**kwargs))
        logger.info(f"Fallback event recorded: {event_data.get('component_type')} - {event_data.get('message')}")

    # ──────────────────────────────────────────────
    # Background audio metrics
    # ──────────────────────────────────────────────

    def on_background_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None:
        """Called when background audio playback starts."""
        start_time = time.perf_counter()
        self._background_audio_start_time = start_time
        # Only the base file name (not the full path) is recorded.
        file_name = os.path.basename(file_path) if file_path else None
        turn = self.current_turn
        if turn:
            turn.background_audio_file_path = file_name
            turn.background_audio_looping = looping
            self._start_timeline_event("background_audio", start_time)
        flow = self.traces_flow_manager
        if flow:
            flow.create_background_audio_start_span(
                file_path=file_name, looping=looping, start_time=start_time
            )
        logger.info(f"[metrics] background audio started | file={file_name} | looping={looping}")

    def on_background_audio_stop(self) -> None:
        """Called when background audio playback stops; closes the timeline event and trace span."""
        stop_time = time.perf_counter()
        self._background_audio_start_time = None
        turn = self.current_turn
        if turn:
            self._end_timeline_event("background_audio", stop_time)
        flow = self.traces_flow_manager
        if flow:
            flow.create_background_audio_stop_span(end_time=stop_time)
        logger.info("[metrics] background audio stopped")

    # ──────────────────────────────────────────────
    # Thinking audio metrics
    # ──────────────────────────────────────────────

    def on_thinking_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None:
        """Record the start of thinking-filler audio playback on the current turn."""
        started_at = time.perf_counter()
        self._thinking_audio_start_time = started_at
        # Only the base name is stored/reported, never the full path.
        base_name = os.path.basename(file_path) if file_path else None
        turn = self.current_turn
        if turn:
            turn.thinking_audio_file_path = base_name
            turn.thinking_audio_looping = looping
            self._start_timeline_event("thinking_audio", started_at)
        logger.info(f"[metrics] thinking audio started | file={base_name} | looping={looping}")

    def on_thinking_audio_stop(self) -> None:
        """Record the end of thinking-filler audio playback on the current turn."""
        stopped_at = time.perf_counter()
        self._thinking_audio_start_time = None
        if self.current_turn is not None:
            self._end_timeline_event("thinking_audio", stopped_at)
        logger.info("[metrics] thinking audio stopped")

    # ──────────────────────────────────────────────
    # Realtime-specific metrics
    # ──────────────────────────────────────────────

    def set_realtime_usage(self, usage: Dict[str, Any]) -> None:
        """Copy realtime token usage from a flat provider dict onto the latest RealtimeMetrics entry.

        Missing keys become None on the metrics object (dict.get default).
        """
        if not self.current_turn or not usage:
            return

        # Lazily create the first realtime metrics entry for this turn.
        if not self.current_turn.realtime_metrics:
            self.current_turn.realtime_metrics.append(RealtimeMetrics())

        rt = self.current_turn.realtime_metrics[-1]
        # Each usage key maps 1:1 onto a `realtime_<key>` attribute.
        usage_keys = (
            "input_tokens", "output_tokens", "total_tokens",
            "input_text_tokens", "input_audio_tokens", "input_image_tokens",
            "input_cached_tokens", "thoughts_tokens",
            "cached_text_tokens", "cached_audio_tokens", "cached_image_tokens",
            "output_text_tokens", "output_audio_tokens", "output_image_tokens",
        )
        for key in usage_keys:
            setattr(rt, f"realtime_{key}", usage.get(key))

    def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
        """Append a realtime-model error entry to the current turn's error list."""
        if not self.current_turn:
            return
        logger.error(f"realtime model error: {error}")
        now = time.perf_counter()
        # Both timestamp fields use the same perf_counter reading, matching add_error().
        self.current_turn.errors.append({
            "source": "REALTIME_MODEL",
            "message": str(error.get("message", "Unknown error")),
            "timestamp": now,
            "timestamp_perf": now,
        })

    # ──────────────────────────────────────────────
    # Timeline helpers
    # ──────────────────────────────────────────────

    def _start_timeline_event(self, event_type: str, start_time: float) -> None:
        """Open a new timeline event of the given type on the current turn (no-op without a turn)."""
        if not self.current_turn:
            return
        self.current_turn.timeline_event_metrics.append(
            TimelineEvent(event_type=event_type, start_time=start_time)
        )

    def _end_timeline_event(self, event_type: str, end_time: float) -> None:
        """Close the most recent matching timeline event and stamp its duration."""
        if not self.current_turn:
            return
        for event in reversed(self.current_turn.timeline_event_metrics):
            if event.event_type != event_type:
                continue
            # user_speech events may rapidly start/stop under VAD, so an
            # already-closed user_speech event can be extended; other types
            # are skipped once closed (an older open one may still match).
            if event.end_time is None or event_type == "user_speech":
                event.end_time = end_time
                event.duration_ms = self._round_latency(end_time - event.start_time)
                break

    def _update_timeline_event_text(self, event_type: str, text: str) -> None:
        """Update the most recent timeline event of this type with text content."""
        if self.current_turn:
            for event in reversed(self.current_turn.timeline_event_metrics):
                if event.event_type == event_type:
                    event.text = text
                    break

    # ──────────────────────────────────────────────
    # Validation & Serialization
    # ──────────────────────────────────────────────

    def _validate_turn(self, turn: TurnMetrics) -> bool:
        """Check that the turn has at least one meaningful latency metric."""
        has_stt = bool(turn.stt_metrics and any(s.stt_latency is not None for s in turn.stt_metrics))
        has_llm = bool(turn.llm_metrics and any(l.llm_ttft is not None for l in turn.llm_metrics))
        has_tts = bool(turn.tts_metrics and any(t.tts_latency is not None or t.ttfb is not None for t in turn.tts_metrics))
        has_eou = bool(turn.eou_metrics and any(e.eou_latency is not None for e in turn.eou_metrics))
        has_realtime = bool(turn.realtime_metrics)
        has_e2e = turn.e2e_latency is not None

        return any([has_stt, has_llm, has_tts, has_eou, has_realtime, has_e2e])

    def _serialize_turn(self, turn: TurnMetrics) -> Dict[str, Any]:
        """Serialize a TurnMetrics to a flat dict for analytics API.

        Only the LAST entry of each per-component metrics list (STT, EOU,
        LLM, KB, TTS, realtime) is flattened into top-level keys; earlier
        entries within the turn are not sent to analytics.

        Args:
            turn: Completed turn to serialize.

        Returns:
            Flat snake_case dict (later converted by _transform_to_camel_case).
        """
        data: Dict[str, Any] = {}

        # Top-level turn fields
        data["timestamp"] = turn.timestamp
        data["e2e_latency"] = turn.e2e_latency
        data["interrupted"] = turn.is_interrupted
        data["user_speech_start_time"] = turn.user_speech_start_time
        data["user_speech_end_time"] = turn.user_speech_end_time
        data["user_speech_duration"] = turn.user_speech_duration
        data["user_speech"] = turn.user_speech
        data["agent_speech_start_time"] = turn.agent_speech_start_time
        data["agent_speech_end_time"] = turn.agent_speech_end_time
        data["agent_speech_duration"] = turn.agent_speech_duration
        data["agent_speech"] = turn.agent_speech
        data["function_tools_called"] = turn.function_tools_called
        data["function_tool_timestamps"] = turn.function_tool_timestamps
        data["is_a2a_enabled"] = turn.is_a2a_enabled
        data["handoff_occurred"] = turn.handoff_occurred
        data["errors"] = turn.errors

        # Flatten the last STT metrics entry
        if turn.stt_metrics:
            stt = turn.stt_metrics[-1]
            data["stt_latency"] = stt.stt_latency
            data["stt_start_time"] = stt.stt_start_time
            data["stt_end_time"] = stt.stt_end_time
            data["stt_preflight_latency"] = stt.stt_preflight_latency
            data["stt_interim_latency"] = stt.stt_interim_latency
            data["stt_confidence"] = stt.stt_confidence
            data["stt_duration"] = stt.stt_duration

        # Flatten the last EOU metrics entry
        if turn.eou_metrics:
            eou = turn.eou_metrics[-1]
            data["eou_latency"] = eou.eou_latency
            data["eou_start_time"] = eou.eou_start_time
            data["eou_end_time"] = eou.eou_end_time

        # Flatten the last LLM metrics entry
        if turn.llm_metrics:
            llm = turn.llm_metrics[-1]
            data["llm_ttft"] = llm.llm_ttft
            data["llm_duration"] = llm.llm_duration
            data["llm_start_time"] = llm.llm_start_time
            data["llm_end_time"] = llm.llm_end_time
            data["prompt_tokens"] = llm.prompt_tokens
            data["completion_tokens"] = llm.completion_tokens
            data["total_tokens"] = llm.total_tokens
            data["prompt_cached_tokens"] = llm.prompt_cached_tokens
            data["tokens_per_second"] = llm.tokens_per_second

        # Flatten the last KB metrics entry
        if turn.kb_metrics:
            kb = turn.kb_metrics[-1]
            data["kb_id"] = kb.kb_id
            data["kb_documents"] = kb.kb_documents
            data["kb_scores"] = kb.kb_scores
            data["kb_retrieval_latency"] = kb.kb_retrieval_latency
        # Flatten the last TTS metrics entry
        if turn.tts_metrics:
            tts = turn.tts_metrics[-1]
            data["tts_latency"] = tts.tts_latency
            data["ttfb"] = tts.ttfb
            data["tts_start_time"] = tts.tts_start_time
            data["tts_end_time"] = tts.tts_end_time
            data["tts_duration"] = tts.tts_duration
            data["tts_characters"] = tts.tts_characters

        # Flatten the last realtime (full s2s) metrics entry for analytics
        if turn.realtime_metrics:
            rt = turn.realtime_metrics[-1]
            # NOTE: this deliberately overwrites any TTS "ttfb" set above —
            # in realtime mode the realtime TTFB is the authoritative value.
            data["ttfb"] = rt.realtime_ttfb
            data["realtime_input_tokens"] = rt.realtime_input_tokens
            data["realtime_total_tokens"] = rt.realtime_total_tokens
            data["realtime_output_tokens"] = rt.realtime_output_tokens
            data["realtime_input_text_tokens"] = rt.realtime_input_text_tokens
            data["realtime_input_audio_tokens"] = rt.realtime_input_audio_tokens
            data["realtime_input_image_tokens"] = rt.realtime_input_image_tokens
            data["realtime_input_cached_tokens"] = rt.realtime_input_cached_tokens
            data["realtime_thoughts_tokens"] = rt.realtime_thoughts_tokens
            data["realtime_cached_text_tokens"] = rt.realtime_cached_text_tokens
            data["realtime_cached_audio_tokens"] = rt.realtime_cached_audio_tokens
            data["realtime_cached_image_tokens"] = rt.realtime_cached_image_tokens
            data["realtime_output_text_tokens"] = rt.realtime_output_text_tokens
            data["realtime_output_audio_tokens"] = rt.realtime_output_audio_tokens
            data["realtime_output_image_tokens"] = rt.realtime_output_image_tokens

        # Provider info (from session). Use realtime_* keys for known S2S models.
        providers = self.session.provider_per_component
        if "realtime" in providers:
            data["realtime_provider_class"] = providers["realtime"]["provider_class"]
            data["realtime_model_name"] = providers["realtime"]["model_name"]
        elif "llm" in providers:
            # An "llm" entry may actually be a realtime model class in disguise.
            pc = providers["llm"]["provider_class"]
            if pc in REALTIME_PROVIDER_CLASS_NAMES:
                data["realtime_provider_class"] = pc
                data["realtime_model_name"] = providers["llm"]["model_name"]
            else:
                data["llm_provider_class"] = pc
                data["llm_model_name"] = providers["llm"]["model_name"]
        if "stt" in providers:
            data["stt_provider_class"] = providers["stt"]["provider_class"]
            data["stt_model_name"] = providers["stt"]["model_name"]
        if "tts" in providers:
            data["tts_provider_class"] = providers["tts"]["provider_class"]
            data["tts_model_name"] = providers["tts"]["model_name"]
        if "vad" in providers:
            data["vad_provider_class"] = providers["vad"]["provider_class"]
            data["vad_model_name"] = providers["vad"]["model_name"]
        if "eou" in providers:
            data["eou_provider_class"] = providers["eou"]["provider_class"]
            data["eou_model_name"] = providers["eou"]["model_name"]

        # System instructions (first turn only)
        if self._total_turns == 1 or len(self.turns) == 0:
            data["system_instructions"] = self.session.system_instruction
        else:
            data["system_instructions"] = ""

        # Timeline
        data["timeline"] = [asdict(ev) for ev in turn.timeline_event_metrics]

        return data

    def _transform_to_camel_case(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform snake_case field names to camelCase for analytics.

        Unknown keys pass through unchanged. Timeline entries (the "timeline"
        list) use their own, separate mapping.

        Args:
            data: Flat snake_case dict produced by _serialize_turn.

        Returns:
            New dict with camelCase keys; values are not copied or changed.
        """
        field_mapping = {
            # User and agent metrics 
            "user_speech_start_time": "userSpeechStartTime",
            "user_speech_end_time": "userSpeechEndTime",
            "user_speech_duration": "userSpeechDuration",
            "agent_speech_start_time": "agentSpeechStartTime",
            "agent_speech_end_time": "agentSpeechEndTime",
            "agent_speech_duration": "agentSpeechDuration",

            # STT metrics
            "stt_latency": "sttLatency",
            "stt_start_time": "sttStartTime",
            "stt_end_time": "sttEndTime",
            "stt_preflight_latency": "sttPreflightLatency",
            "stt_interim_latency": "sttInterimLatency",
            "stt_confidence": "sttConfidence",
            "stt_duration": "sttDuration",

            # LLM metrics
            "llm_duration": "llmDuration",
            "llm_start_time": "llmStartTime",
            "llm_end_time": "llmEndTime",
            # NOTE: llm_ttft intentionally maps to plain "ttft" (analytics schema name).
            "llm_ttft": "ttft",
            "prompt_tokens": "promptTokens",
            "completion_tokens": "completionTokens",
            "total_tokens": "totalTokens",
            "prompt_cached_tokens": "promptCachedTokens",
            "tokens_per_second": "tokensPerSecond",

            # TTS metrics
            "tts_start_time": "ttsStartTime",
            "tts_end_time": "ttsEndTime",
            "tts_duration": "ttsDuration",
            "tts_characters": "ttsCharacters",
            "ttfb": "ttfb",
            "tts_latency": "ttsLatency",

            # EOU metrics
            "eou_latency": "eouLatency",
            "eou_start_time": "eouStartTime",
            "eou_end_time": "eouEndTime",

            # Realtime (full s2s) metrics
            "realtime_ttfb": "realtimeTtfb",
            "realtime_input_tokens": "realtimeInputTokens",
            "realtime_total_tokens": "realtimeTotalTokens",
            "realtime_output_tokens": "realtimeOutputTokens",
            "realtime_input_text_tokens": "realtimeInputTextTokens",
            "realtime_input_audio_tokens": "realtimeInputAudioTokens",
            "realtime_input_image_tokens": "realtimeInputImageTokens",
            "realtime_input_cached_tokens": "realtimeInputCachedTokens",
            "realtime_thoughts_tokens": "realtimeThoughtsTokens",
            "realtime_cached_text_tokens": "realtimeCachedTextTokens",
            "realtime_cached_audio_tokens": "realtimeCachedAudioTokens",
            "realtime_cached_image_tokens": "realtimeCachedImageTokens",
            "realtime_output_text_tokens": "realtimeOutputTextTokens",
            "realtime_output_audio_tokens": "realtimeOutputAudioTokens",
            "realtime_output_image_tokens": "realtimeOutputImageTokens",

            "kb_id": "kbId",
            "kb_retrieval_latency": "kbRetrievalLatency",
            "kb_documents": "kbDocuments",
            "kb_scores": "kbScores",

            # Provider metrics
            "llm_provider_class": "llmProviderClass",
            "llm_model_name": "llmModelName",
            "realtime_provider_class": "realtimeProviderClass",
            "realtime_model_name": "realtimeModelName",
            "stt_provider_class": "sttProviderClass",
            "stt_model_name": "sttModelName",
            "tts_provider_class": "ttsProviderClass",
            "tts_model_name": "ttsModelName",

            # VAD metrics
            "vad_provider_class": "vadProviderClass",
            "vad_model_name": "vadModelName",

            # EOU metrics
            "eou_provider_class": "eouProviderClass",
            "eou_model_name": "eouModelName",

            # Other metrics
            "e2e_latency": "e2eLatency",
            "interrupted": "interrupted",
            "timestamp": "timestamp",
            "function_tools_called": "functionToolsCalled",
            "function_tool_timestamps": "functionToolTimestamps",
            "system_instructions": "systemInstructions",
            "handoff_occurred": "handOffOccurred",
            "is_a2a_enabled": "isA2aEnabled",
            "errors": "errors",
        }

        # Nested mapping for each timeline event dict (asdict(TimelineEvent)).
        timeline_field_mapping = {
            "event_type": "eventType",
            "start_time": "startTime",
            "end_time": "endTime",
            "duration_ms": "durationInMs",
        }

        transformed: Dict[str, Any] = {}
        for key, value in data.items():
            camel_key = field_mapping.get(key, key)
            if key == "timeline" and isinstance(value, list):
                # Re-key each timeline event with the timeline-specific mapping.
                transformed_timeline = []
                for event in value:
                    transformed_event = {}
                    for ek, ev in event.items():
                        transformed_event[timeline_field_mapping.get(ek, ek)] = ev
                    transformed_timeline.append(transformed_event)
                transformed[camel_key] = transformed_timeline
            else:
                transformed[camel_key] = value

        return transformed

    def _remove_internal_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Remove internal-only fields from the analytics payload."""
        always_remove = [
            "errors",
            "functionToolTimestamps",
            "sttStartTime", "sttEndTime",
            "ttsStartTime", "ttsEndTime",
            "llmStartTime", "llmEndTime",
            "eouStartTime", "eouEndTime",
            "isA2aEnabled",
            "interactionId",
            "timestamp",
        ]

        if self.current_turn and not self.current_turn.is_a2a_enabled:
            always_remove.append("handOffOccurred")

        for f in always_remove:
            data.pop(f, None)

        return data

    def _remove_provider_fields(self, data: Dict[str, Any]) -> None:
        """Remove provider fields after first turn."""
        provider_fields = [
            "systemInstructions",
            # "llmProviderClass", "llmModelName",
            # "sttProviderClass", "sttModelName",
            # "ttsProviderClass", "ttsModelName",
            "vadProviderClass", "vadModelName",
            "eouProviderClass", "eouModelName",
        ]
        for f in provider_fields:
            data.pop(f, None)

    def _remove_negatives(self, obj: Any) -> Any:
        """Recursively clamp any numeric value < 0 to 0."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                if isinstance(v, (int, float)) and v < 0:
                    obj[k] = 0
                elif isinstance(v, (dict, list)):
                    obj[k] = self._remove_negatives(v)
            return obj
        if isinstance(obj, list):
            for i, v in enumerate(obj):
                if isinstance(v, (int, float)) and v < 0:
                    obj[i] = 0
                elif isinstance(v, (dict, list)):
                    obj[i] = self._remove_negatives(v)
            return obj
        return obj

Single metrics collector for all pipeline modes.

Replaces both CascadingMetricsCollector and RealtimeMetricsCollector with one collector that uses the component-wise metrics schema.

Methods

def add_error(self, source: str, message: str) ‑> None
Expand source code
def add_error(self, source: str, message: str) -> None:
    """Append an error record to the current turn and push it to the playground."""
    if not self.current_turn:
        return
    now = time.perf_counter()
    # Both timestamp fields carry the same perf_counter reading.
    self.current_turn.errors.append({
        "source": source,
        "message": str(message),
        "timestamp": now,
        "timestamp_perf": now,
    })

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"errors": self.current_turn.errors}
        )

Add an error to the current turn.

def add_function_tool_call(self,
tool_name: str,
tool_params: Optional[Union[Dict[str, Any], list]] = None,
tool_response: Optional[Dict[str, Any]] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None) ‑> None
Expand source code
def add_function_tool_call(
    self,
    tool_name: str,
    tool_params: Optional[Union[Dict[str, Any], list]] = None,
    tool_response: Optional[Dict[str, Any]] = None,
    start_time: Optional[float] = None,
    end_time: Optional[float] = None,
) -> None:
    """Record one function-tool invocation on the current turn."""
    if not self.current_turn:
        return

    self.current_turn.function_tools_called.append(tool_name)

    metric = FunctionToolMetrics(
        tool_name=tool_name,
        tool_params=tool_params or {},
        tool_response=tool_response or {},
        start_time=start_time or time.perf_counter(),
        end_time=end_time,
    )
    # Latency is only derivable when both endpoints are present.
    if metric.start_time and metric.end_time:
        metric.latency = self._round_latency(metric.end_time - metric.start_time)
    self.current_turn.function_tool_metrics.append(metric)

    # Backward-compat shadow record with a human-readable wall-clock time.
    self.current_turn.function_tool_timestamps.append({
        "tool_name": tool_name,
        "timestamp": start_time or time.perf_counter(),
        "readable_time": time.strftime("%H:%M:%S", time.localtime()),
    })

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"function_tool_timestamps": self.current_turn.function_tool_timestamps}
        )

Track a function tool call in the current turn.

def add_mcp_tool_call(self,
tool_type: str = 'local',
tool_url: Optional[str] = None,
tool_params: Optional[Union[Dict[str, Any], list]] = None,
tool_response: Optional[Dict[str, Any]] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None) ‑> None
Expand source code
def add_mcp_tool_call(
    self,
    tool_type: str = "local",
    tool_url: Optional[str] = None,
    tool_params: Optional[Union[Dict[str, Any], list]] = None,
    tool_response: Optional[Dict[str, Any]] = None,
    start_time: Optional[float] = None,
    end_time: Optional[float] = None,
) -> None:
    """Record one MCP tool invocation on the current turn."""
    if not self.current_turn:
        return

    metric = McpToolMetrics(
        type=tool_type,
        tool_url=tool_url,
        tool_params=tool_params or {},
        tool_response=tool_response or {},
        start_time=start_time or time.perf_counter(),
        end_time=end_time,
    )
    # Latency is only derivable when both endpoints are present.
    if metric.start_time and metric.end_time:
        metric.latency = self._round_latency(metric.end_time - metric.start_time)

    self.current_turn.mcp_tool_metrics.append(metric)

Track an MCP tool call in the current turn.

def add_participant_metrics(self,
participant_id: str,
kind: Optional[str] = None,
sip_user: Optional[bool] = None,
join_time: Optional[float] = None,
meta: Optional[Dict[str, Any]] = None) ‑> None
Expand source code
def add_participant_metrics(
    self,
    participant_id: str,
    kind: Optional[str] = None,
    sip_user: Optional[bool] = None,
    join_time: Optional[float] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> None:
    """Append a participant entry (agent or user) into session.participant_metrics.

    Args:
        participant_id: Unique identifier of the participant.
        kind: Participant kind label (optional).
        sip_user: Whether the participant joined via SIP (optional).
        join_time: Join timestamp (optional).
        meta: Arbitrary extra attributes (optional; not mirrored into traces).
    """
    self.session.participant_metrics.append(
        ParticipantMetrics(
            participant_id=participant_id,
            kind=kind,
            sip_user=sip_user,
            join_time=join_time,
            meta=meta,
        )
    )
    # Fix: traces_flow_manager may be None when tracing is disabled — every
    # other call site in this collector guards it before use; previously this
    # raised AttributeError in that state.
    if self.traces_flow_manager:
        self.traces_flow_manager.participant_metrics.append(
            ParticipantMetrics(
                participant_id=participant_id,
                kind=kind,
                sip_user=sip_user,
                join_time=join_time,
                # NOTE(review): meta is intentionally not mirrored here — confirm.
            )
        )

Append a participant entry (agent or user) into session.participant_metrics.

def add_tts_characters(self, count: int) ‑> None
Expand source code
def add_tts_characters(self, count: int) -> None:
    """Accumulate TTS character count on the current turn's latest TTS metrics entry."""
    if not self.current_turn:
        return
    # Lazily create the first TTS metrics entry for this turn.
    if not self.current_turn.tts_metrics:
        self.current_turn.tts_metrics.append(TtsMetrics())
    tts = self.current_turn.tts_metrics[-1]
    tts.tts_characters = (tts.tts_characters or 0) + count

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(metrics={"tts_characters": tts.tts_characters})

Add to the total character count for the current turn.

def complete_turn(self) ‑> None
Expand source code
def complete_turn(self) -> None:
    """Complete the current turn, calculate E2E, validate, serialize and send.

    Flow: compute E2E latency -> validate the turn has meaningful data
    (discarding it otherwise) -> buffer interrupt carry-over metrics ->
    emit trace / playground / analytics payloads -> reset per-turn state.
    """
    logger.info(f"[metrics] complete_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}")
    if not self.current_turn:
        logger.info(f"[metrics] complete_turn() early return — no current_turn")
        return
    self.current_turn.session_metrics = self.session
    # Calculate E2E latency
    self.current_turn.compute_e2e_latency()

    # Validate turn has meaningful data
    # NOTE: has_stt/has_llm/has_tts are recomputed here for logging only;
    # the authoritative decision is _validate_turn() below.
    has_stt = bool(self.current_turn.stt_metrics and any(s.stt_latency is not None for s in self.current_turn.stt_metrics))
    has_llm = bool(self.current_turn.llm_metrics and any(l.llm_ttft is not None for l in self.current_turn.llm_metrics))
    has_tts = bool(self.current_turn.tts_metrics and any(t.tts_latency is not None or t.ttfb is not None for t in self.current_turn.tts_metrics))
    logger.info(f"[metrics] complete_turn() validation | stt={has_stt} llm={has_llm} tts={has_tts} | user_speech_start={self.current_turn.user_speech_start_time is not None} | e2e={self.current_turn.e2e_latency}")
    # The very first turn is always kept (total_turns <= 1), even if empty.
    if not self._validate_turn(self.current_turn) and self._total_turns > 1:
        logger.warning(f"[metrics] complete_turn() DISCARDING turn — validation failed | total_turns={self._total_turns}")
        # Cache user start time for next turn
        if self.current_turn.user_speech_start_time is not None:
            # Keep the earliest start seen across discarded turns.
            if (self._pending_user_start_time is None or
                    self.current_turn.user_speech_start_time < self._pending_user_start_time):
                self._pending_user_start_time = self.current_turn.user_speech_start_time
                logger.info(f"[metrics] Caching earliest user start: {self._pending_user_start_time}")
        self.current_turn = None
        return

    # If this turn was interrupted, buffer the interrupting user's metrics
    # (STT, EOU, VAD, timeline) for carry-over to the next turn.
    if self.current_turn.is_interrupted:
        # Buffer STT metrics from interrupting speech
        # (>= 2 entries: the last entry belongs to the interrupting utterance)
        if len(self.current_turn.stt_metrics) >= 2:
            self._pending_interrupt_stt = self.current_turn.stt_metrics.pop()
            logger.info(f"[metrics] complete_turn() buffering interrupt STT for next turn: {self._pending_interrupt_stt.stt_transcript}")

        # Buffer EOU metrics from interrupting speech
        if len(self.current_turn.eou_metrics) >= 2:
            self._pending_interrupt_eou = self.current_turn.eou_metrics.pop()
            logger.info(f"[metrics] complete_turn() buffering interrupt EOU for next turn: {self._pending_interrupt_eou.eou_latency}")

        # Buffer VAD metrics from interrupting speech and construct timeline from it
        if len(self.current_turn.vad_metrics) >= 2:
            self._pending_interrupt_vad = self.current_turn.vad_metrics.pop()
            # Build the pending timeline from the buffered VAD data
            self._pending_interrupt_timeline = {
                "start_time": self._pending_interrupt_vad.user_speech_start_time,
                "end_time": self._pending_interrupt_vad.user_speech_end_time,
                "duration_ms": (
                    round((self._pending_interrupt_vad.user_speech_end_time - self._pending_interrupt_vad.user_speech_start_time) * 1000, 4)
                    if self._pending_interrupt_vad.user_speech_start_time and self._pending_interrupt_vad.user_speech_end_time
                    else None
                ),
                "text": None,  # Will be set by set_user_transcript in the next turn
            }
            logger.info(f"[metrics] complete_turn() buffering interrupt VAD + timeline for next turn")

    # Send trace
    if self.traces_flow_manager:
        self.traces_flow_manager.create_unified_turn_trace(self.current_turn, self.session)

    self.turns.append(self.current_turn)

    # Send to playground
    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(metrics=self.current_turn, full_turn_data=True)

    # Serialize and send analytics
    interaction_data = self._serialize_turn(self.current_turn)
    transformed_data = self._transform_to_camel_case(interaction_data)
    transformed_data = self._remove_internal_fields(transformed_data)

    # Provider/system fields are only reported with the first turn.
    if len(self.turns) > 1:
        self._remove_provider_fields(transformed_data)

    transformed_data = self._remove_negatives(transformed_data)

    global_event_emitter.emit("TURN_METRICS_ADDED", {"metrics": transformed_data})

    self.analytics_client.send_interaction_analytics_safe(transformed_data)

    self.current_turn = None
    self._pending_user_start_time = None

    # Reset transient timing state so stale values don't leak into the next turn
    self._stt_start_time = None
    self._is_user_speaking = False

Complete the current turn, calculate E2E, validate, serialize and send.

def configure_pipeline(self,
pipeline_mode: Any,
realtime_mode: Any = None,
active_components: Any = None) ‑> None
Expand source code
def configure_pipeline(
    self,
    pipeline_mode: Any,
    realtime_mode: Any = None,
    active_components: Any = None,
) -> None:
    """Configure the collector with pipeline information.

    Args:
        pipeline_mode: PipelineMode enum value
        realtime_mode: RealtimeMode enum value (optional)
        active_components: frozenset of PipelineComponent (optional)
    """
    def _as_value(mode: Any) -> str:
        # Enums expose .value; anything else is stringified.
        return mode.value if hasattr(mode, "value") else str(mode)

    self.pipeline_mode = _as_value(pipeline_mode)
    self.realtime_mode = _as_value(realtime_mode) if realtime_mode else None
    self.active_components = active_components

    self.session.pipeline_mode = self.pipeline_mode
    self.session.realtime_mode = self.realtime_mode

    if active_components:
        self.session.components = sorted(_as_value(c) for c in active_components)

    logger.info(f"[metrics] Pipeline configured: mode={self.pipeline_mode}, realtime={self.realtime_mode}")

Configure the collector with pipeline information.

Args

pipeline_mode
PipelineMode enum value
realtime_mode
RealtimeMode enum value (optional)
active_components
frozenset of PipelineComponent (optional)
def finalize_session(self) ‑> None
Expand source code
def finalize_session(self) -> None:
    """Finalize the session, completing any in-progress turn first."""
    if self.current_turn:
        self.complete_turn()
    # Wall-clock end time (time.time, not perf_counter) for the session record.
    self.session.session_end_time = time.time()

Finalize the session, completing any in-progress turn.

def on_agent_speech_end(self) ‑> None
Expand source code
def on_agent_speech_end(self) -> None:
    """Called when agent stops speaking.

    Closes the agent_speech timeline event, computes TTS latency
    (first byte - start) and the agent speech duration, forwards both to
    the playground when enabled, and resets the transient TTS timing state.
    """
    logger.info(f"[metrics] on_agent_speech_end() called | current_turn={'exists' if self.current_turn else 'None'} | _tts_start={self._tts_start_time is not None}")
    self._is_agent_speaking = False
    agent_speech_end_time = time.perf_counter()

    if self.current_turn:
        # Close the open agent_speech timeline event and stamp the turn.
        self._end_timeline_event("agent_speech", agent_speech_end_time)
        self.current_turn.agent_speech_end_time = agent_speech_end_time

    # Calculate TTS latency from first_byte - start
    if self._tts_start_time and self._tts_first_byte_time:
        total_tts_latency = self._tts_first_byte_time - self._tts_start_time
        if self.current_turn and self.current_turn.agent_speech_start_time:
            if self.current_turn.tts_metrics:
                # Attach latency to the most recent TTS metric entry.
                tts = self.current_turn.tts_metrics[-1]
                tts.tts_latency = self._round_latency(total_tts_latency)

            self.current_turn.agent_speech_duration = self._round_latency(
                agent_speech_end_time - self.current_turn.agent_speech_start_time
            )

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(
                    metrics={"tts_latency": self.current_turn.tts_metrics[-1].tts_latency if self.current_turn.tts_metrics else None}
                )
                self.playground_manager.send_cascading_metrics(
                    metrics={"agent_speech_duration": self.current_turn.agent_speech_duration}
                )

        # Reset TTS timing so stale values don't leak into the next turn.
        self._tts_start_time = None
        self._tts_first_byte_time = None
    elif self._tts_start_time:
        # TTS started but never produced a first byte; just clear the state.
        self._tts_start_time = None
        self._tts_first_byte_time = None

Called when agent stops speaking.

def on_agent_speech_start(self) ‑> None
Expand source code
def on_agent_speech_start(self) -> None:
    """Called when the agent begins producing audible output."""
    if not self.current_turn:
        self.start_turn()
    self._is_agent_speaking = True
    started = time.perf_counter()
    self._agent_speech_start_time = started

    turn = self.current_turn
    if turn:
        turn.agent_speech_start_time = started
        # Only open a new agent_speech timeline event if none is still open.
        open_event_exists = any(
            ev.event_type == "agent_speech" and ev.end_time is None
            for ev in turn.timeline_event_metrics
        )
        if not open_event_exists:
            self._start_timeline_event("agent_speech", started)

Called when agent starts speaking (actual audio output).

def on_background_audio_start(self, file_path: Optional[str] = None, looping: bool = False) ‑> None
Expand source code
def on_background_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None:
    """Called when background audio playback begins."""
    started_at = time.perf_counter()
    self._background_audio_start_time = started_at
    # Only the basename is recorded, never the full path.
    file_name = os.path.basename(file_path) if file_path else None

    turn = self.current_turn
    if turn:
        turn.background_audio_file_path = file_name
        turn.background_audio_looping = looping
        self._start_timeline_event("background_audio", started_at)

    flow_manager = self.traces_flow_manager
    if flow_manager:
        flow_manager.create_background_audio_start_span(
            file_path=file_name, looping=looping, start_time=started_at
        )
    logger.info(f"[metrics] background audio started | file={file_name} | looping={looping}")

Called when background audio playback starts.

def on_background_audio_stop(self) ‑> None
Expand source code
def on_background_audio_stop(self) -> None:
    """Called when background audio playback ends."""
    stopped_at = time.perf_counter()
    self._background_audio_start_time = None
    if self.current_turn:
        self._end_timeline_event("background_audio", stopped_at)
    flow_manager = self.traces_flow_manager
    if flow_manager:
        flow_manager.create_background_audio_stop_span(end_time=stopped_at)
    logger.info("[metrics] background audio stopped")

Called when background audio playback stops.

def on_eou_complete(self) ‑> None
Expand source code
def on_eou_complete(self) -> None:
    """Called when end-of-utterance detection finishes; records its latency."""
    if not self._eou_start_time:
        return

    eou_end_time = time.perf_counter()
    eou_latency = self._round_latency(eou_end_time - self._eou_start_time)

    turn = self.current_turn
    if turn:
        if not turn.eou_metrics:
            turn.eou_metrics.append(EouMetrics())
        eou = turn.eou_metrics[-1]
        eou.eou_end_time = eou_end_time
        eou.eou_latency = eou_latency
        logger.info(f"eou latency: {eou_latency}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"eou_latency": eou_latency})

    # Clear the start stamp so a stray second completion is ignored.
    self._eou_start_time = None

Called when EOU processing completes.

def on_eou_start(self) ‑> None
Expand source code
def on_eou_start(self) -> None:
    """Called when end-of-utterance detection begins."""
    if not self.current_turn:
        self.start_turn()
    started = time.perf_counter()
    self._eou_start_time = started

    turn = self.current_turn
    if turn:
        metrics = turn.eou_metrics
        # Reuse the open entry; start a fresh one if the last already finished.
        if not metrics or metrics[-1].eou_latency is not None:
            metrics.append(EouMetrics())
        metrics[-1].eou_start_time = started

Called when EOU processing starts.

def on_fallback_event(self, event_data: Dict[str, Any]) ‑> None
Expand source code
def on_fallback_event(self, event_data: Dict[str, Any]) -> None:
    """Record a fallback event for the current turn

    Args:
        event_data: Raw fallback payload; recognized keys are copied
            field-by-field into a FallbackEvent record. Missing keys fall
            back to the defaults shown below.
    """
    if not self.current_turn:
        self.start_turn()
    if self.current_turn:
        # Copy the raw payload into a typed record; timing/provider fields
        # default to None when absent, counters to 0.
        fallback_event = FallbackEvent(
            component_type=event_data.get("component_type", ""),
            temporary_disable_sec=event_data.get("temporary_disable_sec", 0),
            permanent_disable_after_attempts=event_data.get("permanent_disable_after_attempts", 0),
            recovery_attempt=event_data.get("recovery_attempt", 0),
            message=event_data.get("message", ""),
            start_time=event_data.get("start_time"),
            end_time=event_data.get("end_time"),
            duration_ms=event_data.get("duration_ms"),
            original_provider_label=event_data.get("original_provider_label"),
            original_connection_start=event_data.get("original_connection_start"),
            original_connection_end=event_data.get("original_connection_end"),
            original_connection_duration_ms=event_data.get("original_connection_duration_ms"),
            new_provider_label=event_data.get("new_provider_label"),
            new_connection_start=event_data.get("new_connection_start"),
            new_connection_end=event_data.get("new_connection_end"),
            new_connection_duration_ms=event_data.get("new_connection_duration_ms"),
            is_recovery=event_data.get("is_recovery", False),
        )
        self.current_turn.fallback_events.append(fallback_event)
        logger.info(f"Fallback event recorded: {event_data.get('component_type')} - {event_data.get('message')}")

Record a fallback event for the current turn

def on_false_interrupt_escalated(self, word_count: Optional[int] = None) ‑> None
Expand source code
def on_false_interrupt_escalated(self, word_count: Optional[int] = None) -> None:
    """Called when a false interrupt turns out to be a real interrupt."""
    turn = self.current_turn
    if not (turn and turn.interruption_metrics):
        return
    metrics = turn.interruption_metrics
    metrics.is_false_interrupt = False
    if word_count is not None:
        metrics.false_interrupt_words = word_count

Called when a false interrupt escalates to a real interrupt.

def on_false_interrupt_resume(self) ‑> None
Expand source code
def on_false_interrupt_resume(self) -> None:
    """Called when the agent resumes speech after a false interrupt."""
    turn = self.current_turn
    if not (turn and turn.interruption_metrics):
        return
    metrics = turn.interruption_metrics
    metrics.resumed_after_false_interrupt = True
    metrics.resume_on_false_interrupt = True
    metrics.false_interrupt_end_time = time.perf_counter()
    # Duration is only computable when the start stamp exists.
    if metrics.false_interrupt_start_time:
        metrics.false_interrupt_duration = self._round_latency(
            metrics.false_interrupt_end_time - metrics.false_interrupt_start_time
        )

Called when agent resumes after a false interrupt.

def on_false_interrupt_start(self, pause_duration: Optional[float] = None) ‑> None
Expand source code
def on_false_interrupt_start(self, pause_duration: Optional[float] = None) -> None:
    """Called when a false interrupt is first detected."""
    turn = self.current_turn
    if not turn:
        return
    if not turn.interruption_metrics:
        turn.interruption_metrics = InterruptionMetrics()
    metrics = turn.interruption_metrics
    metrics.is_false_interrupt = True
    metrics.false_interrupt_start_time = time.perf_counter()
    if pause_duration is not None:
        metrics.false_interrupt_pause_duration = pause_duration

Called when a false interrupt is detected.

def on_interrupted(self) ‑> None
Expand source code
def on_interrupted(self) -> None:
    """Called when the user talks over the agent."""
    # Only counts as an interruption while agent audio is actually playing.
    if not self._is_agent_speaking:
        return
    self._total_interruptions += 1
    turn = self.current_turn
    if turn:
        turn.is_interrupted = True
        logger.info(f"User interrupted the agent. Total interruptions: {self._total_interruptions}")

    if self.playground and self.playground_manager and turn:
        self.playground_manager.send_cascading_metrics(
            metrics={"interrupted": turn.is_interrupted}
        )

Called when user interrupts the agent.

def on_knowledge_base_complete(self, documents: List[str], scores: List[float]) ‑> None
Expand source code
def on_knowledge_base_complete(self, documents: List[str], scores: List[float]) -> None:
    """Called when knowledge-base retrieval finishes."""
    turn = self.current_turn
    if not (turn and turn.kb_metrics):
        return
    kb = turn.kb_metrics[-1]
    kb.kb_documents = documents
    kb.kb_scores = scores
    kb.kb_end_time = time.perf_counter()

    # Latency needs the start stamp recorded by on_knowledge_base_start().
    if kb.kb_start_time:
        kb.kb_retrieval_latency = self._round_latency(kb.kb_end_time - kb.kb_start_time)
        logger.info(f"kb retrieval latency: {kb.kb_retrieval_latency}ms")

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={
                "kb_id": kb.kb_id,
                "kb_retrieval_latency": kb.kb_retrieval_latency,
                "kb_documents": kb.kb_documents,
                "kb_scores": kb.kb_scores,
            }
        )

Called when knowledge base processing completes.

def on_knowledge_base_start(self, kb_id: Optional[str] = None) ‑> None
Expand source code
def on_knowledge_base_start(self, kb_id: Optional[str] = None) -> None:
    """Called when knowledge-base retrieval begins."""
    turn = self.current_turn
    if not turn:
        return
    turn.kb_metrics.append(
        KbMetrics(kb_id=kb_id, kb_start_time=time.perf_counter())
    )

Called when knowledge base processing starts.

def on_llm_complete(self) ‑> None
Expand source code
def on_llm_complete(self) -> None:
    """Called when the LLM finishes generating; records total duration."""
    if not self._llm_start_time:
        return

    llm_end_time = time.perf_counter()
    llm_duration = self._round_latency(llm_end_time - self._llm_start_time)

    turn = self.current_turn
    if turn and turn.llm_metrics:
        llm = turn.llm_metrics[-1]
        llm.llm_end_time = llm_end_time
        llm.llm_duration = llm_duration
        logger.info(f"llm duration: {llm_duration}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"llm_duration": llm_duration})

    # Clear the start stamp so a stray second completion is ignored.
    self._llm_start_time = None

Called when LLM processing completes.

def on_llm_first_token(self) ‑> None
Expand source code
def on_llm_first_token(self) -> None:
    """Called when the first LLM token arrives; records time-to-first-token."""
    turn = self.current_turn
    if not (turn and turn.llm_metrics):
        return
    llm = turn.llm_metrics[-1]
    if not llm.llm_start_time:
        return
    llm.llm_first_token_time = time.perf_counter()
    llm.llm_ttft = self._round_latency(llm.llm_first_token_time - llm.llm_start_time)
    logger.info(f"llm ttft: {llm.llm_ttft}ms")

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(metrics={"llm_ttft": llm.llm_ttft})

Called when first LLM token is received.

def on_llm_start(self) ‑> None
Expand source code
def on_llm_start(self) -> None:
    """Called when LLM generation begins."""
    started = time.perf_counter()
    self._llm_start_time = started
    turn = self.current_turn
    if turn:
        if not turn.llm_metrics:
            turn.llm_metrics.append(LlmMetrics())
        turn.llm_metrics[-1].llm_start_time = started

Called when LLM processing starts.

def on_stt_complete(self, transcript: str = '', duration: float = 0.0, confidence: float = 0.0) ‑> None
Expand source code
def on_stt_complete(self, transcript: str = "", duration: float = 0.0, confidence: float = 0.0) -> None:
    """Called when STT processing completes.

    Args:
        transcript: Final transcript text (stored only when non-empty).
        duration: STT-reported audio duration in ms — presumably; TODO confirm units.
        confidence: STT-reported confidence score.
    """
    # When preemptive generation already handled this utterance, the final
    # completion event is redundant — skip it entirely.
    if self.current_turn and self.current_turn.stt_metrics:
        stt = self.current_turn.stt_metrics[-1]
        if stt.stt_preemptive_generation_enabled and stt.stt_preemptive_generation_occurred:
            logger.info("STT preemptive generation occurred, skipping stt complete")
            return

    if not self.current_turn:
        return

    # Ensure there is an STT metric entry to attach completion data to.
    if not self.current_turn.stt_metrics:
        self.current_turn.stt_metrics.append(SttMetrics())
    stt = self.current_turn.stt_metrics[-1]

    stt_end_time = time.perf_counter()
    stt.stt_end_time = stt_end_time

    # Latency is only computable when on_stt_start() recorded a start stamp.
    if self._stt_start_time:
        stt_latency = self._round_latency(stt_end_time - self._stt_start_time)
        stt.stt_latency = stt_latency
        stt.stt_confidence = confidence
        stt.stt_duration = duration
        logger.info(f"stt latency: {stt_latency}ms | stt confidence: {confidence} | stt duration: {duration}ms")

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(metrics={"stt_latency": stt_latency})

    if transcript:
        stt.stt_transcript = transcript

    # Clear the start stamp so the next utterance starts fresh.
    self._stt_start_time = None

Called when STT processing completes.

def on_stt_interim_end(self) ‑> None
Expand source code
def on_stt_interim_end(self) -> None:
    """Called on an STT interim result; records interim latency once per entry."""
    turn = self.current_turn
    if not (turn and turn.stt_metrics):
        return
    stt = turn.stt_metrics[-1]
    # Only the first interim result per entry is measured.
    if not stt.stt_start_time or stt.stt_interim_latency:
        return
    stt.stt_interim_end_time = time.perf_counter()
    stt.stt_interim_latency = self._round_latency(
        stt.stt_interim_end_time - stt.stt_start_time
    )
    logger.info(f"stt interim latency: {stt.stt_interim_latency}ms")

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"stt_interim_latency": stt.stt_interim_latency}
        )

Called when STT interim event is received.

def on_stt_preflight_end(self, transcript: str = '') ‑> None
Expand source code
def on_stt_preflight_end(self, transcript: str = "") -> None:
    """Called on an STT preflight result; records preflight latency."""
    turn = self.current_turn
    if not (turn and turn.stt_metrics):
        return
    stt = turn.stt_metrics[-1]
    if not stt.stt_start_time:
        return
    stt.stt_preflight_end_time = time.perf_counter()
    stt.stt_preflight_latency = self._round_latency(
        stt.stt_preflight_end_time - stt.stt_start_time
    )
    stt.stt_preflight_transcript = transcript
    logger.info(f"stt preflight latency: {stt.stt_preflight_latency}ms")

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"stt_preflight_latency": stt.stt_preflight_latency}
        )

Called when STT preflight event is received.

def on_stt_start(self) ‑> None
Expand source code
def on_stt_start(self) -> None:
    """Called when STT processing begins for an utterance."""
    started = time.perf_counter()
    self._stt_start_time = started
    turn = self.current_turn
    if not turn:
        return
    logger.info(f"[metrics] on_stt_start() called | current_turn={'exists' if self.current_turn else 'None'}")
    # Reuse the open entry; add a new one if the last already completed.
    if not turn.stt_metrics or turn.stt_metrics[-1].stt_latency is not None:
        turn.stt_metrics.append(SttMetrics())
    turn.stt_metrics[-1].stt_start_time = started

Called when STT processing starts.

def on_thinking_audio_start(self, file_path: Optional[str] = None, looping: bool = False) ‑> None
Expand source code
def on_thinking_audio_start(self, file_path: Optional[str] = None, looping: bool = False) -> None:
    """Called when thinking-filler audio playback begins."""
    started_at = time.perf_counter()
    self._thinking_audio_start_time = started_at
    # Only the basename is recorded, never the full path.
    file_name = os.path.basename(file_path) if file_path else None
    turn = self.current_turn
    if turn:
        turn.thinking_audio_file_path = file_name
        turn.thinking_audio_looping = looping
        self._start_timeline_event("thinking_audio", started_at)
    logger.info(f"[metrics] thinking audio started | file={file_name} | looping={looping}")

Called when thinking audio playback starts.

def on_thinking_audio_stop(self) ‑> None
Expand source code
def on_thinking_audio_stop(self) -> None:
    """Called when thinking-filler audio playback ends."""
    stopped_at = time.perf_counter()
    self._thinking_audio_start_time = None
    if self.current_turn:
        self._end_timeline_event("thinking_audio", stopped_at)
    logger.info("[metrics] thinking audio stopped")

Called when thinking audio playback stops.

def on_tts_first_byte(self) ‑> None
Expand source code
def on_tts_first_byte(self) -> None:
    """Called when TTS produces first audio byte.

    Records the time-to-first-byte (TTFB) against the latest TTS metric
    entry and stamps the shared first-byte time used later by
    on_agent_speech_end() to compute total TTS latency.
    """
    if self._tts_start_time:
        now = time.perf_counter()
        if self.current_turn and self.current_turn.tts_metrics:
            tts = self.current_turn.tts_metrics[-1]
            # NOTE(review): tts_end_time is stamped here at the FIRST byte,
            # not when synthesis finishes — confirm this is intentional.
            tts.tts_end_time = now
            tts.ttfb = self._round_latency(now - self._tts_start_time)
            tts.tts_first_byte_time = now
            logger.info(f"tts ttfb: {tts.ttfb}ms")

            if self.playground and self.playground_manager:
                self.playground_manager.send_cascading_metrics(metrics={"ttfb": tts.ttfb})

        self._tts_first_byte_time = now

Called when TTS produces first audio byte.

def on_tts_start(self) ‑> None
Expand source code
def on_tts_start(self) -> None:
    """Called when TTS synthesis begins."""
    if not self.current_turn:
        self.start_turn()
    started = time.perf_counter()
    self._tts_start_time = started
    # First-byte stamp is cleared until on_tts_first_byte() fires.
    self._tts_first_byte_time = None
    turn = self.current_turn
    if turn:
        if not turn.tts_metrics:
            turn.tts_metrics.append(TtsMetrics())
        turn.tts_metrics[-1].tts_start_time = started

Called when TTS processing starts.

def on_user_speech_end(self) ‑> None
Expand source code
def on_user_speech_end(self) -> None:
    """Called when the user stops speaking (VAD end)."""
    logger.info(f"[metrics] on_user_speech_end() called | current_turn={'exists' if self.current_turn else 'None'}")
    self._is_user_speaking = False
    ended = time.perf_counter()
    self._user_speech_end_time = ended

    turn = self.current_turn
    # Duration needs both a turn and the matching start stamp.
    if not (turn and turn.user_speech_start_time):
        return

    turn.user_speech_end_time = ended
    logger.info(f"[DEBUG_END] on_user_speech_end() | setting user_speech_end_time={self._user_speech_end_time}")
    turn.user_speech_duration = self._round_latency(
        turn.user_speech_end_time - turn.user_speech_start_time
    )
    self._end_timeline_event("user_speech", ended)

    if not turn.vad_metrics:
        turn.vad_metrics.append(VadMetrics())
    turn.vad_metrics[-1].user_speech_end_time = ended

    logger.info(f"user speech duration: {self.current_turn.user_speech_duration}ms")

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"user_speech_duration": turn.user_speech_duration}
        )

Called when user stops speaking (VAD end).

def on_user_speech_start(self) ‑> None
Expand source code
def on_user_speech_start(self) -> None:
    """Called when the user starts speaking (VAD start)."""
    if not self.current_turn:
        self.start_turn()

    logger.info(f"[metrics] on_user_speech_start() called | _is_user_speaking={self._is_user_speaking} | current_turn={'exists' if self.current_turn else 'None'}")
    # Ignore duplicate VAD start events within one utterance.
    if self._is_user_speaking:
        logger.info(f"[metrics] on_user_speech_start() early return — already speaking")
        return

    self._is_user_speaking = True
    started = time.perf_counter()
    self._user_input_start_time = started

    turn = self.current_turn
    if not turn:
        return

    # Only the first speech start of the turn is recorded.
    if turn.user_speech_start_time is None:
        logger.info(f"[DEBUG_START] on_user_speech_start() | setting user_speech_start_time={self._user_input_start_time}")
        turn.user_speech_start_time = started

    if not any(ev.event_type == "user_speech" for ev in turn.timeline_event_metrics):
        self._start_timeline_event("user_speech", started)

    # Reuse the open VAD entry; add a new one if the last already ended.
    if not turn.vad_metrics or turn.vad_metrics[-1].user_speech_end_time is not None:
        turn.vad_metrics.append(VadMetrics())
    turn.vad_metrics[-1].user_speech_start_time = started

Called when user starts speaking (VAD start).

def on_wait_for_additional_speech(self, duration: float, eou_probability: float)
Expand source code
def on_wait_for_additional_speech(self, duration: float, eou_probability: float):
    """Record that EOU detection waited for possible additional user speech."""
    turn = self.current_turn
    if not turn:
        return
    if not turn.eou_metrics:
        turn.eou_metrics.append(EouMetrics())
    eou = turn.eou_metrics[-1]
    eou.wait_for_additional_speech_duration = duration
    eou.eou_probability = eou_probability
    eou.waited_for_additional_speech = True
    logger.info(f"wait for additional speech duration: {duration}ms")
    logger.info(f"wait for additional speech eou probability: {eou_probability}")
def schedule_turn_complete(self, timeout: float = 1.0) ‑> None
Expand source code
def schedule_turn_complete(self, timeout: float = 1.0) -> None:
    """Schedule turn completion after a timeout (for realtime modes).

    Args:
        timeout: Seconds to wait before finalizing the turn.

    If no event loop is currently running, the turn is completed
    synchronously instead.
    """
    # Only one pending completion timer may be live at a time.
    if self._agent_speech_end_timer:
        self._agent_speech_end_timer.cancel()

    try:
        # Use get_running_loop(), not the deprecated get_event_loop():
        # outside a running loop, get_event_loop() may create/return a
        # loop that is never run, so the call_later callback would never
        # fire. get_running_loop() raises RuntimeError in that case,
        # which routes us to the immediate-completion fallback below.
        loop = asyncio.get_running_loop()
        self._agent_speech_end_timer = loop.call_later(timeout, self._finalize_realtime_turn)
    except RuntimeError:
        # No running event loop: complete the turn immediately.
        self.complete_turn()

Schedule turn completion after a timeout (for realtime modes).

def set_a2a_handoff(self) ‑> None
Expand source code
def set_a2a_handoff(self) -> None:
    """Flag the current turn as an A2A turn in which a handoff occurred."""
    turn = self.current_turn
    if not turn:
        return
    turn.is_a2a_enabled = True
    turn.handoff_occurred = True

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"handoff_occurred": turn.handoff_occurred}
        )

Set the A2A enabled and handoff occurred flags.

def set_agent_response(self, response: str) ‑> None
Expand source code
def set_agent_response(self, response: str) -> None:
    """Set the agent response for the current turn.

    Args:
        response: Final agent speech text for this turn.
    """
    if not self.current_turn:
        self.start_turn()
    # Guard against start_turn() failing to establish a turn — every other
    # handler in this collector re-checks current_turn after start_turn();
    # without this, the attribute accesses below would raise.
    if not self.current_turn:
        return
    self.current_turn.agent_speech = response
    logger.info(f"agent output speech: {response}")
    global_event_emitter.emit("AGENT_TRANSCRIPT_ADDED", {"text": response})

    # Ensure an agent_speech timeline event exists before attaching text.
    if not any(ev.event_type == "agent_speech" for ev in self.current_turn.timeline_event_metrics):
        current_time = time.perf_counter()
        self._start_timeline_event("agent_speech", current_time)

    self._update_timeline_event_text("agent_speech", response)

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(
            metrics={"agent_speech": self.current_turn.agent_speech}
        )

Set the agent response for the current turn.

def set_eou_config(self, eou_config: Any) ‑> None
Expand source code
def set_eou_config(self, eou_config: Any) -> None:
    """Store the EOU config on the session (internal tracking, not sent)."""
    self.session.eou_config = self._eou_config_to_dict(eou_config)

Store EOU config on session for later use (internal tracking, not sent).

def set_interrupt_config(self, interrupt_config: Any) ‑> None
Expand source code
def set_interrupt_config(self, interrupt_config: Any) -> None:
    """Store the interrupt config on the session (internal tracking, not sent)."""
    self.session.interrupt_config = self._interrupt_config_to_dict(interrupt_config)

Store Interrupt config on session for later use (internal tracking, not sent).

def set_llm_input(self, text: str) ‑> None
Expand source code
def set_llm_input(self, text: str) -> None:
    """Record the exact text that was sent to the LLM."""
    turn = self.current_turn
    if not turn:
        return
    if not turn.llm_metrics:
        turn.llm_metrics.append(LlmMetrics())
    turn.llm_metrics[-1].llm_input = text

Record the actual text sent to LLM.

def set_llm_usage(self, usage: Dict[str, Any]) ‑> None
Expand source code
def set_llm_usage(self, usage: Dict[str, Any]) -> None:
    """Record LLM token usage and derive tokens-per-second."""
    if not (self.current_turn and usage):
        return

    metrics = self.current_turn.llm_metrics
    if not metrics:
        metrics.append(LlmMetrics())

    llm = metrics[-1]
    for field in ("prompt_tokens", "completion_tokens", "total_tokens", "prompt_cached_tokens"):
        setattr(llm, field, usage.get(field))

    # TPS is only meaningful when both duration and completion count are positive.
    if llm.llm_duration and llm.llm_duration > 0 and llm.completion_tokens and llm.completion_tokens > 0:
        llm.tokens_per_second = round(llm.completion_tokens / (llm.llm_duration / 1000), 2)

    if self.playground and self.playground_manager:
        self.playground_manager.send_cascading_metrics(metrics={
            "prompt_tokens": llm.prompt_tokens,
            "completion_tokens": llm.completion_tokens,
            "total_tokens": llm.total_tokens,
            "tokens_per_second": llm.tokens_per_second,
        })

Set LLM token usage and calculate TPS.

def set_playground_manager(self, manager: Optional[PlaygroundManager]) ‑> None
Expand source code
def set_playground_manager(self, manager: Optional[PlaygroundManager]) -> None:
    """Attach the PlaygroundManager and enable playground metric forwarding."""
    self.playground = True
    self.playground_manager = manager

Set the PlaygroundManager instance.

def set_preemptive_generation_enabled(self, enabled: bool) ‑> None
Expand source code
def set_preemptive_generation_enabled(self, enabled: bool) -> None:
    """Mark preemptive generation as enabled for current STT."""
    # NOTE(review): the attribute name is misspelled ("preemtive"); readers
    # elsewhere in the codebase presumably use the same spelling, so renaming
    # must be coordinated with them — TODO confirm and fix together.
    self.preemtive_generation_enabled = enabled

Mark preemptive generation as enabled for current STT.

def set_provider_info(self, component_type: str, provider_class: str, model_name: str) ‑> None
Expand source code
def set_provider_info(self, component_type: str, provider_class: str, model_name: str) -> None:
    """Record provider/model info for one pipeline component.

    Args:
        component_type: e.g. "stt", "llm", "tts", "vad", "eou", "realtime"
        provider_class: class name of the provider
        model_name: model identifier
    """
    entry = {
        "provider_class": provider_class,
        "model_name": model_name,
    }
    self.session.provider_per_component[component_type] = entry

Set provider info for a specific component.

Args

component_type
e.g. "stt", "llm", "tts", "vad", "eou", "realtime"
provider_class
class name of the provider
model_name
model identifier
def set_realtime_model_error(self, error: Dict[str, Any]) ‑> None
Expand source code
def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
    """Track a realtime model error on the current turn.

    Args:
        error: Error payload; its "message" field is recorded.
    """
    if self.current_turn:
        logger.error(f"realtime model error: {error}")
        self.current_turn.errors.append({
            "source": "REALTIME_MODEL",
            "message": str(error.get("message", "Unknown error")),
            # Wall-clock stamp for cross-process correlation. perf_counter()
            # has an arbitrary epoch and is only meaningful relative to the
            # other perf stamps, which is what "timestamp_perf" is for —
            # previously both fields held the same perf_counter value.
            "timestamp": time.time(),
            "timestamp_perf": time.perf_counter(),
        })

Track a realtime model error.

def set_realtime_usage(self, usage: Dict[str, Any]) ‑> None
Expand source code
def set_realtime_usage(self, usage: Dict[str, Any]) -> None:
    """Set realtime model token usage from a flat dict (input_tokens, total_tokens, cached_*, thoughts_tokens, etc.)."""
    if not (self.current_turn and usage):
        return

    metrics = self.current_turn.realtime_metrics
    if not metrics:
        metrics.append(RealtimeMetrics())

    rt = metrics[-1]
    # Every target attribute is simply "realtime_" + the usage key.
    for key in (
        "input_tokens", "output_tokens", "total_tokens",
        "input_text_tokens", "input_audio_tokens", "input_image_tokens",
        "input_cached_tokens", "thoughts_tokens",
        "cached_text_tokens", "cached_audio_tokens", "cached_image_tokens",
        "output_text_tokens", "output_audio_tokens", "output_image_tokens",
    ):
        setattr(rt, f"realtime_{key}", usage.get(key))

Set realtime model token usage from a flat dict (input_tokens, total_tokens, cached_*, thoughts_tokens, etc.).

def set_session_id(self, session_id: str) ‑> None
Expand source code
def set_session_id(self, session_id: str) -> None:
    """Propagate the session ID to the session record and analytics client."""
    self.session.session_id = session_id
    self.analytics_client.set_session_id(session_id)

Set the session ID for metrics tracking.

def set_stt_usage(self, input_tokens: int = 0, output_tokens: int = 0, total_tokens: int = 0) ‑> None
Expand source code
def set_stt_usage(self, input_tokens: int = 0, output_tokens: int = 0, total_tokens: int = 0) -> None:
    """Record STT token counts on the latest SttMetrics entry of the
    current turn; a no-op when no turn is active."""
    if not self.current_turn:
        return
    if not self.current_turn.stt_metrics:
        self.current_turn.stt_metrics.append(SttMetrics())
    metrics = self.current_turn.stt_metrics[-1]
    metrics.stt_input_tokens = input_tokens
    metrics.stt_output_tokens = output_tokens
    metrics.stt_total_tokens = total_tokens

Set STT token usage.

def set_system_instructions(self, instructions: str) ‑> None
Expand source code
def set_system_instructions(self, instructions: str) -> None:
    """Store the system prompt used by this session for later reporting."""
    self.session.system_instruction = instructions

Set the system instructions for this session.

def set_traces_flow_manager(self,
manager: TracesFlowManager) ‑> None
Expand source code
def set_traces_flow_manager(self, manager: TracesFlowManager) -> None:
    """Set the TracesFlowManager instance."""
    self.traces_flow_manager = manager

Set the TracesFlowManager instance.

def set_user_transcript(self, transcript: str) ‑> None
Expand source code
def set_user_transcript(self, transcript: str) -> None:
    """Set the user transcript for the current turn.

    Side effects (in order): updates the turn's ``user_speech``, logs it,
    emits a global USER_TRANSCRIPT_ADDED event, syncs the latest
    "user_speech" timeline event's text (starting one if none exists),
    and — in playground mode — forwards the transcript to the
    playground manager.

    Args:
        transcript: User speech text for the active turn.
    """
    if self.current_turn:
        # Guard: skip if the turn already has user_speech AND the pipeline
        # is actively processing (agent speaking OR LLM/TTS already started).
        # This prevents the interrupting speech's transcript from overwriting
        # the original transcript of the current turn.
        pipeline_active = (
            self._is_agent_speaking
            or bool(self.current_turn.llm_metrics)
            or bool(self.current_turn.tts_metrics)
        )
        if pipeline_active and self.current_turn.user_speech:
            logger.info(
                f"[metrics] Skipping set_user_transcript — pipeline active "
                f"and turn already has user_speech, new transcript "
                f"belongs to the next turn: {transcript}"
            )
            return

        self.current_turn.user_speech = transcript
        logger.info(f"user input speech: {transcript}")
        global_event_emitter.emit("USER_TRANSCRIPT_ADDED", {"text": transcript})

        # Update timeline: refresh the text of the most recent user_speech
        # event, or start a new one if none was recorded for this turn yet.
        user_events = [
            ev for ev in self.current_turn.timeline_event_metrics if ev.event_type == "user_speech"
        ]
        if user_events:
            user_events[-1].text = transcript
        else:
            current_time = time.perf_counter()
            self._start_timeline_event("user_speech", current_time)
            if self.current_turn.timeline_event_metrics:
                self.current_turn.timeline_event_metrics[-1].text = transcript

        if self.playground and self.playground_manager:
            self.playground_manager.send_cascading_metrics(
                metrics={"user_speech": self.current_turn.user_speech}
            )

Set the user transcript for the current turn.

def set_vad_config(self, vad_config: Dict[str, Any]) ‑> None
Expand source code
def set_vad_config(self, vad_config: Dict[str, Any]) -> None:
    """Set VAD configuration on the current turn.

    Args:
        vad_config: Flat dict with optional "threshold",
            "min_speech_duration" and "min_silence_duration" keys;
            missing keys are recorded as None.
    """
    # Guard against being called before any turn exists; without this the
    # attribute access below raises AttributeError. Matches the guard used
    # by the sibling per-turn setters (e.g. set_stt_usage).
    if not self.current_turn:
        return

    if not self.current_turn.vad_metrics:
        self.current_turn.vad_metrics.append(VadMetrics())

    vad = self.current_turn.vad_metrics[-1]
    vad.vad_threshold = vad_config.get("threshold")
    vad.vad_min_speech_duration = vad_config.get("min_speech_duration")
    vad.vad_min_silence_duration = vad_config.get("min_silence_duration")

Set VAD configuration.

def start_turn(self) ‑> None
Expand source code
def start_turn(self) -> None:
    """Start a new turn. Completes any existing turn first.

    Carries over data buffered from a previously discarded or
    interrupted turn (pending user-speech start time, timeline event,
    STT, EOU and VAD metrics). Each pending value is consumed exactly
    once and cleared.
    """
    logger.info(f"[metrics] start_turn() called | current_turn={'exists' if self.current_turn else 'None'} | total_turns={self._total_turns}")
    if self.current_turn:
        logger.info(f"[metrics] start_turn() completing existing turn before creating new one")
        self.complete_turn()

    self._total_turns += 1
    self.current_turn = TurnMetrics(turn_id=self._generate_turn_id(), preemtive_generation_enabled=self.preemtive_generation_enabled)

    # Carry over pending user start time from a previously discarded turn
    if self._pending_user_start_time is not None:
        self.current_turn.user_speech_start_time = self._pending_user_start_time
        self._start_timeline_event("user_speech", self._pending_user_start_time)
        # Fix: clear the pending value after consuming it, mirroring the
        # other _pending_* fields below. Without this the stale start time
        # would be re-applied to every subsequent turn.
        self._pending_user_start_time = None

    # Carry over buffered data from an interrupted turn
    if self._pending_interrupt_timeline is not None:
        tl = self._pending_interrupt_timeline
        event = TimelineEvent(
            event_type="user_speech",
            start_time=tl["start_time"],
            end_time=tl.get("end_time"),
            duration_ms=tl.get("duration_ms"),
            text=tl.get("text")
        )
        self.current_turn.timeline_event_metrics.append(event)
        logger.info(f"[metrics] start_turn() carried over interrupt timeline: {tl.get('text')}")
        self._pending_interrupt_timeline = None
    if self._pending_interrupt_stt is not None:
        self.current_turn.stt_metrics.append(self._pending_interrupt_stt)
        logger.info(f"[metrics] start_turn() carried over interrupt STT: {self._pending_interrupt_stt.stt_transcript}")
        self._pending_interrupt_stt = None
    if self._pending_interrupt_eou is not None:
        self.current_turn.eou_metrics.append(self._pending_interrupt_eou)
        logger.info(f"[metrics] start_turn() carried over interrupt EOU: {self._pending_interrupt_eou.eou_latency}")
        self._pending_interrupt_eou = None
    if self._pending_interrupt_vad is not None:
        self.current_turn.vad_metrics.append(self._pending_interrupt_vad)
        logger.info(f"[metrics] start_turn() carried over interrupt VAD")
        self._pending_interrupt_vad = None

    # If user is currently speaking, capture the start time
    if self._is_user_speaking and self._user_input_start_time:
        if self.current_turn.user_speech_start_time is None:
            self.current_turn.user_speech_start_time = self._user_input_start_time
            if not any(ev.event_type == "user_speech" for ev in self.current_turn.timeline_event_metrics):
                self._start_timeline_event("user_speech", self._user_input_start_time)

Start a new turn. Completes any existing turn first.

def update_provider_class(self, component_type: str, provider_class: str, model_name: str = '') ‑> None
Expand source code
def update_provider_class(self, component_type: str, provider_class: str, model_name: str = "") -> None:
    """Update the provider class (and optionally model name) for one
    pipeline component, e.g. after a fallback switched providers.

    Args:
        component_type: Component key such as "STT", "LLM", "TTS"
            (matched case-insensitively).
        provider_class: New provider class name (e.g. "GoogleLLM").
        model_name: New model name; left unchanged when empty.
    """
    target_type = component_type.lower()
    mapping = self.session.provider_per_component
    if target_type not in mapping:
        # Unknown component: nothing to update.
        return
    entry = mapping[target_type]
    entry["provider_class"] = provider_class
    if model_name:
        entry["model_name"] = model_name
    logger.info(f"Updated {target_type} provider class to: {provider_class}, model: {model_name}")

Update the provider class and model name for a specific component when fallback occurs.

Args

component_type
"STT", "LLM", "TTS", etc.
provider_class
The new provider class name (e.g., "GoogleLLM")
model_name
The new model name
class SessionMetrics (session_id: str | None = None,
room_id: str | None = None,
system_instruction: str = '',
components: List[str] = <factory>,
pipeline_type: str | None = None,
pipeline_mode: str | None = None,
realtime_mode: str | None = None,
session_start_time: float = <factory>,
session_end_time: float | None = None,
participant_metrics: List[ParticipantMetrics] = <factory>,
errors: List[Dict[str, Any]] = <factory>,
provider_per_component: Dict[str, Dict[str, str]] = <factory>,
eou_config: Dict[str, Any] | None = None,
interrupt_config: Dict[str, Any] | None = None)
Expand source code
@dataclass
class SessionMetrics:
    """Session-level metrics.

    One instance aggregates identity, configuration, timings,
    per-participant metrics and errors for a single agent session.
    """
    session_id: Optional[str] = None  # unique identifier for this session
    room_id: Optional[str] = None  # room the agent session belongs to
    system_instruction: str = ""  # system prompt configured for the session
    components: List[str] = field(default_factory=list)  # names of pipeline components in use
    pipeline_type: Optional[str] = None  # NOTE(review): valid values not visible here — confirm against callers
    pipeline_mode: Optional[str] = None  # NOTE(review): valid values not visible here — confirm against callers
    realtime_mode: Optional[str] = None  # presumably set only for realtime pipelines — verify
    session_start_time: float = field(default_factory=time.time)  # epoch seconds at creation
    session_end_time: Optional[float] = None  # epoch seconds; None while the session is active
    participant_metrics: List[ParticipantMetrics] = field(default_factory=list)  # per-participant stats
    errors: List[Dict[str, Any]] = field(default_factory=list)  # session-scoped error records
    provider_per_component: Dict[str, Dict[str, str]] = field(default_factory=dict)  # component -> {"provider_class": ..., "model_name": ...}
    eou_config: Optional[Dict[str, Any]] = None  # end-of-utterance detection settings
    interrupt_config: Optional[Dict[str, Any]] = None  # interruption handling settings

Session-level metrics.

Instance variables

var components : List[str]
var eou_config : Dict[str, Any] | None
var errors : List[Dict[str, Any]]
var interrupt_config : Dict[str, Any] | None
var participant_metrics : List[ParticipantMetrics]
var pipeline_mode : str | None
var pipeline_type : str | None
var provider_per_component : Dict[str, Dict[str, str]]
var realtime_mode : str | None
var room_id : str | None
var session_end_time : float | None
var session_id : str | None
var session_start_time : float
var system_instruction : str
class TracesFlowManager (room_id: str, session_id: str | None = None)
Expand source code
class TracesFlowManager:
    """
    Manages the flow of OpenTelemetry traces for agent Turns,
    ensuring correct parent-child relationships between spans.
    """

    def __init__(self, room_id: str, session_id: Optional[str] = None):
        """Initialize the trace manager for one room/session.

        Args:
            room_id: Room whose agent session is being traced.
            session_id: Optional session identifier (may be set later
                via set_session_id).
        """
        self.room_id = room_id
        self.session_id = session_id
        # Span hierarchy: root (agent joined) -> session spans -> main turn span.
        self.root_span: Optional[Span] = None
        self.agent_session_span: Optional[Span] = None
        self.main_turn_span: Optional[Span] = None
        self.agent_session_config_span: Optional[Span] = None
        self.agent_session_closed_span: Optional[Span] = None
        self._turn_count = 0
        # Set once root-span creation has been attempted, so coroutines
        # awaiting it never hang even when telemetry is disabled.
        self.root_span_ready = asyncio.Event()
        self.a2a_span: Optional[Span] = None  # presumably agent-to-agent span — confirm
        self._a2a_turn_count = 0
        self.session_metrics: Optional[SessionMetrics] = None
        # NOTE(review): annotated Optional but initialized to [] — callers
        # iterate it directly (see start_agent_session), so [] is relied on.
        self.participant_metrics: Optional[List[ParticipantMetrics]] = []

    def set_session_metrics(self, session_metrics: SessionMetrics):
        """Set the session metrics for the trace manager."""
        self.session_metrics = session_metrics

    def set_session_id(self, session_id: str):
        """Record the session identifier on the trace manager."""
        self.session_id = session_id

    def start_agent_joined_meeting(self, attributes: Dict[str, Any]):
        """Create the root span for the agent joining a meeting.

        Idempotent: does nothing if the root span already exists. Always
        sets root_span_ready so that awaiting coroutines never hang, even
        when telemetry is off and create_span returns None.
        """
        if self.root_span:
            return

        name = attributes.get('agent_name', 'UnknownAgent')
        peer = attributes.get('peerId', 'UnknownID')
        started = attributes.get('start_time', time.perf_counter())

        self.root_span = create_span(
            f"Agent Session: agentName_{name}_agentId_{peer}",
            attributes,
            start_time=started,
        )

        # Always set root_span_ready so downstream awaits don't hang
        # when telemetry is not initialized (create_span returns None)
        self.root_span_ready.set()

    async def start_agent_session_config(self, attributes: Dict[str, Any]):
        """Open (and immediately close) the 'Session Configuration' span
        as a child of the root span. Idempotent; waits for the root span
        creation attempt first."""
        await self.root_span_ready.wait()
        if not self.root_span or self.agent_session_config_span:
            return

        started = attributes.get('start_time', time.perf_counter())
        span = create_span("Session Configuration", attributes, parent_span=self.root_span, start_time=started)
        self.agent_session_config_span = span
        if span:
            # Make the span current while closing it so any nested
            # telemetry attaches to it.
            with trace.use_span(span):
                self.end_agent_session_config()

    def end_agent_session_config(self):
        """Close the session-configuration span and drop the reference."""
        self.end_span(self.agent_session_config_span, "Agent session config ended", end_time=time.perf_counter())
        self.agent_session_config_span = None

    async def start_agent_session_closed(self, attributes: Dict[str, Any]):
        """Open the 'Agent Session Closed' span as a child of the root
        span. Idempotent; waits for the root span creation attempt first."""
        await self.root_span_ready.wait()
        if not self.root_span or self.agent_session_closed_span:
            return

        started = attributes.get('start_time', time.perf_counter())
        self.agent_session_closed_span = create_span("Agent Session Closed", attributes, parent_span=self.root_span, start_time=started)

    def end_agent_session_closed(self):
        """Close the session-closed span and drop the reference."""
        self.end_span(self.agent_session_closed_span, "Agent session closed", end_time=time.perf_counter())
        self.agent_session_closed_span = None

    async def start_agent_session(self, attributes: Dict[str, Any]):
        """Open the 'Session Started' span under the root span, attach
        user/agent participant metrics to its attributes, then start the
        main turn span. Idempotent; waits for the root span first."""
        await self.root_span_ready.wait()
        if not self.root_span or self.agent_session_span:
            return

        started = attributes.get('start_time', time.perf_counter())

        # Partition participant metrics into user vs. agent before attaching.
        attributes["participant_metrics"] = [
            asdict(p) for p in self.participant_metrics if p.kind == "user"
        ]
        attributes["agent_participant_metrics"] = [
            asdict(p) for p in self.participant_metrics if p.kind != "user"
        ]

        self.agent_session_span = create_span("Session Started", attributes, parent_span=self.root_span, start_time=started)

        self.start_main_turn()

    def start_main_turn(self):
        """Open the parent span that groups all user-agent turns.

        Idempotent; requires the agent session span to exist first.
        """
        if not self.agent_session_span or self.main_turn_span:
            return

        self.main_turn_span = create_span("User & Agent Turns", parent_span=self.agent_session_span, start_time=time.perf_counter())
    
    def create_unified_turn_trace(self, turn: TurnMetrics, session: Any = None) -> None:
        """
        Creates a full trace for a single turn from the unified TurnMetrics schema.
        Handles both cascading and realtime component spans based on what data is present.
        """
        if not self.main_turn_span:
            return
        self._turn_count += 1
        turn_name = f"Turn #{self._turn_count}"

        # Determine turn start time dynamically to encompass all child spans
        start_times = []
        if turn.user_speech_start_time:
            start_times.append(turn.user_speech_start_time)
        if turn.stt_metrics and turn.stt_metrics[0].stt_start_time:
            start_times.append(turn.stt_metrics[0].stt_start_time)
        if turn.llm_metrics and turn.llm_metrics[0].llm_start_time:
            start_times.append(turn.llm_metrics[0].llm_start_time)
        if turn.tts_metrics and turn.tts_metrics[0].tts_start_time:
            start_times.append(turn.tts_metrics[0].tts_start_time)
        if turn.eou_metrics and turn.eou_metrics[0].eou_start_time:
            start_times.append(turn.eou_metrics[0].eou_start_time)
        if turn.timeline_event_metrics:
            for ev in turn.timeline_event_metrics:
                if ev.start_time:
                    start_times.append(ev.start_time)
                    
        turn_span_start_time = min(start_times) if start_times else None

        turn_span = create_span(turn_name, parent_span=self.main_turn_span, start_time=turn_span_start_time)

        if not turn_span:
            return

        with trace.use_span(turn_span, end_on_exit=False):

            # --- VAD errors ---
            def create_vad_span(vad: VadMetrics):
                vad_errors = [e for e in turn.errors if e.get("source") == "VAD"]
                if vad_errors or turn.vad_metrics:
                    vad_class = turn.session_metrics.provider_per_component.get("vad", {}).get("provider_class")
                    vad_model = turn.session_metrics.provider_per_component.get("vad", {}).get("model_name")
                    vad_span_name = f"{vad_class}: VAD Processing"
                    
                    vad_attrs = {}
                    if not vad_class:
                        return
                    if vad_class:
                        vad_attrs["provider_class"] = vad_class
                    if vad_model:
                        vad_attrs["model_name"] = vad_model
                    if vad.vad_min_silence_duration:
                        vad_attrs["min_silence_duration"] = vad.vad_min_silence_duration
                    if vad.vad_min_speech_duration:
                        vad_attrs["min_speech_duration"] = vad.vad_min_speech_duration
                    if vad.vad_threshold:
                        vad_attrs["threshold"] = vad.vad_threshold

                # Calculate span start time: end_of_speech_time - min_silence_duration
                if vad.user_speech_start_time is None and vad.user_speech_end_time is not None:
                    vad.user_speech_start_time = vad.user_speech_end_time - vad.vad_min_silence_duration
                elif vad.user_speech_start_time is not None and vad.user_speech_end_time is None:
                    vad.user_speech_end_time = vad.user_speech_start_time + vad.vad_min_silence_duration

                vad_start_time = vad.user_speech_start_time
                vad_end_time = vad.user_speech_end_time
                vad_span = create_span(vad_span_name, vad_attrs, parent_span=turn_span, start_time=vad_start_time)
                
                if vad_span:
                    for error in vad_errors:
                        vad_span.add_event("error", attributes={
                            "message": error["message"],
                            "timestamp": error["timestamp"],
                            "source": error["source"]
                        })
                        with trace.use_span(vad_span):
                            vad_error_span = create_span("VAD Error", {"message": error["message"]}, parent_span=vad_span, start_time=error["timestamp"])
                            self.end_span(vad_error_span, end_time=error["timestamp"]+0.100)
                            vad_errors.remove(error)
                    
                    vad_status = StatusCode.ERROR if vad_errors else StatusCode.OK
                    self.end_span(vad_span, status_code=vad_status, end_time=vad_end_time)

            vad_list = turn.vad_metrics if turn.vad_metrics else None
            if vad_list:
                for vad in vad_list:
                    try:
                        create_vad_span(vad)
                    except Exception as e:
                        logger.error(f"Error creating VAD span: {e}")

            # --- Interruption span ---
            def create_interruption_span(interruption: InterruptionMetrics):
                if interruption.false_interrupt_start_time and interruption.false_interrupt_end_time:
                    false_interrupt_attrs = {}
                    if interruption.interrupt_mode:
                        false_interrupt_attrs["interrupt_mode"] = interruption.interrupt_mode
                    if interruption.false_interrupt_pause_duration:
                        false_interrupt_attrs["pause_duration_config"] = interruption.false_interrupt_pause_duration
                    if interruption.false_interrupt_duration:
                        false_interrupt_attrs["false_interrupt_duration"] = interruption.false_interrupt_duration
                    if interruption.resumed_after_false_interrupt:
                        false_interrupt_attrs["resumed_after_false_interrupt"] = True
                    if interruption.false_interrupt_duration:
                        false_interrupt_attrs["actual_duration"] = interruption.false_interrupt_duration
                    
                    false_interrupt_end = interruption.false_interrupt_end_time
                    if false_interrupt_end is None:
                        # False interrupt was followed by true interrupt
                        false_interrupt_end = interruption.interrupt_start_time
                    
                    false_interrupt_span_name = "False Interruption (Resumed)" if interruption.resumed_after_false_interrupt else "False Interruption (Escalated)"
                    false_interrupt_span = create_span(false_interrupt_span_name, false_interrupt_attrs, parent_span=turn_span, start_time=interruption.false_interrupt_start_time)
                    self.end_span(false_interrupt_span, message="False interruption detected", end_time=false_interrupt_end)

                if turn.is_interrupted:
                    interrupted_attrs = {}
                    if interruption.interrupt_mode:
                        interrupted_attrs["interrupt_mode"] = interruption.interrupt_mode
                    if interruption.interrupt_min_duration:
                        interrupted_attrs["interrupt_min_duration"] = interruption.interrupt_min_duration
                    if interruption.interrupt_min_words:
                        interrupted_attrs["interrupt_min_words"] = interruption.interrupt_min_words
                    if interruption.false_interrupt_pause_duration:
                        interrupted_attrs["false_interrupt_pause_duration"] = interruption.false_interrupt_pause_duration
                    if interruption.resume_on_false_interrupt:
                        interrupted_attrs["resume_on_false_interrupt"] = interruption.resume_on_false_interrupt
                    if interruption.interrupt_reason:
                        interrupted_attrs["interrupt_reason"] = interruption.interrupt_reason
                    if interruption.interrupt_words:
                        interrupted_attrs["interrupt_words"] = interruption.interrupt_words
                    if interruption.interrupt_duration:
                        interrupted_attrs["interrupt_duration"] = interruption.interrupt_duration
                    # Mark if this was preceded by a false interrupt
                    if interruption.false_interrupt_start_time is not None:
                        interrupted_attrs["preceded_by_false_interrupt"] = True
                    
                    interrupted_span = create_span("Turn Interrupted", interrupted_attrs, parent_span=turn_span, start_time=interruption.interrupt_start_time)
            
                # Calculate interrupt end time with proper None checks
                if interruption.interrupt_start_time is not None:
                    if interruption.interrupt_duration is not None:
                        interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_duration
                    elif interruption.interrupt_min_duration is not None:
                        interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_min_duration
                    else:
                        interruption.interrupt_end_time = interruption.interrupt_start_time
            
                self.end_span(interrupted_span, message="Agent was interrupted", end_time=interruption.interrupt_end_time) 

            
            if turn.interruption_metrics:
                try:
                    create_interruption_span(turn.interruption_metrics)
                except Exception as e:
                    logger.error(f"Error creating interruption span: {e}")
                
            
            # --- Fallback spans ---
            def create_fallback_span(fallback: FallbackEvent):
                is_recovery = fallback.is_recovery
                if is_recovery:
                    fallback_span_name = f"Recovery: {fallback.component_type}"
                    fallback_attrs = {
                        "temporary_disable_sec": fallback.temporary_disable_sec,
                        "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts,
                        "recovery_attempt": fallback.recovery_attempt,
                        "message": fallback.message,
                        "restored_provider": fallback.new_provider_label,
                        "previous_provider": fallback.original_provider_label,
                    }
                    span_time = fallback.start_time
                    recovery_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time)
                    if recovery_span:
                        self.end_span(recovery_span, message="Recovery completed", end_time=fallback.end_time)
                        return
                
                fallback_span_name = f"Fallback: {fallback.component_type}"
                
                fallback_attrs = {
                    "temporary_disable_sec": fallback.temporary_disable_sec,
                    "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts,
                    "recovery_attempt": fallback.recovery_attempt,
                    "message": fallback.message,
                }
                
                # Use same start_time for all spans (instant spans)
                span_time = fallback.start_time
                fallback_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time)
                
                if fallback_span:
                    # Child trace for original connection attempt (if exists)
                    if fallback.original_provider_label:
                        original_conn_attrs = {
                            "provider": fallback.original_provider_label,
                            "status": "failed"
                        }
                        
                        original_conn_span = create_span(
                            f"Connection: {fallback.original_provider_label}",
                            original_conn_attrs,
                            parent_span=fallback_span,
                            start_time=span_time
                        )
                        self.end_span(original_conn_span, status_code=StatusCode.ERROR, end_time=span_time)
                    
                    # Child trace for new connection attempt (if switched successfully)
                    if fallback.new_provider_label:
                        new_conn_attrs = {
                            "provider": fallback.new_provider_label,
                            "status": "success"
                        }
                        
                        new_conn_span = create_span(
                            f"Connection: {fallback.new_provider_label}",
                            new_conn_attrs,
                            parent_span=fallback_span,
                            start_time=span_time
                        )
                        self.end_span(new_conn_span, status_code=StatusCode.OK, end_time=span_time)
                    
                    # End the fallback span - status depends on whether we successfully switched
                    fallback_status = StatusCode.OK if fallback.new_provider_label else StatusCode.ERROR
                    self.end_span(fallback_span, status_code=fallback_status, end_time=span_time)

            if turn.fallback_events:
                for fallback in turn.fallback_events:
                    try:
                        create_fallback_span(fallback)
                    except Exception as e:
                        logger.error(f"Error creating fallback span: {e}")

            # --- STT spans ---
            def create_stt_span(stt: SttMetrics):
                stt_errors = [e for e in turn.errors if e.get("source") == "STT"]
                if stt or stt_errors:

                    stt_attrs = {}
                    if stt:
                        stt_class = turn.session_metrics.provider_per_component.get("stt", {}).get("provider_class")
                        if stt_class:
                            stt_attrs["provider_class"] = stt_class
                        
                        stt_model = turn.session_metrics.provider_per_component.get("stt", {}).get("model_name")
                        stt_attrs["input"] = "N/A"
                        if stt_model:
                            stt_attrs["model_name"] = stt_model
                        if stt.stt_latency is not None:
                            stt_attrs["duration_ms"] = stt.stt_latency
                        if stt.stt_start_time:
                            stt_attrs["start_timestamp"] = stt.stt_start_time
                        if stt.stt_end_time:
                            stt_attrs["end_timestamp"] = stt.stt_end_time
                        if stt.stt_transcript:
                            stt_attrs["output"] = stt.stt_transcript
                        if stt_class =="DeepgramSTTV2" and turn.preemtive_generation_enabled:
                            stt_attrs["stt_preemptive_generation_enabled"] = turn.preemtive_generation_enabled
                    
                    stt_span_name = f"{stt_class}: Speech to Text Processing"
                    stt_span = create_span(
                        stt_span_name, stt_attrs,
                        parent_span=turn_span,
                        start_time=stt.stt_start_time if stt else None,
                    )

                    if stt.stt_preemptive_generation_enabled:
                        with trace.use_span(stt_span):
                            preemptive_attributes = {
                                "preemptive_generation_occurred": stt.stt_preemptive_generation_occurred,
                                "partial_text": stt.stt_preflight_transcript,
                                "final_text": stt.stt_transcript,
                            }
                        if stt.stt_preemptive_generation_occurred:
                            preemptive_attributes["preemptive_generation_latency"] = stt.stt_preflight_latency
                        preemptive_span = create_span("Preemptive Generation", preemptive_attributes, parent_span=stt_span, start_time=stt.stt_start_time)
                        preemptive_end_time = stt.stt_preflight_end_time or stt.stt_end_time
                        self.end_span(preemptive_span, end_time=preemptive_end_time)


                    if stt_span:
                        for error in stt_errors:
                            stt_span.add_event("error", attributes={
                                "message": error.get("message", ""),
                                "timestamp": error.get("timestamp", ""),
                            })
                            if stt.stt_start_time <= error.get("timestamp") <= stt.stt_end_time:
                                with trace.use_span(stt_span):
                                    stt_error_span = create_span("STT Error", {"message": error.get("message", "")}, parent_span=stt_span, start_time=error.get("timestamp"))
                                    self.end_span(stt_error_span, end_time=error.get("timestamp")+0.100)
                                    stt_errors.remove(error)
                        status = StatusCode.ERROR if stt_errors else StatusCode.OK
                        self.end_span(stt_span, status_code=status, end_time=stt.stt_end_time if stt else None)
            
            stt_list = turn.stt_metrics if turn.stt_metrics else None
            if stt_list:
                for stt in stt_list:
                    try:
                        create_stt_span(stt)
                    except Exception as e:
                        logger.error(f"Error creating STT span: {e}")

            # --- EOU spans ---
            def create_eou_span(eou: EouMetrics):
                """Create an End-Of-Utterance span under the turn span, with
                wait-for-additional-speech and error sub-spans.

                Args:
                    eou: Per-turn EOU metrics (timings, probability, wait info).
                """
                # Errors attributed to the turn detector for this turn.
                eou_errors = [e for e in turn.errors if e.get("source") == "TURN-D"]

                eou_attrs = {}
                # Default name so a falsy `eou` no longer raises NameError below.
                eou_span_name = "End-Of-Utterance Detection"
                if eou:
                    eou_provider = turn.session_metrics.provider_per_component.get("eou", {})
                    eou_class = eou_provider.get("provider_class")
                    eou_model = eou_provider.get("model_name")

                    if eou_class:
                        eou_attrs["provider_class"] = eou_class
                    if eou_model:
                        eou_attrs["model_name"] = eou_model
                    if turn.user_speech:
                        eou_attrs["input"] = turn.user_speech
                    if eou.eou_latency is not None:
                        eou_attrs["duration_ms"] = eou.eou_latency
                    if eou.eou_start_time:
                        eou_attrs["start_timestamp"] = eou.eou_start_time
                    if eou.eou_end_time:
                        eou_attrs["end_timestamp"] = eou.eou_end_time
                    if eou.waited_for_additional_speech:
                        eou_attrs["waited_for_additional_speech"] = eou.waited_for_additional_speech
                    if eou.eou_probability:
                        eou_attrs["eou_probability"] = round(eou.eou_probability, 4)
                    if turn.session_metrics.eou_config.get("min_speech_wait_timeout"):
                        eou_attrs["min_speech_wait_timeout"] = turn.session_metrics.eou_config.get("min_speech_wait_timeout")
                    if turn.session_metrics.eou_config.get("max_speech_wait_timeout"):
                        eou_attrs["max_speech_wait_timeout"] = turn.session_metrics.eou_config.get("max_speech_wait_timeout")

                    eou_span_name = f"{eou_class}: End-Of-Utterance Detection"

                eou_span = create_span(
                    eou_span_name, eou_attrs,
                    parent_span=turn_span,
                    start_time=eou.eou_start_time if eou else None,
                )
                # Guard on `eou` too: the bare attribute access crashed when eou was None.
                if eou and eou.waited_for_additional_speech:
                    delay = round(eou.wait_for_additional_speech_duration, 4)
                    with trace.use_span(eou_span):
                        wait_for_additional_speech_span = create_span(
                            "Wait for Additional Speech",
                            {
                                "wait_for_additional_speech_duration": delay,
                                "eou_probability": round(eou.eou_probability, 4),
                            },
                            start_time=eou.eou_end_time,
                        )
                        self.end_span(wait_for_additional_speech_span, status_code=StatusCode.OK, end_time=eou.eou_end_time + delay)

                if eou_span:
                    # Iterate over a snapshot: removing from the list being iterated
                    # skipped every other error and wrongly left the status as ERROR.
                    for error in list(eou_errors):
                        eou_span.add_event("error", attributes={
                            "message": error.get("message", ""),
                            "timestamp": error.get("timestamp", ""),
                        })
                        with trace.use_span(eou_span):
                            eou_error_span = create_span("EOU Error", {"message": error.get("message", "")}, parent_span=eou_span, start_time=error.get("timestamp"))
                            self.end_span(eou_error_span, end_time=error.get("timestamp") + 0.100)
                            eou_errors.remove(error)
                    eou_status = StatusCode.ERROR if eou_errors else StatusCode.OK
                    self.end_span(eou_span, status_code=eou_status, end_time=eou.eou_end_time if eou else None)

            eou_list = turn.eou_metrics if turn.eou_metrics else None
            if eou_list:
                for eou in eou_list:
                    try:
                        create_eou_span(eou)
                    except Exception as e:
                        logger.error(f"Error creating EOU span: {e}")


            # --- LLM spans ---
            def create_llm_span(llm: LlmMetrics):
                """Create an LLM processing span under the turn span, with
                tool-call, error, and time-to-first-token sub-spans.

                Args:
                    llm: Per-turn LLM metrics (timings, token counts, input).
                """
                llm_errors = [e for e in turn.errors if e.get("source") == "LLM"]
                llm_attrs = {}
                # Default name so a falsy `llm` no longer raises NameError below.
                llm_span_name = "LLM Processing"
                if llm:
                    llm_provider = turn.session_metrics.provider_per_component.get("llm", {})
                    llm_class = llm_provider.get("provider_class")
                    if llm_class:
                        llm_attrs["provider_class"] = llm_class
                    llm_model = llm_provider.get("model_name")
                    if llm_model:
                        llm_attrs["model_name"] = llm_model
                    if llm.llm_input:
                        llm_attrs["input"] = llm.llm_input
                    if llm.llm_duration:
                        llm_attrs["duration_ms"] = llm.llm_duration
                    if llm.llm_start_time:
                        llm_attrs["start_timestamp"] = llm.llm_start_time
                    if llm.llm_end_time:
                        llm_attrs["end_timestamp"] = llm.llm_end_time
                    if turn.agent_speech:
                        llm_attrs["output"] = turn.agent_speech
                    if llm.prompt_tokens:
                        llm_attrs["input_tokens"] = llm.prompt_tokens
                    if llm.completion_tokens:
                        llm_attrs["output_tokens"] = llm.completion_tokens
                    if llm.prompt_cached_tokens:
                        llm_attrs["cached_input_tokens"] = llm.prompt_cached_tokens
                    if llm.total_tokens:
                        llm_attrs["total_tokens"] = llm.total_tokens

                    llm_span_name = f"{llm_class}: LLM Processing"

                llm_span = create_span(
                    llm_span_name, llm_attrs,
                    parent_span=turn_span,
                    start_time=llm.llm_start_time if llm else None,
                )
                if llm_span:
                    # Tool call sub-spans (point-in-time: start == end timestamp).
                    if turn.function_tool_timestamps:
                        for tool_data in turn.function_tool_timestamps:
                            tool_timestamp = tool_data.get("timestamp")
                            tool_span = create_span(
                                f"Invoked Tool: {tool_data.get('tool_name', 'unknown')}",
                                parent_span=llm_span,
                                start_time=tool_timestamp,
                            )
                            self.end_span(tool_span, end_time=tool_timestamp)

                    # Iterate over a snapshot: removing from the list being iterated
                    # skipped every other error and wrongly left the status as ERROR.
                    for error in list(llm_errors):
                        llm_span.add_event("error", attributes={
                            "message": error.get("message", ""),
                            "timestamp": error.get("timestamp", ""),
                        })
                        with trace.use_span(llm_span):
                            llm_error_span = create_span("LLM Error", {"message": error.get("message", "")}, parent_span=llm_span, start_time=error.get("timestamp"))
                            self.end_span(llm_error_span, end_time=error.get("timestamp") + 0.100)
                            llm_errors.remove(error)

                    # TTFT sub-span
                    if llm and llm.llm_ttft is not None and llm.llm_start_time is not None:
                        ttft_span = create_span(
                            "Time to First Token",
                            attributes={"llm_ttft": llm.llm_ttft},
                            parent_span=llm_span,
                            start_time=llm.llm_start_time,
                        )
                        # llm_ttft is in milliseconds; end time is in seconds.
                        ttft_end = llm.llm_start_time + (llm.llm_ttft / 1000)
                        self.end_span(ttft_span, end_time=ttft_end)

                    llm_status = StatusCode.ERROR if llm_errors else StatusCode.OK
                    self.end_span(llm_span, status_code=llm_status, end_time=llm.llm_end_time if llm else None)

            llm_list = turn.llm_metrics if turn.llm_metrics else None
            if llm_list:
                for llm in llm_list:
                    try:
                        create_llm_span(llm)
                    except Exception as e:
                        logger.error(f"Error creating LLM span: {e}")

            # --- TTS spans ---
            def create_tts_span(tts: TtsMetrics):
                """Create a TTS processing span under the turn span, with
                time-to-first-byte and error sub-spans.

                Args:
                    tts: Per-turn TTS metrics (timings, character count).
                """
                tts_errors = [e for e in turn.errors if e.get("source") == "TTS"]
                tts_attrs = {}
                # Default name so a falsy `tts` no longer raises NameError below.
                tts_span_name = "Text to Speech Processing"
                if tts:
                    tts_provider = turn.session_metrics.provider_per_component.get("tts", {})
                    tts_class = tts_provider.get("provider_class")
                    tts_model = tts_provider.get("model_name")
                    if tts_class:
                        tts_attrs["provider_class"] = tts_class
                    if tts_model:
                        tts_attrs["model_name"] = tts_model
                    if turn.agent_speech:
                        tts_attrs["input"] = turn.agent_speech
                    if tts.tts_duration:
                        tts_attrs["duration_ms"] = tts.tts_duration
                    if tts.tts_start_time:
                        tts_attrs["start_timestamp"] = tts.tts_start_time
                    if tts.tts_end_time:
                        tts_attrs["end_timestamp"] = tts.tts_end_time
                    if tts.tts_characters:
                        tts_attrs["characters"] = tts.tts_characters
                    if turn.agent_speech_duration:
                        tts_attrs["audio_duration_ms"] = turn.agent_speech_duration
                    # Raw audio output is not captured as an attribute.
                    tts_attrs["output"] = "N/A"

                    tts_span_name = f"{tts_class}: Text to Speech Processing"

                tts_span = create_span(
                    tts_span_name, tts_attrs,
                    parent_span=turn_span,
                    start_time=tts.tts_start_time if tts else None,
                )

                if tts_span:
                    # TTFB sub-span
                    if tts and tts.tts_first_byte_time is not None:
                        ttfb_span = create_span(
                            "Time to First Byte",
                            parent_span=tts_span,
                            start_time=tts.tts_start_time,
                        )
                        self.end_span(ttfb_span, end_time=tts.tts_first_byte_time)

                    # Iterate over a snapshot: removing from the list being iterated
                    # skipped every other error and wrongly left the status as ERROR.
                    for error in list(tts_errors):
                        tts_span.add_event("error", attributes={
                            "message": error.get("message", ""),
                            "timestamp": error.get("timestamp", ""),
                        })
                        with trace.use_span(tts_span):
                            tts_error_span = create_span("TTS Error", {"message": error.get("message", "")}, parent_span=tts_span, start_time=error.get("timestamp"))
                            self.end_span(tts_error_span, end_time=error.get("timestamp") + 0.100)
                            tts_errors.remove(error)

                    tts_status = StatusCode.ERROR if tts_errors else StatusCode.OK
                    self.end_span(tts_span, status_code=tts_status, end_time=tts.tts_end_time if tts else None)

            tts_list = turn.tts_metrics if turn.tts_metrics else None
            if tts_list:
                for tts in tts_list:
                    try:
                        create_tts_span(tts)
                    except Exception as e:
                        logger.error(f"Error creating TTS span: {e}")

            # --- KB spans ---
            def create_kb_span(kb: KbMetrics):
                """Emit a single 'Knowledge Base: Retrieval' span for one KB metric."""
                attrs = {}
                if turn.user_speech:
                    attrs["input"] = turn.user_speech
                if kb.kb_retrieval_latency:
                    attrs["retrieval_latency_ms"] = kb.kb_retrieval_latency
                if kb.kb_start_time:
                    attrs["start_timestamp"] = kb.kb_start_time
                if kb.kb_end_time:
                    attrs["end_timestamp"] = kb.kb_end_time
                docs = kb.kb_documents
                if docs:
                    # Short lists are spelled out; longer ones are summarised by count.
                    attrs["documents"] = ", ".join(docs) if len(docs) <= 5 else f"{len(docs)} documents"
                    attrs["document_count"] = len(docs)
                if kb.kb_scores:
                    # Top-5 similarity scores, rounded for readability.
                    attrs["scores"] = ", ".join(str(round(score, 4)) for score in kb.kb_scores[:5])

                kb_span = create_span("Knowledge Base: Retrieval", attrs, parent_span=turn_span, start_time=kb.kb_start_time)
                if kb_span:
                    self.end_span(kb_span, status_code=StatusCode.OK, end_time=kb.kb_end_time)

            if turn.kb_metrics:
                for kb in turn.kb_metrics:
                    try:
                        create_kb_span(kb)
                    except Exception as e:
                        logger.error(f"Error creating KB span: {e}")

            # --- Realtime spans (for S2S modes) ---
            def create_rt_span(rt: RealtimeMetrics):
                """Create a realtime-processing span (S2S modes) under the turn span,
                with tool-call, time-to-first-word, and error sub-spans.

                Args:
                    rt: Per-turn realtime metrics (provider/model only are read here).
                """
                # Computed once; the original recomputed this list a second time
                # further down, shadowing any earlier bookkeeping.
                rt_errors = [e for e in turn.errors if e.get("source") == "REALTIME"]

                rt_attrs = {}
                # Default name so a falsy `rt` no longer raises NameError below.
                rt_span_name = "Realtime Processing"
                if rt:
                    rt_provider = turn.session_metrics.provider_per_component.get("realtime", {})
                    rt_class = rt_provider.get("provider_class")
                    if rt_class:
                        rt_attrs["provider_class"] = rt_class
                    rt_model = rt_provider.get("model_name")
                    if rt_model:
                        rt_attrs["model_name"] = rt_model
                    rt_span_name = f"{rt_class}: Realtime Processing"

                # The realtime window spans from end of user speech to start of agent speech.
                rt_start_time = turn.user_speech_end_time if turn.user_speech_end_time else turn.agent_speech_start_time
                rt_end_time = turn.agent_speech_start_time

                rt_span = create_span(
                    rt_span_name, rt_attrs,
                    parent_span=turn_span,
                    start_time=rt_start_time,
                )
                if rt_span:
                    # Realtime tool calls (point-in-time; no per-tool timing available).
                    if turn.function_tools_called:
                        for tool_name in turn.function_tools_called:
                            tool_span = create_span(
                                f"Invoked Tool: {tool_name}",
                                parent_span=turn_span,
                                start_time=time.perf_counter(),
                            )
                            self.end_span(tool_span, end_time=time.perf_counter())

                # TTFB span for realtime
                if turn.e2e_latency is not None:
                    ttfb_span = create_span(
                        "Time to First Word",
                        {"duration_ms": turn.e2e_latency},
                        parent_span=rt_span,
                        start_time=rt_start_time,
                    )
                    self.end_span(ttfb_span, end_time=rt_end_time)

                # Realtime model errors.
                # NOTE(review): events/error spans attach to turn_span, not rt_span —
                # presumably intentional; confirm against the other component spans.
                # Iterate over a snapshot: removing from the list being iterated
                # skipped every other error and wrongly forced an ERROR status.
                for error in list(rt_errors):
                    turn_span.add_event("Errors", attributes={
                        "message": error.get("message", "Unknown error"),
                        "timestamp": error.get("timestamp", "N/A"),
                    })
                    with trace.use_span(turn_span):
                        rt_error_span = create_span("Realtime Error", {"message": error.get("message", "")}, parent_span=turn_span, start_time=error.get("timestamp"))
                        self.end_span(rt_error_span, end_time=error.get("timestamp") + 0.100)
                        rt_errors.remove(error)
                self.end_span(rt_span, status_code=StatusCode.ERROR if rt_errors else StatusCode.OK, end_time=rt_end_time)
            

            rt_list = turn.realtime_metrics if turn.realtime_metrics else None
            if rt_list: 
                for rt in rt_list:
                    try:
                        create_rt_span(rt)
                    except Exception as e:
                        logger.error(f"Error creating RT span: {e}")

            def create_error_spans(errors: Dict[str, Any]):
                """Emit a short ERROR-status span for one recorded turn error.

                Args:
                    errors: A single error record (keys: source, message, timestamp_perf).
                """
                error_span_name = f"{errors.get('source', 'Unknown')} Error span"
                attr = {}
                if errors.get('message'):
                    attr['error message'] = errors.get('message')
                # Fall back to "now" when the timestamp is missing: the original
                # raised TypeError on `None + 0.001` for the end time.
                span_start_time = errors.get('timestamp_perf')
                if span_start_time is None:
                    span_start_time = time.perf_counter()
                error_span = create_span(error_span_name, attributes=attr, parent_span=turn_span, start_time=span_start_time)
                self.end_span(error_span, status_code=StatusCode.ERROR, end_time=span_start_time + 0.001)
            
            for e in turn.errors:
                try:
                    create_error_spans(e)
                except Exception as e:
                    logger.error(f"Error creating error span: {e}")

            # Determine turn end time first for unbounded children spans
            end_times = []
            if turn.tts_metrics and turn.tts_metrics[-1].tts_end_time:
                end_times.append(turn.tts_metrics[-1].tts_end_time)
            if turn.llm_metrics and turn.llm_metrics[-1].llm_end_time:
                end_times.append(turn.llm_metrics[-1].llm_end_time)
            if turn.agent_speech_end_time:
                end_times.append(turn.agent_speech_end_time)
            if turn.eou_metrics and turn.eou_metrics[-1].eou_end_time:
                end_times.append(turn.eou_metrics[-1].eou_end_time)
            if turn.stt_metrics and turn.stt_metrics[-1].stt_end_time:
                end_times.append(turn.stt_metrics[-1].stt_end_time)
            if turn.user_speech_end_time:
                end_times.append(turn.user_speech_end_time)
            if turn.interruption_metrics and turn.interruption_metrics.false_interrupt_end_time:
                end_times.append(turn.interruption_metrics.false_interrupt_end_time)
                
            turn_end_time = max(end_times) if end_times else None

            if turn.is_interrupted or turn_end_time is None:
                turn_end_time = time.perf_counter()


                
                        

            # --- Timeline events ---
            if turn.timeline_event_metrics:
                for event in turn.timeline_event_metrics:
                    if event.event_type == "user_speech":
                        user_speech_span = create_span(
                            "User Input Speech",
                            {"Transcript": event.text, "duration_ms": event.duration_ms},
                            parent_span=turn_span,
                            start_time=event.start_time,
                        )
                        self.end_span(user_speech_span, end_time=event.end_time if event.end_time else turn_end_time)
                    elif event.event_type == "agent_speech":
                        agent_speech_span = create_span(
                            "Agent Output Speech",
                            {"Transcript": event.text, "duration_ms": event.duration_ms},
                            parent_span=turn_span,
                            start_time=event.start_time,
                        )
                        self.end_span(agent_speech_span, end_time=event.end_time if event.end_time else turn_end_time)
                    elif event.event_type == "thinking_audio":
                        thinking_attrs = {}
                        if turn.thinking_audio_file_path:
                            thinking_attrs["file_path"] = turn.thinking_audio_file_path
                        if turn.thinking_audio_looping is not None:
                            thinking_attrs["looping"] = turn.thinking_audio_looping
                        if turn.thinking_audio_override_thinking is not None:
                            thinking_attrs["override_thinking"] = turn.thinking_audio_override_thinking
                        if event.duration_ms is not None:
                            thinking_attrs["duration_ms"] = event.duration_ms
                        thinking_span = create_span(
                            "Thinking Audio",
                            thinking_attrs,
                            parent_span=turn_span,
                            start_time=event.start_time,
                        )
                        self.end_span(thinking_span, end_time=event.end_time if event.end_time else turn_end_time)
                    elif event.event_type == "background_audio":
                        bg_attrs = {}
                        if turn.background_audio_file_path:
                            bg_attrs["file_path"] = turn.background_audio_file_path
                        if turn.background_audio_looping is not None:
                            bg_attrs["looping"] = turn.background_audio_looping
                        if event.duration_ms is not None:
                            bg_attrs["duration_ms"] = event.duration_ms
                        bg_span = create_span(
                            "Background Audio",
                            bg_attrs,
                            parent_span=turn_span,
                            start_time=event.start_time,
                        )
                        self.end_span(bg_span, end_time=event.end_time if event.end_time else turn_end_time)



        self.end_span(turn_span, message="End of turn trace.", end_time=turn_end_time)

    def end_main_turn(self):
        """Close the main turn span and drop the reference to it."""
        span, self.main_turn_span = self.main_turn_span, None
        self.end_span(span, "All turns processed", end_time=time.perf_counter())

    def agent_say_called(self, message: str):
        """Record a point-in-time span for an agent `say()` invocation.

        No-op unless an agent session span is active.
        """
        if not self.agent_session_span:
            return

        # Prefer the currently active span as parent; fall back to the session span.
        parent = trace.get_current_span() or self.agent_session_span

        say_span = create_span(
            "Agent Say",
            {"Agent Say Message": message},
            parent_span=parent,
            start_time=time.perf_counter(),
        )
        self.end_span(say_span, "Agent say span created", end_time=time.perf_counter())

    def agent_reply_called(self, instructions: str):
        """Record a point-in-time span for an agent `reply()` invocation.

        No-op unless an agent session span is active.
        """
        if not self.agent_session_span:
            return

        # Prefer the currently active span as parent; fall back to the session span.
        parent = trace.get_current_span() or self.agent_session_span

        reply_span = create_span(
            "Agent Reply",
            {"Agent Reply Instructions": instructions},
            parent_span=parent,
            start_time=time.perf_counter(),
        )
        self.end_span(reply_span, "Agent reply span created", end_time=time.perf_counter())

    def create_components_change_trace(self, components_change_status: Dict[str, Any], components_change_data: Dict[str, Any], time_data: Dict[str, Any]) -> None:
        """
        Record a span for a components change on the agent.
        Args:
            components_change_status: Status of the components change.
            components_change_data: New component values, keyed new_stt/new_tts/etc.
            time_data: start_time/end_time of the components change.
        """
        if not self.main_turn_span:
            return

        attr = {}
        # Copy over whichever component replacements were supplied.
        for key in ("new_stt", "new_tts", "new_llm", "new_vad", "new_turn_detector", "new_denoise"):
            value = components_change_data.get(key)
            if value is not None:
                attr[key] = value
        if components_change_status:
            attr["components_change_status"] = components_change_status

        self.components_change_span = create_span(
            "Components Change",
            attr,
            parent_span=self.main_turn_span,
            start_time=time_data.get("start_time", time.perf_counter()),
        )
        self.end_span(self.components_change_span, "Components change span created", end_time=time_data.get("end_time", time.perf_counter()))
        self.components_change_span = None

    def create_pipeline_change_trace(self, time_data: Dict[str, Any], original_pipeline_config: Dict[str, Any], new_pipeline_config: Dict[str, Any]) -> None:
        """
        Record a span for a pipeline configuration change.
        Args:
            time_data: start_time/end_time of the pipeline change.
            original_pipeline_config: Pipeline configuration before the change.
            new_pipeline_config: Pipeline configuration after the change.
        """
        if not self.main_turn_span:
            return

        span = create_span(
            "Pipeline Change",
            {
                "original_pipeline_config": original_pipeline_config,
                "new_pipeline_config": new_pipeline_config,
            },
            parent_span=self.main_turn_span,
            start_time=time_data.get("start_time", time.perf_counter()),
        )
        self.end_span(span, "Pipeline change span created", end_time=time_data.get("end_time", time.perf_counter()))

    def create_a2a_trace(self, name: str, attributes: Dict[str, Any]) -> Optional[Span]:
        """Open a numbered A2A span under the shared 'Agent-to-Agent Communications' parent.

        Returns the new span, or None when no main turn span is active or the
        parent span could not be created.
        """
        if not self.main_turn_span:
            return None

        # Lazily create the shared parent on the first A2A interaction.
        if not self.a2a_span:
            self.a2a_span = create_span(
                "Agent-to-Agent Communications",
                {"total_a2a_turns": self._a2a_turn_count},
                parent_span=self.main_turn_span
            )
        if not self.a2a_span:
            return None

        self._a2a_turn_count += 1

        combined = dict(attributes)
        combined["a2a_turn_number"] = self._a2a_turn_count
        combined["parent_span"] = "Agent-to-Agent Communications"

        return create_span(
            f"A2A {self._a2a_turn_count}: {name}",
            combined,
            parent_span=self.a2a_span,
            start_time=time.perf_counter(),
        )

    def end_a2a_trace(self, span: Optional[Span], message: str = ""):
        """Ends an A2A trace span with OK status at the current time.

        NOTE(review): `message` is accepted but not forwarded — OTel only
        attaches a status description on ERROR (cf. `end_span`); confirm
        whether it should instead be recorded as an attribute or event.
        """
        complete_span(span, StatusCode.OK, end_time=time.perf_counter())

    def end_a2a_communication(self):
        """Close the shared A2A parent span and reset the A2A turn counter."""
        parent = self.a2a_span
        self.a2a_span = None
        self._a2a_turn_count = 0
        complete_span(parent, StatusCode.OK, end_time=time.perf_counter())

    def create_thinking_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None):
        """Record a point-in-time 'Playing Thinking Audio' span under the main turn span.

        Returns the span, or None when no main turn span is active.
        """
        if not self.main_turn_span:
            return None

        attributes = {"event": "start", "looping": looping}
        if file_path:
            attributes["file_path"] = file_path

        # One timestamp so the span has zero duration.
        moment = start_time or time.perf_counter()
        span = create_span("Playing Thinking Audio", attributes, parent_span=self.main_turn_span, start_time=moment)
        self.end_span(span, message="Thinking audio started", end_time=moment)
        return span

    def create_thinking_audio_stop_span(self, end_time: float = None):
        """Record a point-in-time 'Stopped Thinking Audio' span under the main turn span.

        Returns the span, or None when no main turn span is active.
        """
        if not self.main_turn_span:
            return None

        # One timestamp so the span has zero duration.
        moment = end_time or time.perf_counter()
        span = create_span("Stopped Thinking Audio", {"event": "stop"}, parent_span=self.main_turn_span, start_time=moment)
        self.end_span(span, message="Thinking audio stopped", end_time=moment)
        return span

    def create_background_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None):
        """Creates a 'Playing Background Audio' point-in-time span at session level.

        Args:
            file_path: Audio file being played, if known.
            looping: Whether the audio loops.
            start_time: Event timestamp; defaults to now.
        """
        if not self.main_turn_span:
            return None

        bg_audio_attrs = {}
        if file_path:
            bg_audio_attrs["file_path"] = file_path
        bg_audio_attrs["looping"] = looping
        bg_audio_attrs["event"] = "start"

        # Capture a single timestamp so start == end: the original called
        # time.perf_counter() twice, giving this point-in-time span a
        # spurious nonzero duration (cf. create_thinking_audio_start_span).
        t = start_time or time.perf_counter()
        start_span = create_span("Playing Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=t)
        # End immediately as a point-in-time event
        self.end_span(start_span, message="Background audio started", end_time=t)
        return start_span

    def create_background_audio_stop_span(self, file_path: str = None, looping: bool = False, end_time: float = None):
        """Creates a 'Stopped Background Audio' point-in-time span at session level.

        Args:
            file_path: Audio file that was playing, if known.
            looping: Whether the audio was looping.
            end_time: Event timestamp; defaults to now.
        """
        if not self.main_turn_span:
            return None

        bg_audio_attrs = {}
        if file_path:
            bg_audio_attrs["file_path"] = file_path
        bg_audio_attrs["looping"] = looping
        bg_audio_attrs["event"] = "stop"

        # Capture a single timestamp so start == end: the original called
        # time.perf_counter() twice, giving this point-in-time span a
        # spurious nonzero duration (cf. create_thinking_audio_stop_span).
        t = end_time or time.perf_counter()
        stop_span = create_span("Stopped Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=t)
        # End immediately as a point-in-time event
        self.end_span(stop_span, message="Background audio stopped", end_time=t)
        return stop_span

    def end_agent_session(self):
        """Close the agent session span, finishing the main turn span first."""
        if self.main_turn_span:
            self.end_main_turn()
        session_span = self.agent_session_span
        self.agent_session_span = None
        self.end_span(session_span, "Agent session ended", end_time=time.perf_counter())

    def agent_meeting_end(self):
        """Close every still-open session-level span, then the root span."""
        # Drain child session spans before ending the root.
        if self.agent_session_span:
            self.end_agent_session()
        if self.agent_session_config_span:
            self.end_agent_session_config()
        if self.agent_session_closed_span:
            self.end_agent_session_closed()
        root = self.root_span
        self.root_span = None
        self.end_span(root, "Agent left meeting", end_time=time.perf_counter())
            
    def end_span(self, span: Optional[Span], message: str = "", status_code: StatusCode = StatusCode.OK, end_time: Optional[float] = None):
        """Complete `span` with the given status; no-op when span is None.

        The message becomes the status description only on ERROR, matching
        OpenTelemetry's status-description convention.
        """
        if not span:
            return
        when = end_time if end_time is not None else time.perf_counter()
        description = message if status_code == StatusCode.ERROR else ""
        complete_span(span, status_code, description, when)

Manages the flow of OpenTelemetry traces for agent turns, ensuring correct parent-child relationships between spans.

Methods

def agent_meeting_end(self)
Expand source code
def agent_meeting_end(self):
    """Completes the root span after closing any open child spans."""
    # Children must be ended before the root so nesting stays well-formed.
    for pending, closer in (
        (self.agent_session_span, self.end_agent_session),
        (self.agent_session_config_span, self.end_agent_session_config),
        (self.agent_session_closed_span, self.end_agent_session_closed),
    ):
        if pending:
            closer()
    self.end_span(self.root_span, "Agent left meeting", end_time=time.perf_counter())
    self.root_span = None

Completes the root span.

def agent_reply_called(self, instructions: str)
Expand source code
def agent_reply_called(self, instructions: str):
    """Creates a short-lived span recording an agent reply invocation.

    Args:
        instructions: The reply instructions, recorded as a span attribute.
    """
    if not self.agent_session_span:
        return

    current_span = trace.get_current_span()

    # BUG FIX: when no span is active, trace.get_current_span() returns
    # INVALID_SPAN — a truthy NonRecordingSpan — so a plain truthiness test
    # never fell back to the session span. Check span-context validity.
    if current_span is not None and current_span.get_span_context().is_valid:
        parent = current_span
    else:
        parent = self.agent_session_span

    agent_reply_span = create_span(
        "Agent Reply",
        {"Agent Reply Instructions": instructions},
        parent_span=parent,
        start_time=time.perf_counter()
    )

    # Point-in-time event: end immediately.
    self.end_span(agent_reply_span, "Agent reply span created", end_time=time.perf_counter())

Creates a span for an agent reply invocation.

def agent_say_called(self, message: str)
Expand source code
def agent_say_called(self, message: str):
    """Creates a short-lived span recording a call to the agent's say method.

    Args:
        message: The spoken message, recorded as a span attribute.
    """
    if not self.agent_session_span:
        return

    current_span = trace.get_current_span()

    # BUG FIX: trace.get_current_span() returns the truthy INVALID_SPAN when
    # no span is active, so the original truthiness check never fell back to
    # the session span. Validate the span context instead.
    if current_span is not None and current_span.get_span_context().is_valid:
        parent = current_span
    else:
        parent = self.agent_session_span

    agent_say_span = create_span(
        "Agent Say",
        {"Agent Say Message": message},
        parent_span=parent,
        start_time=time.perf_counter()
    )

    # Point-in-time event: end immediately.
    self.end_span(agent_say_span, "Agent say span created", end_time=time.perf_counter())

Creates a span for the agent's say method.

def create_a2a_trace(self, name: str, attributes: Dict[str, Any]) ‑> opentelemetry.trace.span.Span | None
Expand source code
def create_a2a_trace(self, name: str, attributes: Dict[str, Any]) -> Optional[Span]:
    """Creates an A2A trace under the main turn span."""
    if not self.main_turn_span:
        return None

    # Lazily create the shared parent span for all A2A turns.
    if not self.a2a_span:
        self.a2a_span = create_span(
            "Agent-to-Agent Communications",
            {"total_a2a_turns": self._a2a_turn_count},
            parent_span=self.main_turn_span
        )
    if not self.a2a_span:
        # Span creation failed (e.g. telemetry unavailable).
        return None

    self._a2a_turn_count += 1

    merged_attrs = dict(attributes)
    merged_attrs["a2a_turn_number"] = self._a2a_turn_count
    merged_attrs["parent_span"] = "Agent-to-Agent Communications"

    return create_span(
        f"A2A {self._a2a_turn_count}: {name}",
        merged_attrs,
        parent_span=self.a2a_span,
        start_time=time.perf_counter()
    )

Creates an A2A trace under the main turn span.

def create_background_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None)
Expand source code
def create_background_audio_start_span(self, file_path: Optional[str] = None, looping: bool = False, start_time: Optional[float] = None):
    """Creates a 'Playing Background Audio' span at session level (same level as turn spans).

    Args:
        file_path: Path of the audio file being played, if known.
        looping: Whether the audio plays in a loop.
        start_time: Timestamp of the start event (perf_counter seconds); defaults to now.

    Returns:
        The completed span, or None when no main turn span exists.
    """
    if not self.main_turn_span:
        return None

    bg_audio_attrs = {}
    if file_path:
        bg_audio_attrs["file_path"] = file_path
    bg_audio_attrs["looping"] = looping
    bg_audio_attrs["event"] = "start"

    # BUG FIX: resolve the timestamp once. The original evaluated
    # `start_time or time.perf_counter()` separately for start and end,
    # giving the point-in-time span a tiny spurious duration whenever
    # start_time was not supplied.
    moment = start_time or time.perf_counter()
    start_span = create_span("Playing Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=moment)
    # End immediately as a point-in-time event
    self.end_span(start_span, message="Background audio started", end_time=moment)
    return start_span

Creates a 'Playing Background Audio' span at session level (same level as turn spans).

def create_background_audio_stop_span(self, file_path: str = None, looping: bool = False, end_time: float = None)
Expand source code
def create_background_audio_stop_span(self, file_path: Optional[str] = None, looping: bool = False, end_time: Optional[float] = None):
    """Creates a 'Stopped Background Audio' span at session level (same level as turn spans).

    Args:
        file_path: Path of the audio file that was stopped, if known.
        looping: Whether the audio had been playing in a loop.
        end_time: Timestamp of the stop event (perf_counter seconds); defaults to now.

    Returns:
        The completed span, or None when no main turn span exists.
    """
    if not self.main_turn_span:
        return None

    bg_audio_attrs = {}
    if file_path:
        bg_audio_attrs["file_path"] = file_path
    bg_audio_attrs["looping"] = looping
    bg_audio_attrs["event"] = "stop"

    # BUG FIX: resolve the timestamp once. The original evaluated
    # `end_time or time.perf_counter()` separately for start and end,
    # giving the point-in-time span a tiny spurious duration whenever
    # end_time was not supplied.
    moment = end_time or time.perf_counter()
    stop_span = create_span("Stopped Background Audio", bg_audio_attrs, parent_span=self.main_turn_span, start_time=moment)
    # End immediately as a point-in-time event
    self.end_span(stop_span, message="Background audio stopped", end_time=moment)
    return stop_span

Creates a 'Stopped Background Audio' span at session level (same level as turn spans).

def create_components_change_trace(self,
components_change_status: Dict[str, Any],
components_change_data: Dict[str, Any],
time_data: Dict[str, Any]) ‑> None
Expand source code
def create_components_change_trace(self, components_change_status: Dict[str, Any], components_change_data: Dict[str, Any], time_data: Dict[str, Any]) -> None:
    """
    Creates a span for the agent's components change.

    Args:
        components_change_status: Status of the components change.
        components_change_data: Data of the components change; recognized keys
            are new_stt, new_tts, new_llm, new_vad, new_turn_detector, new_denoise.
        time_data: Time data of the components change; may provide
            "start_time" and "end_time" (perf_counter seconds).
    """
    if not self.main_turn_span:
        return

    attr = {}
    # Copy only the component fields that were actually supplied (replaces the
    # original six-way if-chain with a data-driven loop; behavior unchanged).
    for key in ("new_stt", "new_tts", "new_llm", "new_vad", "new_turn_detector", "new_denoise"):
        value = components_change_data.get(key)
        if value is not None:
            attr[key] = value
    if components_change_status:
        attr["components_change_status"] = components_change_status

    self.components_change_span = create_span(
        "Components Change",
        attr,
        parent_span=self.main_turn_span,
        start_time=time_data.get("start_time", time.perf_counter())
    )

    self.end_span(self.components_change_span, "Components change span created", end_time=time_data.get("end_time", time.perf_counter()))
    self.components_change_span = None

Creates a span for the agent's components change.

Args

components_change_status
Status of the components change.
components_change_data
Data of the components change.
time_data
Time data of the components change.
def create_pipeline_change_trace(self,
time_data: Dict[str, Any],
original_pipeline_config: Dict[str, Any],
new_pipeline_config: Dict[str, Any]) ‑> None
Expand source code
def create_pipeline_change_trace(self, time_data: Dict[str, Any], original_pipeline_config: Dict[str, Any], new_pipeline_config: Dict[str, Any]) -> None:
    """
    Creates a span for the agent's pipeline change.

    Args:
        time_data: Time data of the pipeline change.
        original_pipeline_config: Original pipeline configuration.
        new_pipeline_config: New pipeline configuration.
    """
    if not self.main_turn_span:
        return

    change_span = create_span(
        "Pipeline Change",
        {
            "original_pipeline_config": original_pipeline_config,
            "new_pipeline_config": new_pipeline_config,
        },
        parent_span=self.main_turn_span,
        start_time=time_data.get("start_time", time.perf_counter())
    )

    self.end_span(change_span, "Pipeline change span created", end_time=time_data.get("end_time", time.perf_counter()))

Creates a span for the agent's pipeline change.

Args

time_data
Time data of the pipeline change.
original_pipeline_config
Original pipeline configuration.
new_pipeline_config
New pipeline configuration.
def create_thinking_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None)
Expand source code
def create_thinking_audio_start_span(self, file_path: str = None, looping: bool = False, start_time: float = None):
    """Creates a 'Playing Thinking Audio' point-in-time span at session level."""
    if not self.main_turn_span:
        return None

    event_attrs = {"event": "start", "looping": looping}
    if file_path:
        event_attrs["file_path"] = file_path

    # Single timestamp: the span is opened and closed at the same instant.
    moment = start_time or time.perf_counter()
    thinking_span = create_span("Playing Thinking Audio", event_attrs, parent_span=self.main_turn_span, start_time=moment)
    self.end_span(thinking_span, message="Thinking audio started", end_time=moment)
    return thinking_span

Creates a 'Playing Thinking Audio' point-in-time span at session level.

def create_thinking_audio_stop_span(self, end_time: float = None)
Expand source code
def create_thinking_audio_stop_span(self, end_time: float = None):
    """Creates a 'Stopped Thinking Audio' point-in-time span at session level."""
    if not self.main_turn_span:
        return None

    # Single timestamp: the span is opened and closed at the same instant.
    moment = end_time or time.perf_counter()
    stop_event_span = create_span("Stopped Thinking Audio", {"event": "stop"}, parent_span=self.main_turn_span, start_time=moment)
    self.end_span(stop_event_span, message="Thinking audio stopped", end_time=moment)
    return stop_event_span

Creates a 'Stopped Thinking Audio' point-in-time span at session level.

def create_unified_turn_trace(self,
turn: TurnMetrics,
session: Any = None) ‑> None
Expand source code
def create_unified_turn_trace(self, turn: TurnMetrics, session: Any = None) -> None:
    """
    Creates a full trace for a single turn from the unified TurnMetrics schema.
    Handles both cascading and realtime component spans based on what data is present.
    """
    if not self.main_turn_span:
        return
    self._turn_count += 1
    turn_name = f"Turn #{self._turn_count}"

    # Determine turn start time dynamically to encompass all child spans
    start_times = []
    if turn.user_speech_start_time:
        start_times.append(turn.user_speech_start_time)
    if turn.stt_metrics and turn.stt_metrics[0].stt_start_time:
        start_times.append(turn.stt_metrics[0].stt_start_time)
    if turn.llm_metrics and turn.llm_metrics[0].llm_start_time:
        start_times.append(turn.llm_metrics[0].llm_start_time)
    if turn.tts_metrics and turn.tts_metrics[0].tts_start_time:
        start_times.append(turn.tts_metrics[0].tts_start_time)
    if turn.eou_metrics and turn.eou_metrics[0].eou_start_time:
        start_times.append(turn.eou_metrics[0].eou_start_time)
    if turn.timeline_event_metrics:
        for ev in turn.timeline_event_metrics:
            if ev.start_time:
                start_times.append(ev.start_time)
                
    turn_span_start_time = min(start_times) if start_times else None

    turn_span = create_span(turn_name, parent_span=self.main_turn_span, start_time=turn_span_start_time)

    if not turn_span:
        return

    with trace.use_span(turn_span, end_on_exit=False):

        # --- VAD errors ---
        def create_vad_span(vad: VadMetrics):
            vad_errors = [e for e in turn.errors if e.get("source") == "VAD"]
            if vad_errors or turn.vad_metrics:
                vad_class = turn.session_metrics.provider_per_component.get("vad", {}).get("provider_class")
                vad_model = turn.session_metrics.provider_per_component.get("vad", {}).get("model_name")
                vad_span_name = f"{vad_class}: VAD Processing"
                
                vad_attrs = {}
                if not vad_class:
                    return
                if vad_class:
                    vad_attrs["provider_class"] = vad_class
                if vad_model:
                    vad_attrs["model_name"] = vad_model
                if vad.vad_min_silence_duration:
                    vad_attrs["min_silence_duration"] = vad.vad_min_silence_duration
                if vad.vad_min_speech_duration:
                    vad_attrs["min_speech_duration"] = vad.vad_min_speech_duration
                if vad.vad_threshold:
                    vad_attrs["threshold"] = vad.vad_threshold

            # Calculate span start time: end_of_speech_time - min_silence_duration
            if vad.user_speech_start_time is None and vad.user_speech_end_time is not None:
                vad.user_speech_start_time = vad.user_speech_end_time - vad.vad_min_silence_duration
            elif vad.user_speech_start_time is not None and vad.user_speech_end_time is None:
                vad.user_speech_end_time = vad.user_speech_start_time + vad.vad_min_silence_duration

            vad_start_time = vad.user_speech_start_time
            vad_end_time = vad.user_speech_end_time
            vad_span = create_span(vad_span_name, vad_attrs, parent_span=turn_span, start_time=vad_start_time)
            
            if vad_span:
                for error in vad_errors:
                    vad_span.add_event("error", attributes={
                        "message": error["message"],
                        "timestamp": error["timestamp"],
                        "source": error["source"]
                    })
                    with trace.use_span(vad_span):
                        vad_error_span = create_span("VAD Error", {"message": error["message"]}, parent_span=vad_span, start_time=error["timestamp"])
                        self.end_span(vad_error_span, end_time=error["timestamp"]+0.100)
                        vad_errors.remove(error)
                
                vad_status = StatusCode.ERROR if vad_errors else StatusCode.OK
                self.end_span(vad_span, status_code=vad_status, end_time=vad_end_time)

        vad_list = turn.vad_metrics if turn.vad_metrics else None
        if vad_list:
            for vad in vad_list:
                try:
                    create_vad_span(vad)
                except Exception as e:
                    logger.error(f"Error creating VAD span: {e}")

        # --- Interruption span ---
        def create_interruption_span(interruption: InterruptionMetrics):
            if interruption.false_interrupt_start_time and interruption.false_interrupt_end_time:
                false_interrupt_attrs = {}
                if interruption.interrupt_mode:
                    false_interrupt_attrs["interrupt_mode"] = interruption.interrupt_mode
                if interruption.false_interrupt_pause_duration:
                    false_interrupt_attrs["pause_duration_config"] = interruption.false_interrupt_pause_duration
                if interruption.false_interrupt_duration:
                    false_interrupt_attrs["false_interrupt_duration"] = interruption.false_interrupt_duration
                if interruption.resumed_after_false_interrupt:
                    false_interrupt_attrs["resumed_after_false_interrupt"] = True
                if interruption.false_interrupt_duration:
                    false_interrupt_attrs["actual_duration"] = interruption.false_interrupt_duration
                
                false_interrupt_end = interruption.false_interrupt_end_time
                if false_interrupt_end is None:
                    # False interrupt was followed by true interrupt
                    false_interrupt_end = interruption.interrupt_start_time
                
                false_interrupt_span_name = "False Interruption (Resumed)" if interruption.resumed_after_false_interrupt else "False Interruption (Escalated)"
                false_interrupt_span = create_span(false_interrupt_span_name, false_interrupt_attrs, parent_span=turn_span, start_time=interruption.false_interrupt_start_time)
                self.end_span(false_interrupt_span, message="False interruption detected", end_time=false_interrupt_end)

            if turn.is_interrupted:
                interrupted_attrs = {}
                if interruption.interrupt_mode:
                    interrupted_attrs["interrupt_mode"] = interruption.interrupt_mode
                if interruption.interrupt_min_duration:
                    interrupted_attrs["interrupt_min_duration"] = interruption.interrupt_min_duration
                if interruption.interrupt_min_words:
                    interrupted_attrs["interrupt_min_words"] = interruption.interrupt_min_words
                if interruption.false_interrupt_pause_duration:
                    interrupted_attrs["false_interrupt_pause_duration"] = interruption.false_interrupt_pause_duration
                if interruption.resume_on_false_interrupt:
                    interrupted_attrs["resume_on_false_interrupt"] = interruption.resume_on_false_interrupt
                if interruption.interrupt_reason:
                    interrupted_attrs["interrupt_reason"] = interruption.interrupt_reason
                if interruption.interrupt_words:
                    interrupted_attrs["interrupt_words"] = interruption.interrupt_words
                if interruption.interrupt_duration:
                    interrupted_attrs["interrupt_duration"] = interruption.interrupt_duration
                # Mark if this was preceded by a false interrupt
                if interruption.false_interrupt_start_time is not None:
                    interrupted_attrs["preceded_by_false_interrupt"] = True
                
                interrupted_span = create_span("Turn Interrupted", interrupted_attrs, parent_span=turn_span, start_time=interruption.interrupt_start_time)
        
            # Calculate interrupt end time with proper None checks
            if interruption.interrupt_start_time is not None:
                if interruption.interrupt_duration is not None:
                    interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_duration
                elif interruption.interrupt_min_duration is not None:
                    interruption.interrupt_end_time = interruption.interrupt_start_time + interruption.interrupt_min_duration
                else:
                    interruption.interrupt_end_time = interruption.interrupt_start_time
        
            self.end_span(interrupted_span, message="Agent was interrupted", end_time=interruption.interrupt_end_time) 

        
        if turn.interruption_metrics:
            try:
                create_interruption_span(turn.interruption_metrics)
            except Exception as e:
                logger.error(f"Error creating interruption span: {e}")
            
        
        # --- Fallback spans ---
        def create_fallback_span(fallback: FallbackEvent):
            is_recovery = fallback.is_recovery
            if is_recovery:
                fallback_span_name = f"Recovery: {fallback.component_type}"
                fallback_attrs = {
                    "temporary_disable_sec": fallback.temporary_disable_sec,
                    "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts,
                    "recovery_attempt": fallback.recovery_attempt,
                    "message": fallback.message,
                    "restored_provider": fallback.new_provider_label,
                    "previous_provider": fallback.original_provider_label,
                }
                span_time = fallback.start_time
                recovery_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time)
                if recovery_span:
                    self.end_span(recovery_span, message="Recovery completed", end_time=fallback.end_time)
                    return
            
            fallback_span_name = f"Fallback: {fallback.component_type}"
            
            fallback_attrs = {
                "temporary_disable_sec": fallback.temporary_disable_sec,
                "permanent_disable_after_attempts": fallback.permanent_disable_after_attempts,
                "recovery_attempt": fallback.recovery_attempt,
                "message": fallback.message,
            }
            
            # Use same start_time for all spans (instant spans)
            span_time = fallback.start_time
            fallback_span = create_span(fallback_span_name, fallback_attrs, parent_span=turn_span, start_time=span_time)
            
            if fallback_span:
                # Child trace for original connection attempt (if exists)
                if fallback.original_provider_label:
                    original_conn_attrs = {
                        "provider": fallback.original_provider_label,
                        "status": "failed"
                    }
                    
                    original_conn_span = create_span(
                        f"Connection: {fallback.original_provider_label}",
                        original_conn_attrs,
                        parent_span=fallback_span,
                        start_time=span_time
                    )
                    self.end_span(original_conn_span, status_code=StatusCode.ERROR, end_time=span_time)
                
                # Child trace for new connection attempt (if switched successfully)
                if fallback.new_provider_label:
                    new_conn_attrs = {
                        "provider": fallback.new_provider_label,
                        "status": "success"
                    }
                    
                    new_conn_span = create_span(
                        f"Connection: {fallback.new_provider_label}",
                        new_conn_attrs,
                        parent_span=fallback_span,
                        start_time=span_time
                    )
                    self.end_span(new_conn_span, status_code=StatusCode.OK, end_time=span_time)
                
                # End the fallback span - status depends on whether we successfully switched
                fallback_status = StatusCode.OK if fallback.new_provider_label else StatusCode.ERROR
                self.end_span(fallback_span, status_code=fallback_status, end_time=span_time)

        if turn.fallback_events:
            for fallback in turn.fallback_events:
                try:
                    create_fallback_span(fallback)
                except Exception as e:
                    logger.error(f"Error creating fallback span: {e}")

        # --- STT spans ---
        def create_stt_span(stt: SttMetrics):
            stt_errors = [e for e in turn.errors if e.get("source") == "STT"]
            if stt or stt_errors:

                stt_attrs = {}
                if stt:
                    stt_class = turn.session_metrics.provider_per_component.get("stt", {}).get("provider_class")
                    if stt_class:
                        stt_attrs["provider_class"] = stt_class
                    
                    stt_model = turn.session_metrics.provider_per_component.get("stt", {}).get("model_name")
                    stt_attrs["input"] = "N/A"
                    if stt_model:
                        stt_attrs["model_name"] = stt_model
                    if stt.stt_latency is not None:
                        stt_attrs["duration_ms"] = stt.stt_latency
                    if stt.stt_start_time:
                        stt_attrs["start_timestamp"] = stt.stt_start_time
                    if stt.stt_end_time:
                        stt_attrs["end_timestamp"] = stt.stt_end_time
                    if stt.stt_transcript:
                        stt_attrs["output"] = stt.stt_transcript
                    if stt_class =="DeepgramSTTV2" and turn.preemtive_generation_enabled:
                        stt_attrs["stt_preemptive_generation_enabled"] = turn.preemtive_generation_enabled
                
                stt_span_name = f"{stt_class}: Speech to Text Processing"
                stt_span = create_span(
                    stt_span_name, stt_attrs,
                    parent_span=turn_span,
                    start_time=stt.stt_start_time if stt else None,
                )

                if stt.stt_preemptive_generation_enabled:
                    with trace.use_span(stt_span):
                        preemptive_attributes = {
                            "preemptive_generation_occurred": stt.stt_preemptive_generation_occurred,
                            "partial_text": stt.stt_preflight_transcript,
                            "final_text": stt.stt_transcript,
                        }
                    if stt.stt_preemptive_generation_occurred:
                        preemptive_attributes["preemptive_generation_latency"] = stt.stt_preflight_latency
                    preemptive_span = create_span("Preemptive Generation", preemptive_attributes, parent_span=stt_span, start_time=stt.stt_start_time)
                    preemptive_end_time = stt.stt_preflight_end_time or stt.stt_end_time
                    self.end_span(preemptive_span, end_time=preemptive_end_time)


                if stt_span:
                    for error in stt_errors:
                        stt_span.add_event("error", attributes={
                            "message": error.get("message", ""),
                            "timestamp": error.get("timestamp", ""),
                        })
                        if stt.stt_start_time <= error.get("timestamp") <= stt.stt_end_time:
                            with trace.use_span(stt_span):
                                stt_error_span = create_span("STT Error", {"message": error.get("message", "")}, parent_span=stt_span, start_time=error.get("timestamp"))
                                self.end_span(stt_error_span, end_time=error.get("timestamp")+0.100)
                                stt_errors.remove(error)
                    status = StatusCode.ERROR if stt_errors else StatusCode.OK
                    self.end_span(stt_span, status_code=status, end_time=stt.stt_end_time if stt else None)
        
        stt_list = turn.stt_metrics if turn.stt_metrics else None
        if stt_list:
            for stt in stt_list:
                try:
                    create_stt_span(stt)
                except Exception as e:
                    logger.error(f"Error creating STT span: {e}")

        # --- EOU spans ---
        def create_eou_span(eou: EouMetrics):
            eou_errors = [e for e in turn.errors if e.get("source") == "TURN-D"]

            eou_attrs = {}
            if eou:
                eou_class = turn.session_metrics.provider_per_component.get("eou", {}).get("provider_class")
                eou_model = turn.session_metrics.provider_per_component.get("eou", {}).get("model_name")
                
                if eou_class:
                    eou_attrs["provider_class"] = eou_class
                if eou_model:
                    eou_attrs["model_name"] = eou_model
                if turn.user_speech:
                    eou_attrs["input"] = turn.user_speech
                if eou.eou_latency is not None:
                    eou_attrs["duration_ms"] = eou.eou_latency
                if eou.eou_start_time:
                    eou_attrs["start_timestamp"] = eou.eou_start_time
                if eou.eou_end_time:
                    eou_attrs["end_timestamp"] = eou.eou_end_time
                if eou.waited_for_additional_speech:
                    eou_attrs["waited_for_additional_speech"] = eou.waited_for_additional_speech
                if eou.eou_probability:
                    eou_attrs["eou_probability"] = round(eou.eou_probability, 4)
                if turn.session_metrics.eou_config.get("min_speech_wait_timeout"):
                    eou_attrs["min_speech_wait_timeout"] = turn.session_metrics.eou_config.get("min_speech_wait_timeout")
                if turn.session_metrics.eou_config.get("max_speech_wait_timeout"):
                    eou_attrs["max_speech_wait_timeout"] = turn.session_metrics.eou_config.get("max_speech_wait_timeout")

                eou_span_name = f"{eou_class}: End-Of-Utterance Detection"

            eou_span = create_span(
                eou_span_name, eou_attrs,
                parent_span=turn_span,
                start_time=eou.eou_start_time if eou else None,
            )
            if eou.waited_for_additional_speech:
                delay = round(eou.wait_for_additional_speech_duration, 4)
                with trace.use_span(eou_span):
                    wait_for_additional_speech_span =create_span (
                        "Wait for Additional Speech",
                        {
                            "wait_for_additional_speech_duration":delay,
                            "eou_probability": round(eou.eou_probability, 4),
                        },
                        start_time=eou.eou_end_time,
                    )
                    self.end_span(wait_for_additional_speech_span, status_code=StatusCode.OK, end_time=eou.eou_end_time + delay)
            
            if eou_span:
                for error in eou_errors:
                    eou_span.add_event("error", attributes={
                        "message": error.get("message", ""),
                        "timestamp": error.get("timestamp", ""),
                    })
                    with trace.use_span(eou_span):
                        eou_error_span = create_span("EOU Error", {"message": error.get("message", "")}, parent_span=eou_span, start_time=error.get("timestamp"))
                        self.end_span(eou_error_span, end_time=error.get("timestamp")+0.100)
                        eou_errors.remove(error)
                eou_status = StatusCode.ERROR if eou_errors else StatusCode.OK
                self.end_span(eou_span, status_code=eou_status, end_time=eou.eou_end_time if eou else None)

        eou_list = turn.eou_metrics if turn.eou_metrics else None
        if eou_list:
            for eou in eou_list:
                try:
                    create_eou_span(eou)
                except Exception as e:
                    logger.error(f"Error creating EOU span: {e}")


        # --- LLM spans ---
        def create_llm_span(llm: LlmMetrics):
            """Create the LLM processing span for this turn, with tool-call,
            error, and TTFT sub-spans parented beneath it.

            Fixes vs. previous version:
            - ``llm_class`` is always bound (was a NameError when ``llm`` was falsy).
            - The error list is no longer mutated while being iterated (the old
              ``remove``-in-loop skipped every other error and left the final
              status check inconsistent); status is now ERROR iff any LLM error
              was recorded for the turn.
            - A missing error ``timestamp`` no longer raises on the ``+0.100``.
            """
            llm_errors = [e for e in turn.errors if e.get("source") == "LLM"]
            had_errors = bool(llm_errors)
            llm_class = None
            llm_attrs = {}
            if llm:
                provider_info = turn.session_metrics.provider_per_component.get("llm", {})
                llm_class = provider_info.get("provider_class")
                if llm_class:
                    llm_attrs["provider_class"] = llm_class
                llm_model = provider_info.get("model_name")
                if llm_model:
                    llm_attrs["model_name"] = llm_model
                if llm.llm_input:
                    llm_attrs["input"] = llm.llm_input
                if llm.llm_duration:
                    llm_attrs["duration_ms"] = llm.llm_duration
                if llm.llm_start_time:
                    llm_attrs["start_timestamp"] = llm.llm_start_time
                if llm.llm_end_time:
                    llm_attrs["end_timestamp"] = llm.llm_end_time
                if turn.agent_speech:
                    llm_attrs["output"] = turn.agent_speech
                if llm.prompt_tokens:
                    llm_attrs["input_tokens"] = llm.prompt_tokens
                if llm.completion_tokens:
                    llm_attrs["output_tokens"] = llm.completion_tokens
                if llm.prompt_cached_tokens:
                    llm_attrs["cached_input_tokens"] = llm.prompt_cached_tokens
                if llm.total_tokens:
                    llm_attrs["total_tokens"] = llm.total_tokens

            llm_span_name = f"{llm_class or 'LLM'}: LLM Processing"
            llm_span = create_span(
                llm_span_name, llm_attrs,
                parent_span=turn_span,
                start_time=llm.llm_start_time if llm else None,
            )
            if llm_span:
                # Tool call sub-spans: zero-duration markers at the recorded timestamp
                if turn.function_tool_timestamps:
                    for tool_data in turn.function_tool_timestamps:
                        tool_timestamp = tool_data.get("timestamp")
                        tool_span = create_span(
                            f"Invoked Tool: {tool_data.get('tool_name', 'unknown')}",
                            parent_span=llm_span,
                            start_time=tool_timestamp,
                        )
                        self.end_span(tool_span, end_time=tool_timestamp)

                for error in llm_errors:
                    llm_span.add_event("error", attributes={
                        "message": error.get("message", ""),
                        "timestamp": error.get("timestamp", ""),
                    })
                    err_ts = error.get("timestamp")
                    with trace.use_span(llm_span):
                        llm_error_span = create_span("LLM Error", {"message": error.get("message", "")}, parent_span=llm_span, start_time=err_ts)
                        # 100 ms synthetic duration so the error is visible on the timeline
                        self.end_span(llm_error_span, end_time=err_ts + 0.100 if err_ts is not None else None)

                # TTFT sub-span
                if llm and llm.llm_ttft is not None and llm.llm_start_time is not None:
                    ttft_span = create_span(
                        "Time to First Token",
                        attributes={"llm_ttft": llm.llm_ttft},
                        parent_span=llm_span,
                        start_time=llm.llm_start_time,
                    )
                    # llm_ttft is in milliseconds; start/end times are in seconds
                    ttft_end = llm.llm_start_time + (llm.llm_ttft / 1000)
                    self.end_span(ttft_span, end_time=ttft_end)

                llm_status = StatusCode.ERROR if had_errors else StatusCode.OK
                self.end_span(llm_span, status_code=llm_status, end_time=llm.llm_end_time if llm else None)

        # Emit one LLM span per recorded LLM invocation in this turn.
        for llm in turn.llm_metrics or []:
            try:
                create_llm_span(llm)
            except Exception as e:
                logger.error(f"Error creating LLM span: {e}")

        # --- TTS spans ---
        def create_tts_span(tts: TtsMetrics):
            """Create the TTS processing span for this turn, with TTFB and
            error sub-spans parented beneath it.

            Fixes vs. previous version:
            - ``tts_class`` is always bound (was a NameError when ``tts`` was falsy).
            - The error list is no longer mutated while being iterated (the old
              ``remove``-in-loop skipped every other error); status is now ERROR
              iff any TTS error was recorded for the turn.
            - A missing error ``timestamp`` no longer raises on the ``+0.100``.
            """
            tts_errors = [e for e in turn.errors if e.get("source") == "TTS"]
            had_errors = bool(tts_errors)
            tts_class = None
            tts_attrs = {}
            if tts:
                provider_info = turn.session_metrics.provider_per_component.get("tts", {})
                tts_class = provider_info.get("provider_class")
                tts_model = provider_info.get("model_name")
                if tts_class:
                    tts_attrs["provider_class"] = tts_class
                if tts_model:
                    tts_attrs["model_name"] = tts_model
                if turn.agent_speech:
                    tts_attrs["input"] = turn.agent_speech
                if tts.tts_duration:
                    tts_attrs["duration_ms"] = tts.tts_duration
                if tts.tts_start_time:
                    tts_attrs["start_timestamp"] = tts.tts_start_time
                if tts.tts_end_time:
                    tts_attrs["end_timestamp"] = tts.tts_end_time
                if tts.tts_characters:
                    tts_attrs["characters"] = tts.tts_characters
                if turn.agent_speech_duration:
                    tts_attrs["audio_duration_ms"] = turn.agent_speech_duration
                # Synthesized audio itself is not captured as an attribute
                tts_attrs["output"] = "N/A"

            tts_span_name = f"{tts_class or 'TTS'}: Text to Speech Processing"
            tts_span = create_span(
                tts_span_name, tts_attrs,
                parent_span=turn_span,
                start_time=tts.tts_start_time if tts else None,
            )

            if tts_span:
                # TTFB sub-span
                if tts and tts.tts_first_byte_time is not None:
                    ttfb_span = create_span(
                        "Time to First Byte",
                        parent_span=tts_span,
                        start_time=tts.tts_start_time,
                    )
                    self.end_span(ttfb_span, end_time=tts.tts_first_byte_time)

                for error in tts_errors:
                    tts_span.add_event("error", attributes={
                        "message": error.get("message", ""),
                        "timestamp": error.get("timestamp", ""),
                    })
                    err_ts = error.get("timestamp")
                    with trace.use_span(tts_span):
                        tts_error_span = create_span("TTS Error", {"message": error.get("message", "")}, parent_span=tts_span, start_time=err_ts)
                        # 100 ms synthetic duration so the error is visible on the timeline
                        self.end_span(tts_error_span, end_time=err_ts + 0.100 if err_ts is not None else None)

                tts_status = StatusCode.ERROR if had_errors else StatusCode.OK
                self.end_span(tts_span, status_code=tts_status, end_time=tts.tts_end_time if tts else None)

        # Emit one TTS span per recorded TTS invocation in this turn.
        for tts in turn.tts_metrics or []:
            try:
                create_tts_span(tts)
            except Exception as e:
                logger.error(f"Error creating TTS span: {e}")

        # --- KB spans ---
        def create_kb_span(kb: KbMetrics):
            """Emit a single Knowledge Base retrieval span under the turn span."""
            attrs = {}
            if turn.user_speech:
                attrs["input"] = turn.user_speech
            if kb.kb_retrieval_latency:
                attrs["retrieval_latency_ms"] = kb.kb_retrieval_latency
            if kb.kb_start_time:
                attrs["start_timestamp"] = kb.kb_start_time
            if kb.kb_end_time:
                attrs["end_timestamp"] = kb.kb_end_time
            if kb.kb_documents:
                doc_count = len(kb.kb_documents)
                # Keep the attribute readable: list the docs when few, else just a count.
                attrs["documents"] = ", ".join(kb.kb_documents) if doc_count <= 5 else f"{doc_count} documents"
                attrs["document_count"] = doc_count
            if kb.kb_scores:
                # At most the first five scores, rounded for readability.
                attrs["scores"] = ", ".join(str(round(score, 4)) for score in kb.kb_scores[:5])

            kb_span = create_span("Knowledge Base: Retrieval", attrs, parent_span=turn_span, start_time=kb.kb_start_time)
            if kb_span:
                self.end_span(kb_span, status_code=StatusCode.OK, end_time=kb.kb_end_time)

        # Emit one KB span per recorded retrieval in this turn.
        for kb in turn.kb_metrics or []:
            try:
                create_kb_span(kb)
            except Exception as e:
                logger.error(f"Error creating KB span: {e}")

        # --- Realtime spans (for S2S modes) ---
        def create_rt_span(rt: RealtimeMetrics):
            """Create the realtime (S2S) processing span for this turn, plus
            tool-call markers, a time-to-first-word sub-span, and error spans.

            Fixes vs. previous version:
            - ``rt_class`` is always bound (was a NameError when ``rt`` was falsy).
            - ``rt_errors`` was computed twice; it is computed once now.
            - The error list is no longer mutated while being iterated; status is
              now ERROR iff any REALTIME error was recorded for the turn.
            - A missing error ``timestamp`` no longer raises on the ``+0.100``.
            - Removed dead commented-out timeline lookup code.
            """
            rt_errors = [e for e in turn.errors if e.get("source") == "REALTIME"]
            had_errors = bool(rt_errors)

            rt_class = None
            rt_attrs = {}
            if rt:
                provider_info = turn.session_metrics.provider_per_component.get("realtime", {})
                rt_class = provider_info.get("provider_class")
                if rt_class:
                    rt_attrs["provider_class"] = rt_class
                rt_model = provider_info.get("model_name")
                if rt_model:
                    rt_attrs["model_name"] = rt_model

            # The span covers the gap between the user finishing speaking and the
            # agent starting to speak.
            rt_start_time = turn.user_speech_end_time if turn.user_speech_end_time else turn.agent_speech_start_time
            rt_end_time = turn.agent_speech_start_time

            rt_span_name = f"{rt_class or 'Realtime'}: Realtime Processing"
            rt_span = create_span(
                rt_span_name, rt_attrs,
                parent_span=turn_span,
                start_time=rt_start_time,
            )
            if rt_span:
                # Realtime tool calls: zero-duration markers (no per-tool timing available)
                if turn.function_tools_called:
                    for tool_name in turn.function_tools_called:
                        now = time.perf_counter()
                        tool_span = create_span(
                            f"Invoked Tool: {tool_name}",
                            parent_span=turn_span,
                            start_time=now,
                        )
                        self.end_span(tool_span, end_time=now)

            # TTFB span for realtime
            if turn.e2e_latency is not None:
                ttfb_span = create_span(
                    "Time to First Word",
                    {"duration_ms": turn.e2e_latency},
                    parent_span=rt_span,
                    start_time=rt_start_time,
                )
                self.end_span(ttfb_span, end_time=rt_end_time)

            # Realtime model errors are attached to the *turn* span, not rt_span
            for error in rt_errors:
                turn_span.add_event("Errors", attributes={
                    "message": error.get("message", "Unknown error"),
                    "timestamp": error.get("timestamp", "N/A"),
                })
                err_ts = error.get("timestamp")
                with trace.use_span(turn_span):
                    rt_error_span = create_span("Realtime Error", {"message": error.get("message", "")}, parent_span=turn_span, start_time=err_ts)
                    # 100 ms synthetic duration so the error is visible on the timeline
                    self.end_span(rt_error_span, end_time=err_ts + 0.100 if err_ts is not None else None)
            self.end_span(rt_span, status_code=StatusCode.ERROR if had_errors else StatusCode.OK, end_time=rt_end_time)
        

        # Emit one realtime span per recorded S2S invocation in this turn.
        for rt in turn.realtime_metrics or []:
            try:
                create_rt_span(rt)
            except Exception as e:
                logger.error(f"Error creating RT span: {e}")

        def create_error_spans(errors: Dict[str, Any]):
            """Emit a short ERROR-status span for one error dict from ``turn.errors``.

            Note: despite the plural name, ``errors`` is a single error dict.

            Fix vs. previous version: ``timestamp_perf`` may be absent, in which
            case the old ``span_start_time + 0.001`` raised TypeError; we now fall
            back to end_span's default end time.
            """
            error_span_name = f"{errors.get('source', 'Unknown')} Error span"
            attr = {}
            if errors.get('message'):
                attr['error message'] = errors.get('message')
            span_start_time = errors.get('timestamp_perf')
            error_span = create_span(error_span_name, attributes=attr, parent_span=turn_span, start_time=span_start_time)
            # 1 ms synthetic duration so the span is visible on the timeline
            span_end_time = span_start_time + 0.001 if span_start_time is not None else None
            self.end_span(error_span, status_code=StatusCode.ERROR, end_time=span_end_time)
        
        # One error span per recorded error; avoid shadowing the loop variable
        # with the exception name.
        for turn_error in turn.errors:
            try:
                create_error_spans(turn_error)
            except Exception as exc:
                logger.error(f"Error creating error span: {exc}")

        # Determine turn end time first for unbounded children spans: take the
        # latest truthy end timestamp across all components.
        candidate_end_times = [
            turn.tts_metrics[-1].tts_end_time if turn.tts_metrics else None,
            turn.llm_metrics[-1].llm_end_time if turn.llm_metrics else None,
            turn.agent_speech_end_time,
            turn.eou_metrics[-1].eou_end_time if turn.eou_metrics else None,
            turn.stt_metrics[-1].stt_end_time if turn.stt_metrics else None,
            turn.user_speech_end_time,
            turn.interruption_metrics.false_interrupt_end_time if turn.interruption_metrics else None,
        ]
        end_times = [t for t in candidate_end_times if t]

        turn_end_time = max(end_times) if end_times else None

        # Interrupted turns (or turns with no recorded timestamps) end "now".
        if turn.is_interrupted or turn_end_time is None:
            turn_end_time = time.perf_counter()


            
                    

        # --- Timeline events ---
        # One child span per timeline event; events without an explicit end fall
        # back to the computed turn end time.
        if turn.timeline_event_metrics:
            for event in turn.timeline_event_metrics:
                event_end = event.end_time if event.end_time else turn_end_time

                if event.event_type == "user_speech":
                    span = create_span(
                        "User Input Speech",
                        {"Transcript": event.text, "duration_ms": event.duration_ms},
                        parent_span=turn_span,
                        start_time=event.start_time,
                    )
                    self.end_span(span, end_time=event_end)
                elif event.event_type == "agent_speech":
                    span = create_span(
                        "Agent Output Speech",
                        {"Transcript": event.text, "duration_ms": event.duration_ms},
                        parent_span=turn_span,
                        start_time=event.start_time,
                    )
                    self.end_span(span, end_time=event_end)
                elif event.event_type == "thinking_audio":
                    attrs = {}
                    if turn.thinking_audio_file_path:
                        attrs["file_path"] = turn.thinking_audio_file_path
                    if turn.thinking_audio_looping is not None:
                        attrs["looping"] = turn.thinking_audio_looping
                    if turn.thinking_audio_override_thinking is not None:
                        attrs["override_thinking"] = turn.thinking_audio_override_thinking
                    if event.duration_ms is not None:
                        attrs["duration_ms"] = event.duration_ms
                    span = create_span(
                        "Thinking Audio",
                        attrs,
                        parent_span=turn_span,
                        start_time=event.start_time,
                    )
                    self.end_span(span, end_time=event_end)
                elif event.event_type == "background_audio":
                    attrs = {}
                    if turn.background_audio_file_path:
                        attrs["file_path"] = turn.background_audio_file_path
                    if turn.background_audio_looping is not None:
                        attrs["looping"] = turn.background_audio_looping
                    if event.duration_ms is not None:
                        attrs["duration_ms"] = event.duration_ms
                    span = create_span(
                        "Background Audio",
                        attrs,
                        parent_span=turn_span,
                        start_time=event.start_time,
                    )
                    self.end_span(span, end_time=event_end)



    self.end_span(turn_span, message="End of turn trace.", end_time=turn_end_time)

Creates a full trace for a single turn from the unified TurnMetrics schema. Handles both cascading and realtime component spans based on what data is present.

def end_a2a_communication(self)
Expand source code
def end_a2a_communication(self):
    """Close the A2A communication parent span and reset the A2A turn counter."""
    finished_at = time.perf_counter()
    complete_span(self.a2a_span, StatusCode.OK, end_time=finished_at)
    self.a2a_span = None
    self._a2a_turn_count = 0

Ends the A2A communication parent span.

def end_a2a_trace(self, span: opentelemetry.trace.span.Span | None, message: str = '')
Expand source code
def end_a2a_trace(self, span: Optional[Span], message: str = ""):
    """Ends an A2A trace span with OK status at the current perf-counter time."""
    # NOTE(review): `message` is accepted but never forwarded — confirm intent.
    complete_span(span, StatusCode.OK, end_time=time.perf_counter())

Ends an A2A trace span.

def end_agent_session(self)
Expand source code
def end_agent_session(self):
    """Complete the agent session span.

    Any still-open main-turn span is closed first so children do not
    outlive their parent.
    """
    if self.main_turn_span:
        self.end_main_turn()
    session_end = time.perf_counter()
    self.end_span(self.agent_session_span, "Agent session ended", end_time=session_end)
    self.agent_session_span = None

Completes the agent session span.

def end_agent_session_closed(self)
Expand source code
def end_agent_session_closed(self):
    """Complete the "agent session closed" span and drop the reference."""
    self.end_span(self.agent_session_closed_span, "Agent session closed", end_time=time.perf_counter())
    self.agent_session_closed_span = None

Completes the agent session closed span.

def end_agent_session_config(self)
Expand source code
def end_agent_session_config(self):
    """Complete the session-configuration span and drop the reference."""
    self.end_span(self.agent_session_config_span, "Agent session config ended", end_time=time.perf_counter())
    self.agent_session_config_span = None

Completes the agent session config span.

def end_main_turn(self)
Expand source code
def end_main_turn(self):
    """Complete the parent span that groups all user/agent turns."""
    now = time.perf_counter()
    self.end_span(self.main_turn_span, "All turns processed", end_time=now)
    self.main_turn_span = None

Completes the main turn span.

def end_span(self,
span: opentelemetry.trace.span.Span | None,
message: str = '',
status_code: opentelemetry.trace.status.StatusCode = StatusCode.OK,
end_time: float | None = None)
Expand source code
def end_span(self, span: Optional[Span], message: str = "", status_code: StatusCode = StatusCode.OK, end_time: Optional[float] = None):
    """Complete `span` with `status_code`; no-op when span is None/falsy.

    `message` is only forwarded as the status description for ERROR spans.
    When `end_time` is omitted, the current perf-counter time is used.
    """
    if not span:
        return
    final_time = time.perf_counter() if end_time is None else end_time
    description = message if status_code == StatusCode.ERROR else ""
    complete_span(span, status_code, description, final_time)

Completes a given span with a status.

def set_session_id(self, session_id: str)
Expand source code
def set_session_id(self, session_id: str) -> None:
    """Set the session ID for the trace manager."""
    self.session_id = session_id

Set the session ID for the trace manager.

def set_session_metrics(self,
session_metrics: SessionMetrics)
Expand source code
def set_session_metrics(self, session_metrics: SessionMetrics) -> None:
    """Set the session metrics for the trace manager."""
    self.session_metrics = session_metrics

Set the session metrics for the trace manager.

def start_agent_joined_meeting(self, attributes: Dict[str, Any])
Expand source code
def start_agent_joined_meeting(self, attributes: Dict[str, Any]):
    """Open the root span for the agent joining a meeting (idempotent)."""
    if self.root_span:
        return

    agent_name = attributes.get('agent_name', 'UnknownAgent')
    agent_id = attributes.get('peerId', 'UnknownID')
    started_at = attributes.get('start_time', time.perf_counter())

    self.root_span = create_span(
        f"Agent Session: agentName_{agent_name}_agentId_{agent_id}",
        attributes,
        start_time=started_at,
    )

    # Always set root_span_ready so downstream awaits don't hang
    # when telemetry is not initialized (create_span returns None).
    self.root_span_ready.set()

Starts the root span for the agent joining a meeting.

async def start_agent_session(self, attributes: Dict[str, Any])
Expand source code
async def start_agent_session(self, attributes: Dict[str, Any]):
    """Open the agent-session span as a child of the root span (idempotent).

    Waits for the root span to be ready, splits participant metrics into
    user vs. agent lists on `attributes`, then starts the main-turn span.
    """
    await self.root_span_ready.wait()
    if not self.root_span or self.agent_session_span:
        return

    start_time = attributes.get('start_time', time.perf_counter())
    attributes["participant_metrics"] = [
        asdict(p) for p in self.participant_metrics if p.kind == "user"
    ]
    attributes["agent_participant_metrics"] = [
        asdict(p) for p in self.participant_metrics if p.kind != "user"
    ]
    self.agent_session_span = create_span("Session Started", attributes, parent_span=self.root_span, start_time=start_time)

    self.start_main_turn()

Starts the span for the agent's session, child of the root span.

async def start_agent_session_closed(self, attributes: Dict[str, Any])
Expand source code
async def start_agent_session_closed(self, attributes: Dict[str, Any]):
    """Open the "agent session closed" span under the root span (idempotent)."""
    await self.root_span_ready.wait()
    if not self.root_span or self.agent_session_closed_span:
        return

    started_at = attributes.get('start_time', time.perf_counter())
    self.agent_session_closed_span = create_span("Agent Session Closed", attributes, parent_span=self.root_span, start_time=started_at)

Starts the span for agent session closed.

async def start_agent_session_config(self, attributes: Dict[str, Any])
Expand source code
async def start_agent_session_config(self, attributes: Dict[str, Any]):
    """Open the session-configuration span under the root span (idempotent),
    then immediately close it — configuration is a point-in-time record."""
    await self.root_span_ready.wait()
    if not self.root_span or self.agent_session_config_span:
        return

    started_at = attributes.get('start_time', time.perf_counter())
    config_span = create_span("Session Configuration", attributes, parent_span=self.root_span, start_time=started_at)
    self.agent_session_config_span = config_span
    if config_span:
        with trace.use_span(config_span):
            self.end_agent_session_config()

Starts the span for the agent's session configuration, child of the root span.

def start_main_turn(self)
Expand source code
def start_main_turn(self):
    """Open the parent span grouping all user/agent turns (idempotent);
    requires an active agent-session span."""
    if not self.agent_session_span or self.main_turn_span:
        return

    self.main_turn_span = create_span("User & Agent Turns", parent_span=self.agent_session_span, start_time=time.perf_counter())

Starts a parent span for all user-agent turns.

class TurnMetrics (turn_id: str = '',
is_interrupted: bool = False,
user_speech_start_time: float | None = None,
user_speech_end_time: float | None = None,
user_speech_duration: float | None = None,
user_speech: str | None = None,
agent_speech_start_time: float | None = None,
agent_speech_end_time: float | None = None,
agent_speech_duration: float | None = None,
agent_speech: str | None = None,
e2e_latency: float | None = None,
function_tools_called: List[str] = <factory>,
function_tool_timestamps: List[Dict[str, Any]] = <factory>,
is_a2a_enabled: bool = False,
handoff_occurred: bool = False,
errors: List[Dict[str, Any]] = <factory>,
timestamp: float = <factory>,
preemtive_generation_enabled: bool = False,
vad_metrics: List[VadMetrics] = <factory>,
stt_metrics: List[SttMetrics] = <factory>,
eou_metrics: List[EouMetrics] = <factory>,
kb_metrics: List[KbMetrics] = <factory>,
llm_metrics: List[LlmMetrics] = <factory>,
tts_metrics: List[TtsMetrics] = <factory>,
interruption_metrics: InterruptionMetrics | None = None,
realtime_metrics: List[RealtimeMetrics] = <factory>,
timeline_event_metrics: List[TimelineEvent] = <factory>,
fallback_events: List[FallbackEvent] = <factory>,
function_tool_metrics: List[FunctionToolMetrics] = <factory>,
mcp_tool_metrics: List[McpToolMetrics] = <factory>,
session_metrics: SessionMetrics | None = <factory>,
background_audio_file_path: str | None = None,
background_audio_looping: bool | None = None,
thinking_audio_file_path: str | None = None,
thinking_audio_looping: bool | None = None,
thinking_audio_override_thinking: bool | None = None)
Expand source code
@dataclass
class TurnMetrics:
    """One conversational turn, carrying per-component metric lists.

    Cascading turns populate the stt/eou/llm/tts lists; realtime (S2S)
    turns populate ``realtime_metrics`` instead. All timestamps are floats
    (presumably perf-counter seconds — confirm against the collector).
    """
    turn_id: str = ""
    is_interrupted: bool = False
    user_speech_start_time: Optional[float] = None
    user_speech_end_time: Optional[float] = None
    user_speech_duration: Optional[float] = None
    user_speech: Optional[str] = None
    agent_speech_start_time: Optional[float] = None
    agent_speech_end_time: Optional[float] = None
    agent_speech_duration: Optional[float] = None
    agent_speech: Optional[str] = None
    e2e_latency: Optional[float] = None
    function_tools_called: List[str] = field(default_factory=list)
    function_tool_timestamps: List[Dict[str, Any]] = field(default_factory=list)
    is_a2a_enabled: bool = False
    handoff_occurred: bool = False
    errors: List[Dict[str, Any]] = field(default_factory=list)
    timestamp: float = field(default_factory=time.time)
    preemtive_generation_enabled: bool = False

    vad_metrics: List[VadMetrics] = field(default_factory=list)
    stt_metrics: List[SttMetrics] = field(default_factory=list)
    eou_metrics: List[EouMetrics] = field(default_factory=list)
    kb_metrics: List[KbMetrics] = field(default_factory=list)
    llm_metrics: List[LlmMetrics] = field(default_factory=list)
    tts_metrics: List[TtsMetrics] = field(default_factory=list)
    interruption_metrics: Optional[InterruptionMetrics] = None
    realtime_metrics: List[RealtimeMetrics] = field(default_factory=list)
    timeline_event_metrics: List[TimelineEvent] = field(default_factory=list)
    fallback_events: List[FallbackEvent] = field(default_factory=list)
    function_tool_metrics: List[FunctionToolMetrics] = field(default_factory=list)
    mcp_tool_metrics: List[McpToolMetrics] = field(default_factory=list)
    session_metrics: Optional[SessionMetrics] = field(default_factory=SessionMetrics)

    # Background audio
    background_audio_file_path: Optional[str] = None
    background_audio_looping: Optional[bool] = None

    # Thinking audio
    thinking_audio_file_path: Optional[str] = None
    thinking_audio_looping: Optional[bool] = None
    thinking_audio_override_thinking: Optional[bool] = None

    def compute_e2e_latency(self) -> None:
        """Recompute ``e2e_latency`` from the latest entry of each component
        list (STT, EOU, LLM TTFT, TTS, plus realtime TTFB when present)."""
        sources = (
            (self.stt_metrics, "stt_latency"),
            (self.eou_metrics, "eou_latency"),
            (self.llm_metrics, "llm_ttft"),
            (self.tts_metrics, "tts_latency"),
            (self.realtime_metrics, "realtime_ttfb"),
        )
        latencies = [
            getattr(entries[-1], attr_name)
            for entries, attr_name in sources
            if entries and getattr(entries[-1], attr_name) is not None
        ]
        if latencies:
            self.e2e_latency = round(sum(latencies), 4)

    def to_dict(self) -> Dict[str, Any]:
        """Return this turn as a plain dict, refreshing e2e latency first."""
        self.compute_e2e_latency()
        return asdict(self)

Single turn with lists of per-component metrics.

Instance variables

var agent_speech : str | None
var agent_speech_duration : float | None
var agent_speech_end_time : float | None
var agent_speech_start_time : float | None
var background_audio_file_path : str | None
var background_audio_looping : bool | None
var e2e_latency : float | None
var eou_metrics : List[EouMetrics]
var errors : List[Dict[str, Any]]
var fallback_events : List[FallbackEvent]
var function_tool_metrics : List[FunctionToolMetrics]
var function_tool_timestamps : List[Dict[str, Any]]
var function_tools_called : List[str]
var handoff_occurred : bool
var interruption_metrics : InterruptionMetrics | None
var is_a2a_enabled : bool
var is_interrupted : bool
var kb_metrics : List[KbMetrics]
var llm_metrics : List[LlmMetrics]
var mcp_tool_metrics : List[McpToolMetrics]
var preemtive_generation_enabled : bool
var realtime_metrics : List[RealtimeMetrics]
var session_metrics : SessionMetrics | None
var stt_metrics : List[SttMetrics]
var thinking_audio_file_path : str | None
var thinking_audio_looping : bool | None
var thinking_audio_override_thinking : bool | None
var timeline_event_metrics : List[TimelineEvent]
var timestamp : float
var tts_metrics : List[TtsMetrics]
var turn_id : str
var user_speech : str | None
var user_speech_duration : float | None
var user_speech_end_time : float | None
var user_speech_start_time : float | None
var vad_metrics : List[VadMetrics]

Methods

def compute_e2e_latency(self) ‑> None
Expand source code
def compute_e2e_latency(self) -> None:
    """Calculate E2E latency by summing component latencies (STT + EOU + LLM TTFT + TTS)."""
    e2e_components = []

    if self.stt_metrics:
        stt = self.stt_metrics[-1]
        if stt.stt_latency is not None:
            e2e_components.append(stt.stt_latency)

    if self.eou_metrics:
        eou = self.eou_metrics[-1]
        if eou.eou_latency is not None:
            e2e_components.append(eou.eou_latency)

    if self.llm_metrics:
        llm = self.llm_metrics[-1]
        if llm.llm_ttft is not None:
            e2e_components.append(llm.llm_ttft)

    if self.tts_metrics:
        tts = self.tts_metrics[-1]
        if tts.tts_latency is not None:
            e2e_components.append(tts.tts_latency)

    if self.realtime_metrics:
        rt = self.realtime_metrics[-1]
        if rt.realtime_ttfb is not None:
            e2e_components.append(rt.realtime_ttfb)

    if e2e_components:
        self.e2e_latency = round(sum(e2e_components), 4)

Calculate E2E latency by summing component latencies (STT + EOU + LLM TTFT + TTS, plus realtime TTFB when present).

def to_dict(self) ‑> Dict[str, Any]
Expand source code
def to_dict(self) -> Dict[str, Any]:
    """Return this turn as a plain dict, refreshing e2e latency first."""
    self.compute_e2e_latency()
    return asdict(self)

Serialize turn metrics to a dictionary.