
Pipeline Observability

Pipeline Observability gives you first-class visibility into what your agent is doing at runtime. You can capture component-level latency and token usage, observe recording lifecycle events, centralize error handling across the Pipeline, and inspect session context for post-processing — all through a small set of decorators on the Pipeline and a method on the AgentSession.

tip

These hooks are side-effect-only. They observe pipeline events without changing the data flow — safe to register many of them for logging, analytics, and external monitoring.

What You Get

| Capability | API | Purpose |
| --- | --- | --- |
| Metrics Hooks | @pipeline.metrics.on(...) | Capture latency, durations, and token usage across STT, LLM, TTS, EOU, and Realtime (S2S) |
| Recording Lifecycle Hooks | @pipeline.on("recording_started" \| "recording_stopped" \| "recording_failed") | Observe when participant or track recording starts, stops, or fails |
| Error Hook | @pipeline.on("error") | Centralize error handling across the entire pipeline |
| Context Access | session.get_context_history(...) | Read session messages for debugging and post-processing |

Metrics Hooks

Metrics hooks fire at the end of each component's work for a turn and deliver a dict payload describing latency, durations, and (for LLM / Realtime) token usage. Use them to ship metrics to your APM, log lines to a SIEM, or compute SLOs in real time.

Register a metrics hook with the @pipeline.metrics.on("<component>") decorator:

main.py
@pipeline.metrics.on("llm")
def on_llm_metrics(metrics: dict):
print(f"[METRICS] LLM TTFT: {metrics.get('llm_ttft')}ms")

stt — Speech-to-Text Metrics

When it fires: When an STT turn completes (final transcript available).

Payload keys: stt_latency

main.py
@pipeline.metrics.on("stt")
def on_stt_metrics(metrics: dict):
"""Fired when STT turn completes."""
print(f"[METRICS] STT Latency: {metrics.get('stt_latency')}ms")

llm — LLM Metrics

When it fires: When the LLM finishes generating its response for a turn.

Payload keys: llm_ttft, llm_duration, prompt_tokens, completion_tokens, total_tokens

main.py
@pipeline.metrics.on("llm")
def on_llm_metrics(metrics: dict):
"""Fired when LLM generation completes."""
print(
f"[METRICS] LLM TTFT: {metrics.get('llm_ttft')}ms | "
f"Total Duration: {metrics.get('llm_duration')}ms"
)
print(
"[METRICS] LLM Tokens (P/C/T): "
f"{metrics.get('prompt_tokens')}/"
f"{metrics.get('completion_tokens')}/"
f"{metrics.get('total_tokens')}"
)

tts — Text-to-Speech Metrics

When it fires: When TTS finishes synthesizing the agent's response for a turn.

Payload keys: ttfb, tts_latency

main.py
@pipeline.metrics.on("tts")
def on_tts_metrics(metrics: dict):
"""Fired when TTS finishes speaking."""
print(
f"[METRICS] TTS TTFB: {metrics.get('ttfb')}ms | "
f"Total Latency: {metrics.get('tts_latency')}ms"
)

eou — End-of-Utterance Metrics

When it fires: When the Turn Detector detects end-of-utterance.

Payload keys: eou_latency

main.py
@pipeline.metrics.on("eou")
def on_eou_metrics(metrics: dict):
"""Fired when TurnDetector matches end-of-utterance."""
print(f"[METRICS] EOU Latency: {metrics.get('eou_latency')}ms")

realtime — Realtime (S2S) Metrics

When it fires: For realtime / speech-to-speech models like OpenAI Realtime, Gemini Live, or AWS Nova Sonic — fires once per turn after the model responds.

Payload keys: realtime_ttfb, realtime_input_tokens, realtime_output_tokens, realtime_total_tokens, realtime_input_text_tokens, realtime_output_text_tokens, realtime_input_audio_tokens, realtime_output_audio_tokens

main.py
@pipeline.metrics.on("realtime")
def on_realtime_metrics(metrics: dict):
"""Fired for realtime (speech-to-speech) models."""
print(
"[METRICS] Realtime "
f"TTFB: {metrics.get('realtime_ttfb')}ms | "
f"Tokens (in/out/total): "
f"{metrics.get('realtime_input_tokens')}/"
f"{metrics.get('realtime_output_tokens')}/"
f"{metrics.get('realtime_total_tokens')} | "
f"TextTokens (in/out): "
f"{metrics.get('realtime_input_text_tokens')}/"
f"{metrics.get('realtime_output_text_tokens')} | "
f"AudioTokens (in/out): "
f"{metrics.get('realtime_input_audio_tokens')}/"
f"{metrics.get('realtime_output_audio_tokens')}"
)
note

Use stt, llm, tts, and eou metrics with Cascade pipelines. Use realtime metrics when the pipeline runs in Realtime (S2S) or Hybrid mode with a realtime model as the LLM.

Metrics Reference

| Hook | Mode | Key Fields |
| --- | --- | --- |
| stt | Cascade | stt_latency |
| llm | Cascade | llm_ttft, llm_duration, prompt_tokens, completion_tokens, total_tokens |
| tts | Cascade | ttfb, tts_latency |
| eou | Cascade | eou_latency |
| realtime | Realtime / Hybrid | realtime_ttfb, realtime_input_tokens, realtime_output_tokens, realtime_total_tokens, realtime_input_text_tokens, realtime_output_text_tokens, realtime_input_audio_tokens, realtime_output_audio_tokens |

Recording Lifecycle Hooks

Recording lifecycle hooks let you observe the start, stop, and failure of Recording without polling APIs. They fire for both participant and track recordings.

note

These hooks only fire when recording is enabled. Turn recording on via Observability Options on session.start(), or by setting recording=True in RoomOptions.
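For example, a minimal sketch of enabling recording through RoomOptions; the import path and the room_id field are assumptions about your existing session setup, and only recording=True is taken from this page:

# Assumption: RoomOptions is exported by the agents SDK
from videosdk.agents import RoomOptions

room_options = RoomOptions(
    room_id="<your-room-id>",  # hypothetical; use your own room configuration
    recording=True,  # enables recording so the lifecycle hooks below fire
)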

recording_started

Fired when participant or track recording starts successfully.

main.py
@pipeline.on("recording_started")
def on_recording_started(data):
"""Fired when participant or track recording starts successfully."""
print(f"[RECORDING HOOK] Started: {data}")

recording_stopped

Fired when recording stops successfully (typically at session end).

main.py
@pipeline.on("recording_stopped")
def on_recording_stopped(data):
"""Fired when recording stops successfully."""
print(f"[RECORDING HOOK] Stopped: {data}")

recording_failed

Fired when recording fails to start or stop. Use this to surface issues to your monitoring system before the session ends.

main.py
@pipeline.on("recording_failed")
def on_recording_failed(data):
"""Fired when recording fails to start or stop."""
print(f"[RECORDING HOOK] Failed: {data}")

Error Hook

The error hook centralizes error handling across the pipeline. Instead of attaching error listeners on each component, register one hook on the pipeline and receive errors from STT, LLM, TTS, VAD, Turn Detector, and the underlying VideoSDK Room connection.

Payload keys: source, error

main.py
@pipeline.on("error")
def on_pipeline_error(data):
"""
Catch any errors from STT, LLM, TTS, VAD, Turn Detector,
or the VideoSDK Room connection.
"""
source = data.get("source", "unknown")
error = data.get("error", "No error details")
print(f"[ERROR HOOK] Pipeline Error from {source}: {error}")
tip

Pair the error hook with the Fallback Adapter — the hook gives you visibility into which component failed, while the adapter handles automatic provider failover.


Accessing Session Context

session.get_context_history() returns the conversation messages accumulated during the session. Use it inside lifecycle methods like Agent.on_exit() to log final transcripts, send summaries to a backend, or build evaluation datasets.

Signature

session.get_context_history(
    include_function_calls: bool = False,
    include_system_messages: bool = False,
) -> list[dict]

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| include_function_calls | bool | False | Include function/tool calls and their results in the returned history |
| include_system_messages | bool | False | Include system messages (e.g., the agent's instructions) in the returned history |

Return Value

A list of message dicts. Each message has:

  • role: One of user, assistant, system, or tool.
  • content: A string, or a list of content parts (e.g., text and images for multi-modal turns).
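For illustration, a returned history (with both flags left at their defaults, so tool calls and system messages are excluded) might look like this; the content values are made up:

[
    {"role": "user", "content": "What's the weather like today?"},
    {"role": "assistant", "content": "It's sunny and 22°C right now."},
]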

Example — Print Final Transcript on Session End

main.py
class MyVoiceAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are VideoSDK's Voice Agent."
        )

    async def on_exit(self) -> None:
        history = self.session.get_context_history(
            include_function_calls=True,
            include_system_messages=False,
        )
        print("\n=== SESSION END: CONTEXT HISTORY ===")
        for msg in history:
            role = msg.get("role", "unknown").upper()
            content = msg.get("content", "")
            if isinstance(content, list):
                text_blocks = [
                    c if isinstance(c, str) else "[Image/Other]"
                    for c in content
                ]
                content = " ".join(text_blocks)
            print(f"{role}: {content}")
        print("===================================\n")
        await self.session.say("Goodbye!")

Complete Example

For a runnable script that wires up metrics, error, and recording hooks together with session.get_context_history() on exit, see the Observability Hooks example on GitHub.

Pair with the Dashboard

The metrics emitted by these hooks are also visualized on the VideoSDK Dashboard. See Session Analytics and Trace Insights to inspect the same data alongside transcripts and recordings.
