Module agents.speech_generation
Classes
class SpeechGeneration (agent: Agent | None = None,
tts: TTS | None = None,
avatar: Any | None = None,
audio_track: CustomAudioStreamTrack | None = None,
hooks: 'PipelineHooks | None' = None)
Expand source code
class SpeechGeneration(EventEmitter[Literal["synthesis_started", "first_audio_byte", "last_audio_byte", "synthesis_interrupted"]]): """ Handles TTS synthesis and audio playback. Events: - synthesis_started: TTS synthesis begins - first_audio_byte: First audio byte ready - last_audio_byte: Synthesis complete (in the non-hook path of synthesize() it is fired from the audio track's on_last_audio_byte callback once the track reports its buffer drained) - synthesis_interrupted: Synthesis was interrupted or cancelled - synthesis_error: Synthesis raised an exception; payload is {"error": str(e)} - synthesis_paused / synthesis_resumed: emitted by pause() / resume() when the TTS supports pausing NOTE(review): synthesis_error, synthesis_paused and synthesis_resumed are emitted but are not part of the EventEmitter Literal type parameter. """ def __init__( self, agent: Agent | None = None, tts: TTS | None = None, avatar: Any | None = None, audio_track: CustomAudioStreamTrack | None = None, hooks: "PipelineHooks | None" = None, ) -> None: super().__init__() self.agent = agent self.tts = tts self.avatar = avatar self.audio_track = audio_track self.hooks = hooks self.tts_lock = asyncio.Lock() self._is_interrupted = False self.full_transcript = "" self.spoken_transcript: str = "" self._tier2_spoken_transcript: str = "" if self.tts and getattr(self.tts, "supports_word_timestamps", False): try: self.tts.on("word_spoken", self._on_tts_word_spoken) except Exception as e: logger.debug(f"Failed to subscribe to TTS word_spoken: {e}") self.on("last_audio_byte", self._on_final_agent_transcript) def _on_tts_word_spoken(self, data: Any) -> None: """Handler for TTS ``word_spoken`` events — emits an interim transcript.""" if not isinstance(data, dict): return cumulative = data.get("cumulative_text", "") if cumulative: self._tier2_spoken_transcript = cumulative if metrics_collector: metrics_collector.emit_agent_transcript_transport( cumulative, type="interim" ) def _on_final_agent_transcript(self, data: Any) -> None: """Emit the final agent transcript after playback completes.
""" if self.full_transcript and metrics_collector: metrics_collector.emit_agent_transcript_transport( self.full_transcript, type="final" ) async def start(self) -> None: """Start the speech generation component""" logger.info("SpeechGeneration started") def set_audio_track(self, audio_track: CustomAudioStreamTrack) -> None: """Set the audio track for TTS output""" self.audio_track = audio_track if self.tts: self.tts.audio_track = audio_track async def synthesize(self, response_gen: AsyncIterator[str] | str) -> None: """ Stream text to TTS and play audio. Args: response_gen: Text generator or string to synthesize """ async with self.tts_lock: # Prepare iterator and wrapper shared logic response_iterator: AsyncIterator[str] if isinstance(response_gen, str): async def string_to_iterator(text: str): yield text response_iterator = string_to_iterator(response_gen) else: response_iterator = response_gen self.full_transcript = "" tts_start_recorded = False self.spoken_transcript = "" self._tier2_spoken_transcript = "" async def character_counting_wrapper(text_iterator: AsyncIterator[str]): async for text_chunk in text_iterator: nonlocal tts_start_recorded if isinstance(text_chunk, FlushMarker): yield text_chunk continue logger.debug(f"[TTS DEBUG] Got text chunk: {len(text_chunk) if text_chunk else 0} chars") if text_chunk and metrics_collector: if not tts_start_recorded: metrics_collector.on_tts_start() tts_start_recorded = True logger.debug(f"[TTS DEBUG] Calling add_tts_characters({len(text_chunk)})") metrics_collector.add_tts_characters(len(text_chunk)) if text_chunk: self.full_transcript += text_chunk yield text_chunk yield FlushMarker() # Wrap the iterator response_iterator = character_counting_wrapper(response_iterator) if self.hooks and self.hooks.has_tts_stream_hook(): if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if 
hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track if self.audio_track and hasattr(self.audio_track, "mark_synthesis_start"): self.audio_track.mark_synthesis_start() if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) self.emit("synthesis_started", {}) if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: first_byte_emitted = False async for audio_chunk in self.hooks.process_tts_stream(response_iterator): if not first_byte_emitted: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) first_byte_emitted = True if self.audio_track: await self.audio_track.add_new_bytes(audio_chunk) if self.full_transcript and metrics_collector.current_turn: emit = not getattr(self.tts, "supports_word_timestamps", False) metrics_collector.set_agent_response(self.full_transcript, emit_transport=emit) metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() logger.info("TTS stream synthesis complete") self.emit("last_audio_byte", {}) except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and 
self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() return if not self.tts: logger.warning("No TTS available for synthesis") return if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track else: logger.warning("Audio track not found in pipeline - last audio callback will be skipped") if self.audio_track and hasattr(self.audio_track, "mark_synthesis_start"): self.audio_track.mark_synthesis_start() if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) first_byte_event = asyncio.Event() async def on_first_audio_byte(): """Called when first audio byte is ready""" first_byte_event.set() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) async def on_last_audio_byte(): """Called when synthesis is complete""" metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() logger.info("TTS synthesis complete - 
Agent and User set to IDLE") self.emit("last_audio_byte", {}) self.tts.on_first_audio_byte(on_first_audio_byte) if self.audio_track: if hasattr(self.audio_track, "on_last_audio_byte"): self.audio_track.on_last_audio_byte(on_last_audio_byte) else: logger.warning(f"Audio track '{type(self.audio_track).__name__}' does not have 'on_last_audio_byte' method") else: logger.warning("Audio track not initialized - skipping last audio callback registration") self.tts.reset_first_audio_tracking() self.emit("synthesis_started", {}) metrics_collector.on_tts_start() # Trigger agent_turn_start hook if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: await self.tts.synthesize(response_iterator) # If text was generated but the TTS plugin returned before sending any audio # (e.g. non-blocking streaming plugins), wait for the first audio byte # to prevent mark_synthesis_complete() from firing immediately on an empty buffer. if self.full_transcript and not first_byte_event.is_set(): try: await asyncio.wait_for(first_byte_event.wait(), timeout=10.0) except asyncio.TimeoutError: logger.warning("Timeout waiting for first audio byte before marking synthesis complete") # Signal that TTS has finished sending all audio data. # The audio track will fire on_last_audio_byte only after # this flag is set AND the buffer is fully drained. 
if self.audio_track and hasattr(self.audio_track, "mark_synthesis_complete"): self.audio_track.mark_synthesis_complete() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session and self._is_interrupted: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() elif self.agent and self.agent.session: self.agent.session._reply_in_progress = False def compute_spoken_transcript(self) -> str: """Best-effort estimate of the portion of full_transcript that was actually played out to the listener at the moment of call. Prefers the cumulative text from TTS word_spoken events (_tier2_spoken_transcript); otherwise scales full_transcript by the played/pushed ratio from audio_track.snapshot_playback() and cuts back to the last space or newline. NOTE(review): because of that cut-back, a fully played transcript without trailing whitespace loses its final word, and a single-word transcript yields an empty string in this fallback. """ if not self.full_transcript: return "" if self._tier2_spoken_transcript: return self._tier2_spoken_transcript.strip() if not self.audio_track or not hasattr(self.audio_track, "snapshot_playback"): return "" played, pushed = self.audio_track.snapshot_playback() if pushed <= 0: return "" fraction = max(0.0, min(1.0, played / pushed)) char_cutoff = int(len(self.full_transcript) * fraction) if char_cutoff <= 0: return "" truncated = self.full_transcript[:char_cutoff] last_break = max(truncated.rfind(" "), truncated.rfind("\n")) if last_break <= 0: return "" return truncated[:last_break].strip() async def 
synthesize_audio_bytes( self, audio_data: Union[bytes, Iterable[bytes], AsyncIterator[bytes]], text: str, handle: "UtteranceHandle | None" = None, ) -> None: """Push pre-synthesized PCM bytes to the audio track, bypassing TTS.
Emits the same ``synthesis_started`` / ``first_audio_byte`` / ``last_audio_byte`` / ``synthesis_interrupted`` events as :meth:`synthesize`, so downstream listeners (orchestrator, metrics, hooks) keep working without changes. Args: audio_data: Raw int16 PCM at the audio track's sample rate. Accepts a single ``bytes`` blob, an iterable of ``bytes`` chunks, or an async iterator of ``bytes`` chunks. text: The text being spoken — used for transcripts/metrics even though no synthesis happens. handle: Optional utterance handle; checked between chunks so mid-playback interruption stops the stream promptly. """ self._is_interrupted = False self.full_transcript = text self.spoken_transcript = "" self._tier2_spoken_transcript = "" if not self.audio_track: if ( self.agent and self.agent.session and hasattr(self.agent.session, "pipeline") and hasattr(self.agent.session.pipeline, "audio_track") ): self.audio_track = self.agent.session.pipeline.audio_track if not self.audio_track: logger.warning( "No audio track available for cached audio playback" ) self.emit("last_audio_byte", {}) return if hasattr(self.audio_track, "mark_synthesis_start"): self.audio_track.mark_synthesis_start() if hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) async def _on_last_audio_byte_cb() -> None: metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() logger.info( "Cached audio playback complete - Agent and User set to IDLE" ) self.emit("last_audio_byte", {}) if hasattr(self.audio_track, "on_last_audio_byte"): self.audio_track.on_last_audio_byte(_on_last_audio_byte_cb) self.emit("synthesis_started", 
{}) metrics_collector.on_tts_start() if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() first_chunk_sent = False try: async for chunk in _iter_audio_bytes(audio_data): if self._is_interrupted or (handle is not None and handle.interrupted): self.spoken_transcript = self.compute_spoken_transcript() if hasattr(self.audio_track, "interrupt"): self.audio_track.interrupt() self.emit("synthesis_interrupted", {}) return if not chunk: continue await self.audio_track.add_new_bytes(chunk) if not first_chunk_sent: first_chunk_sent = True metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) if hasattr(self.audio_track, "mark_synthesis_complete"): self.audio_track.mark_synthesis_complete() if not first_chunk_sent: self.emit("last_audio_byte", {}) if self.avatar and hasattr(self.avatar, "send_segment_end"): await self.avatar.send_segment_end() except asyncio.CancelledError: logger.info("Cached audio playback cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during cached audio playback: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if ( self.agent and self.agent.session and self.agent.session.is_background_audio_enabled ): await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session and self._is_interrupted: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() elif self.agent and self.agent.session: self.agent.session._reply_in_progress = False async def interrupt(self) -> None: """Interrupt the current synthesis""" self._is_interrupted = True 
self.spoken_transcript = self.compute_spoken_transcript() if self.tts: await self.tts.interrupt() # Reset audio track to clear buffers and synthesis state, # preventing stuck on_last_audio_byte callbacks if self.audio_track and hasattr(self.audio_track, 'interrupt'): self.audio_track.interrupt() if self.avatar and hasattr(self.avatar, 'interrupt'): await self.avatar.interrupt() self.emit("synthesis_interrupted", {}) async def pause(self) -> None: """Pause the current synthesis (if supported)""" if self.tts and hasattr(self.tts, 'pause') and self.tts.can_pause: await self.tts.pause() self.emit("synthesis_paused", {}) async def resume(self) -> None: """Resume paused synthesis (if supported)""" if self.tts and hasattr(self.tts, 'resume') and self.tts.can_pause: await self.tts.resume() self.emit("synthesis_resumed", {}) def can_pause(self) -> bool: """Check if TTS supports pause/resume. NOTE(review): when self.tts is None this expression returns None rather than False, despite the bool annotation.""" return self.tts and hasattr(self.tts, 'can_pause') and self.tts.can_pause def reset_interrupt(self) -> None: """Reset interrupt flag""" self._is_interrupted = False async def cleanup(self) -> None: """Cleanup speech generation resources""" logger.info("Cleaning up speech generation") self.tts = None self.agent = None self.avatar = None self.audio_track = None logger.info("Speech generation cleaned up") @property def is_speaking(self) -> bool: """Returns True if the agent is currently playing audio""" if self.audio_track and hasattr(self.audio_track, "is_speaking"): return self.audio_track.is_speaking return FalseHandles TTS synthesis and audio playback.
Events:
- synthesis_started: TTS synthesis begins
- first_audio_byte: First audio byte ready
- last_audio_byte: Synthesis complete
- synthesis_interrupted: Synthesis was interrupted
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop is_speaking : bool
Expand source code
@property
def is_speaking(self) -> bool:
    """Whether the agent is currently playing audio.

    Delegates to ``audio_track.is_speaking`` when an audio track exposing that
    attribute is attached; otherwise reports ``False``.
    """
    track = self.audio_track
    if not track or not hasattr(track, "is_speaking"):
        return False
    return track.is_speaking
Methods
def can_pause(self) -> bool
Expand source code
def can_pause(self) -> bool:
    """Check whether the configured TTS supports pause/resume.

    Returns:
        ``True`` only when a TTS is set and its ``can_pause`` attribute is
        truthy, otherwise ``False``.
    """
    # Coerce explicitly: a bare ``self.tts and ...`` chain leaks ``None`` (or a
    # non-bool attribute value) to callers of this bool-annotated method.
    return bool(self.tts) and bool(getattr(self.tts, "can_pause", False))
async def cleanup(self) -> None
Expand source code
async def cleanup(self) -> None:
    """Release the collaborators held by this component.

    Resets the TTS, agent, avatar and audio-track references to ``None``,
    logging before and after.
    """
    logger.info("Cleaning up speech generation")
    for attr_name in ("tts", "agent", "avatar", "audio_track"):
        setattr(self, attr_name, None)
    logger.info("Speech generation cleaned up")
def compute_spoken_transcript(self) -> str
Expand source code
def compute_spoken_transcript(self) -> str:
    """Best-effort estimate of the part of ``full_transcript`` already played.

    Resolution order:

    1. Empty ``full_transcript`` -> ``""``.
    2. If TTS ``word_spoken`` events supplied cumulative text
       (``_tier2_spoken_transcript``), return it stripped.
    3. Otherwise scale the transcript length by the played/pushed ratio from
       ``audio_track.snapshot_playback()`` and cut back to the last complete
       word.

    Returns:
        The estimated spoken prefix (stripped), or ``""`` when nothing can be
        estimated.
    """
    if not self.full_transcript:
        return ""
    if self._tier2_spoken_transcript:
        return self._tier2_spoken_transcript.strip()
    if not self.audio_track or not hasattr(self.audio_track, "snapshot_playback"):
        return ""
    played, pushed = self.audio_track.snapshot_playback()
    if pushed <= 0:
        return ""
    fraction = max(0.0, min(1.0, played / pushed))
    text = self.full_transcript
    char_cutoff = int(len(text) * fraction)
    if char_cutoff <= 0:
        return ""
    # Everything has been played: keep the final word even when the
    # transcript has no trailing whitespace.
    if char_cutoff >= len(text):
        return text.strip()
    truncated = text[:char_cutoff]
    # The cutoff sits right before whitespace, so the last word is complete.
    if text[char_cutoff].isspace():
        return truncated.strip()
    # Otherwise we are mid-word: drop the partially played word.
    last_break = max(truncated.rfind(" "), truncated.rfind("\n"))
    if last_break <= 0:
        return ""
    return truncated[:last_break].strip()
async def interrupt(self) -> None
Expand source code
async def interrupt(self) -> None:
    """Stop the in-flight synthesis and flush playback.

    Sets the interrupted flag and snapshots how much of the transcript was
    heard. Then it interrupts, in order, the TTS, the audio track and the
    avatar. Resetting the audio track clears its buffers and synthesis state,
    so an ``on_last_audio_byte`` callback cannot get stuck. Finally it emits
    ``synthesis_interrupted``.
    """
    self._is_interrupted = True
    self.spoken_transcript = self.compute_spoken_transcript()

    tts = self.tts
    if tts:
        await tts.interrupt()

    track = self.audio_track
    if track and hasattr(track, "interrupt"):
        track.interrupt()

    avatar = self.avatar
    if avatar and hasattr(avatar, "interrupt"):
        await avatar.interrupt()

    self.emit("synthesis_interrupted", {})
async def pause(self) -> None
Expand source code
async def pause(self) -> None:
    """Pause synthesis when the TTS supports it; otherwise do nothing.

    Emits ``synthesis_paused`` after the TTS has been paused.
    """
    tts = self.tts
    if not (tts and hasattr(tts, "pause") and tts.can_pause):
        return
    await tts.pause()
    self.emit("synthesis_paused", {})
def reset_interrupt(self) -> None
Expand source code
def reset_interrupt(self) -> None:
    """Clear the interrupted flag that :meth:`interrupt` sets."""
    self._is_interrupted = False
async def resume(self) -> None
Expand source code
async def resume(self) -> None:
    """Resume paused synthesis when the TTS supports it; otherwise do nothing.

    Emits ``synthesis_resumed`` after the TTS has been resumed.
    """
    tts = self.tts
    if not (tts and hasattr(tts, "resume") and tts.can_pause):
        return
    await tts.resume()
    self.emit("synthesis_resumed", {})
def set_audio_track(self, audio_track: CustomAudioStreamTrack) -> None
Expand source code
def set_audio_track(self, audio_track: CustomAudioStreamTrack) -> None:
    """Attach the audio track that receives TTS output.

    The track is stored on this component and, when a TTS is configured,
    also assigned to ``tts.audio_track``.
    """
    self.audio_track = audio_track
    tts = self.tts
    if tts:
        tts.audio_track = audio_track
async def start(self) -> None
Expand source code
async def start(self) -> None:
    """Start the component; currently this only logs that it has started."""
    logger.info("SpeechGeneration started")
async def synthesize(self, response_gen: AsyncIterator[str] | str) -> None
Expand source code
async def synthesize(self, response_gen: AsyncIterator[str] | str) -> None: """ Stream text to TTS and play audio. The whole call runs under ``tts_lock``. Non-empty text chunks are appended to ``full_transcript`` (and counted when ``metrics_collector`` is set) before reaching the TTS. When ``hooks.has_tts_stream_hook()`` is true, audio comes from ``hooks.process_tts_stream`` and is pushed straight to the audio track; otherwise ``self.tts.synthesize`` is used and turn completion is driven by the audio track's ``on_last_audio_byte`` callback. Args: response_gen: Text generator or string to synthesize Raises: asyncio.CancelledError: re-raised after emitting ``synthesis_interrupted``. Exception: any other error is re-raised after emitting ``synthesis_error``. NOTE(review): the hook path calls ``metrics_collector`` methods without the truthiness check used in the chunk wrapper, and the non-hook path calls ``metrics_collector.on_tts_start()`` unconditionally although the wrapper also calls it on the first chunk; confirm both are intended. """ async with self.tts_lock: # Prepare iterator and wrapper shared logic response_iterator: AsyncIterator[str] if isinstance(response_gen, str): async def string_to_iterator(text: str): yield text response_iterator = string_to_iterator(response_gen) else: response_iterator = response_gen self.full_transcript = "" tts_start_recorded = False self.spoken_transcript = "" self._tier2_spoken_transcript = "" async def character_counting_wrapper(text_iterator: AsyncIterator[str]): async for text_chunk in text_iterator: nonlocal tts_start_recorded if isinstance(text_chunk, FlushMarker): yield text_chunk continue logger.debug(f"[TTS DEBUG] Got text chunk: {len(text_chunk) if text_chunk else 0} chars") if text_chunk and metrics_collector: if not tts_start_recorded: metrics_collector.on_tts_start() tts_start_recorded = True logger.debug(f"[TTS DEBUG] Calling add_tts_characters({len(text_chunk)})") metrics_collector.add_tts_characters(len(text_chunk)) if 
text_chunk: self.full_transcript += text_chunk yield text_chunk yield FlushMarker() # Wrap the iterator response_iterator = character_counting_wrapper(response_iterator) if self.hooks and self.hooks.has_tts_stream_hook(): if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track if self.audio_track and hasattr(self.audio_track, "mark_synthesis_start"): self.audio_track.mark_synthesis_start() if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) self.emit("synthesis_started", {}) if self.hooks and self.hooks.has_agent_turn_start_hooks(): await 
self.hooks.trigger_agent_turn_start() try: first_byte_emitted = False async for audio_chunk in self.hooks.process_tts_stream(response_iterator): if not first_byte_emitted: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) first_byte_emitted = True if self.audio_track: await self.audio_track.add_new_bytes(audio_chunk) if self.full_transcript and metrics_collector.current_turn: emit = not getattr(self.tts, "supports_word_timestamps", False) metrics_collector.set_agent_response(self.full_transcript, emit_transport=emit) metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() logger.info("TTS stream synthesis complete") self.emit("last_audio_byte", {}) except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() return if not self.tts: logger.warning("No TTS available for synthesis") return if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if 
hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track else: logger.warning("Audio track not found in pipeline - last audio callback will be skipped") if self.audio_track and hasattr(self.audio_track, "mark_synthesis_start"): self.audio_track.mark_synthesis_start() if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) first_byte_event = asyncio.Event() async def on_first_audio_byte(): """Called when first audio byte is ready""" first_byte_event.set() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) async def on_last_audio_byte(): """Called when synthesis is complete""" metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() logger.info("TTS synthesis complete - Agent and User set to IDLE") self.emit("last_audio_byte", {}) self.tts.on_first_audio_byte(on_first_audio_byte) if self.audio_track: if hasattr(self.audio_track, "on_last_audio_byte"): self.audio_track.on_last_audio_byte(on_last_audio_byte) else: logger.warning(f"Audio track '{type(self.audio_track).__name__}' does not have 'on_last_audio_byte' method") else: logger.warning("Audio track not initialized - skipping last audio callback registration") self.tts.reset_first_audio_tracking() 
self.emit("synthesis_started", {}) metrics_collector.on_tts_start() # Trigger agent_turn_start hook if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: await self.tts.synthesize(response_iterator) # If text was generated but the TTS plugin returned before sending any audio # (e.g. non-blocking streaming plugins), wait for the first audio byte # to prevent mark_synthesis_complete() from firing immediately on an empty buffer. if self.full_transcript and not first_byte_event.is_set(): try: await asyncio.wait_for(first_byte_event.wait(), timeout=10.0) except asyncio.TimeoutError: logger.warning("Timeout waiting for first audio byte before marking synthesis complete") # Signal that TTS has finished sending all audio data. # The audio track will fire on_last_audio_byte only after # this flag is set AND the buffer is fully drained. if self.audio_track and hasattr(self.audio_track, "mark_synthesis_complete"): self.audio_track.mark_synthesis_complete() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session and self._is_interrupted: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() elif self.agent and self.agent.session: self.agent.session._reply_in_progress = FalseStream text to TTS and play audio.
Args
response_gen- Text generator or string to synthesize
async def synthesize_audio_bytes(self,
audio_data: Union[bytes, Iterable[bytes], AsyncIterator[bytes]],
text: str,
handle: 'UtteranceHandle | None' = None) -> None
Expand source code
async def synthesize_audio_bytes( self, audio_data: Union[bytes, Iterable[bytes], AsyncIterator[bytes]], text: str, handle: "UtteranceHandle | None" = None, ) -> None: """Push pre-synthesized PCM bytes to the audio track, bypassing TTS. Emits the same ``synthesis_started`` / ``first_audio_byte`` / ``last_audio_byte`` / ``synthesis_interrupted`` events as :meth:`synthesize`, so downstream listeners (orchestrator, metrics, hooks) keep working without changes. Args: audio_data: Raw int16 PCM at the audio track's sample rate. Accepts a single ``bytes`` blob, an iterable of ``bytes`` chunks, or an async iterator of ``bytes`` chunks. text: The text being spoken — used for transcripts/metrics even though no synthesis happens. handle: Optional utterance handle; checked between chunks so mid-playback interruption stops the stream promptly. Behavior notes: if no audio track can be found, a warning is logged, ``last_audio_byte`` is emitted and the method returns. If no non-empty chunk is pushed, ``last_audio_byte`` is emitted directly after iteration. When an interruption is observed (``interrupt()`` or ``handle.interrupted``), the audio track is interrupted, ``synthesis_interrupted`` is emitted and the method returns without calling ``mark_synthesis_complete``. Raises: asyncio.CancelledError: re-raised after emitting ``synthesis_interrupted``. Exception: any other error is re-raised after emitting ``synthesis_error``. NOTE(review): unlike synthesize(), this method does not acquire ``tts_lock``, so it is not serialized against a concurrent synthesize() call; confirm callers guarantee exclusivity. """ self._is_interrupted = False self.full_transcript = text self.spoken_transcript = "" self._tier2_spoken_transcript = "" if not self.audio_track: if ( self.agent and self.agent.session and hasattr(self.agent.session, "pipeline") and hasattr(self.agent.session.pipeline, "audio_track") ): self.audio_track = self.agent.session.pipeline.audio_track if not self.audio_track: logger.warning( "No audio 
track available for cached audio playback" ) self.emit("last_audio_byte", {}) return if hasattr(self.audio_track, "mark_synthesis_start"): self.audio_track.mark_synthesis_start() if hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) async def _on_last_audio_byte_cb() -> None: metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() logger.info( 
"Cached audio playback complete - Agent and User set to IDLE" ) self.emit("last_audio_byte", {}) if hasattr(self.audio_track, "on_last_audio_byte"): self.audio_track.on_last_audio_byte(_on_last_audio_byte_cb) self.emit("synthesis_started", {}) metrics_collector.on_tts_start() if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() first_chunk_sent = False try: async for chunk in _iter_audio_bytes(audio_data): if self._is_interrupted or (handle is not None and handle.interrupted): self.spoken_transcript = self.compute_spoken_transcript() if hasattr(self.audio_track, "interrupt"): self.audio_track.interrupt() self.emit("synthesis_interrupted", {}) return if not chunk: continue await self.audio_track.add_new_bytes(chunk) if not first_chunk_sent: first_chunk_sent = True metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) if hasattr(self.audio_track, "mark_synthesis_complete"): self.audio_track.mark_synthesis_complete() if not first_chunk_sent: self.emit("last_audio_byte", {}) if self.avatar and hasattr(self.avatar, "send_segment_end"): await self.avatar.send_segment_end() except asyncio.CancelledError: logger.info("Cached audio playback cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during cached audio playback: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if ( self.agent and self.agent.session and self.agent.session.is_background_audio_enabled ): await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session and self._is_interrupted: self.agent.session._reply_in_progress = False 
self.agent.session._reset_wake_up_timer() elif self.agent and self.agent.session: self.agent.session._reply_in_progress = FalsePush pre-synthesized PCM bytes to the audio track, bypassing TTS.
Emits the same `synthesis_started` / `first_audio_byte` / `last_audio_byte` / `synthesis_interrupted` events as `synthesize()`, so downstream listeners (orchestrator, metrics, hooks) keep working without changes.
Args
audio_data - Raw int16 PCM at the audio track's sample rate. Accepts a single `bytes` blob, an iterable of `bytes` chunks, or an async iterator of `bytes` chunks.
text - The text being spoken — used for transcripts/metrics even though no synthesis happens.
handle - Optional utterance handle; checked between chunks so that a mid-playback interruption stops the stream promptly.
Inherited members