Module agents.speech_generation
Classes
class SpeechGeneration (agent: Agent | None = None,
tts: TTS | None = None,
avatar: Any | None = None,
audio_track: CustomAudioStreamTrack | None = None,
hooks: PipelineHooks | None = None)
Expand source code
class SpeechGeneration(EventEmitter[Literal["synthesis_started", "first_audio_byte", "last_audio_byte", "synthesis_interrupted"]]): """ Handles TTS synthesis and audio playback. Events: - synthesis_started: TTS synthesis begins - first_audio_byte: First audio byte ready - last_audio_byte: Synthesis complete - synthesis_interrupted: Synthesis was interrupted """ def __init__( self, agent: Agent | None = None, tts: TTS | None = None, avatar: Any | None = None, audio_track: CustomAudioStreamTrack | None = None, hooks: "PipelineHooks | None" = None, ) -> None: super().__init__() self.agent = agent self.tts = tts self.avatar = avatar self.audio_track = audio_track self.hooks = hooks self.tts_lock = asyncio.Lock() self._is_interrupted = False self.full_transcript = "" async def start(self) -> None: """Start the speech generation component""" logger.info("SpeechGeneration started") def set_audio_track(self, audio_track: CustomAudioStreamTrack) -> None: """Set the audio track for TTS output""" self.audio_track = audio_track if self.tts: self.tts.audio_track = audio_track async def synthesize(self, response_gen: AsyncIterator[str] | str) -> None: """ Stream text to TTS and play audio. 
Args: response_gen: Text generator or string to synthesize """ async with self.tts_lock: # Prepare iterator and wrapper shared logic response_iterator: AsyncIterator[str] if isinstance(response_gen, str): async def string_to_iterator(text: str): yield text response_iterator = string_to_iterator(response_gen) else: response_iterator = response_gen self.full_transcript = "" tts_start_recorded = False async def character_counting_wrapper(text_iterator: AsyncIterator[str]): async for text_chunk in text_iterator: nonlocal tts_start_recorded logger.debug(f"[TTS DEBUG] Got text chunk: {len(text_chunk) if text_chunk else 0} chars") if text_chunk and metrics_collector: # Count characters in this chunk if not tts_start_recorded: metrics_collector.on_tts_start() tts_start_recorded = True logger.debug(f"[TTS DEBUG] Calling add_tts_characters({len(text_chunk)})") metrics_collector.add_tts_characters(len(text_chunk)) if text_chunk: self.full_transcript += text_chunk yield text_chunk # Wrap the iterator response_iterator = character_counting_wrapper(response_iterator) if self.hooks and self.hooks.has_tts_stream_hook(): if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) self.emit("synthesis_started", {}) if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: first_byte_emitted = False async for audio_chunk in self.hooks.process_tts_stream(response_iterator): if not first_byte_emitted: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() 
metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) first_byte_emitted = True if self.audio_track: await self.audio_track.add_new_bytes(audio_chunk) if self.full_transcript and metrics_collector.current_turn: metrics_collector.set_agent_response(self.full_transcript) metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() logger.info("TTS stream synthesis complete") self.emit("last_audio_byte", {}) except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() return if not self.tts: logger.warning("No TTS available for synthesis") return if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track else: logger.warning("Audio track not found in pipeline - last audio callback will be skipped") if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) first_byte_event = asyncio.Event() async def on_first_audio_byte(): """Called when first 
audio byte is ready""" first_byte_event.set() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) async def on_last_audio_byte(): """Called when synthesis is complete""" if self.full_transcript and metrics_collector.current_turn: pass metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() logger.info("TTS synthesis complete - Agent and User set to IDLE") self.emit("last_audio_byte", {}) self.tts.on_first_audio_byte(on_first_audio_byte) if self.audio_track: if hasattr(self.audio_track, "on_last_audio_byte"): self.audio_track.on_last_audio_byte(on_last_audio_byte) else: logger.warning(f"Audio track '{type(self.audio_track).__name__}' does not have 'on_last_audio_byte' method") else: logger.warning("Audio track not initialized - skipping last audio callback registration") self.tts.reset_first_audio_tracking() self.emit("synthesis_started", {}) metrics_collector.on_tts_start() # Trigger agent_turn_start hook if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: await self.tts.synthesize(response_iterator) # If text was generated but the TTS plugin returned before sending any audio # (e.g. non-blocking streaming plugins), wait for the first audio byte # to prevent mark_synthesis_complete() from firing immediately on an empty buffer. 
if self.full_transcript and not first_byte_event.is_set(): try: await asyncio.wait_for(first_byte_event.wait(), timeout=10.0) except asyncio.TimeoutError: logger.warning("Timeout waiting for first audio byte before marking synthesis complete") # Signal that TTS has finished sending all audio data. # The audio track will fire on_last_audio_byte only after # this flag is set AND the buffer is fully drained. if self.audio_track and hasattr(self.audio_track, "mark_synthesis_complete"): self.audio_track.mark_synthesis_complete() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() async def interrupt(self) -> None: """Interrupt the current synthesis""" self._is_interrupted = True if self.tts: await self.tts.interrupt() # Reset audio track to clear buffers and synthesis state, # preventing stuck on_last_audio_byte callbacks if self.audio_track and hasattr(self.audio_track, 'interrupt'): self.audio_track.interrupt() if self.avatar and hasattr(self.avatar, 'interrupt'): await self.avatar.interrupt() self.emit("synthesis_interrupted", {}) async def pause(self) -> None: """Pause the current synthesis (if supported)""" if self.tts and hasattr(self.tts, 'pause') and self.tts.can_pause: await self.tts.pause() self.emit("synthesis_paused", {}) async def resume(self) -> None: """Resume paused synthesis (if supported)""" if self.tts and hasattr(self.tts, 'resume') and self.tts.can_pause: await self.tts.resume() self.emit("synthesis_resumed", 
{}) def can_pause(self) -> bool: """Check if TTS supports pause/resume""" return self.tts and hasattr(self.tts, 'can_pause') and self.tts.can_pause def reset_interrupt(self) -> None: """Reset interrupt flag""" self._is_interrupted = False async def cleanup(self) -> None: """Cleanup speech generation resources""" logger.info("Cleaning up speech generation") self.tts = None self.agent = None self.avatar = None self.audio_track = None logger.info("Speech generation cleaned up") @property def is_speaking(self) -> bool: """Returns True if the agent is currently playing audio""" if self.audio_track and hasattr(self.audio_track, "is_speaking"): return self.audio_track.is_speaking return FalseHandles TTS synthesis and audio playback.
Events: - synthesis_started: TTS synthesis begins - first_audio_byte: First audio byte ready - last_audio_byte: Synthesis complete - synthesis_interrupted: Synthesis was interrupted
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop is_speaking : bool-
Expand source code
@property def is_speaking(self) -> bool: """Returns True if the agent is currently playing audio""" if self.audio_track and hasattr(self.audio_track, "is_speaking"): return self.audio_track.is_speaking return FalseReturns True if the agent is currently playing audio
Methods
def can_pause(self) ‑> bool-
Expand source code
def can_pause(self) -> bool: """Check if TTS supports pause/resume""" return self.tts and hasattr(self.tts, 'can_pause') and self.tts.can_pauseCheck if TTS supports pause/resume
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None: """Cleanup speech generation resources""" logger.info("Cleaning up speech generation") self.tts = None self.agent = None self.avatar = None self.audio_track = None logger.info("Speech generation cleaned up")Cleanup speech generation resources
async def interrupt(self) ‑> None-
Expand source code
async def interrupt(self) -> None: """Interrupt the current synthesis""" self._is_interrupted = True if self.tts: await self.tts.interrupt() # Reset audio track to clear buffers and synthesis state, # preventing stuck on_last_audio_byte callbacks if self.audio_track and hasattr(self.audio_track, 'interrupt'): self.audio_track.interrupt() if self.avatar and hasattr(self.avatar, 'interrupt'): await self.avatar.interrupt() self.emit("synthesis_interrupted", {})Interrupt the current synthesis
async def pause(self) ‑> None-
Expand source code
async def pause(self) -> None: """Pause the current synthesis (if supported)""" if self.tts and hasattr(self.tts, 'pause') and self.tts.can_pause: await self.tts.pause() self.emit("synthesis_paused", {})Pause the current synthesis (if supported)
def reset_interrupt(self) ‑> None-
Expand source code
def reset_interrupt(self) -> None: """Reset interrupt flag""" self._is_interrupted = FalseReset interrupt flag
async def resume(self) ‑> None-
Expand source code
async def resume(self) -> None: """Resume paused synthesis (if supported)""" if self.tts and hasattr(self.tts, 'resume') and self.tts.can_pause: await self.tts.resume() self.emit("synthesis_resumed", {})Resume paused synthesis (if supported)
def set_audio_track(self, audio_track: CustomAudioStreamTrack) ‑> None-
Expand source code
def set_audio_track(self, audio_track: CustomAudioStreamTrack) -> None: """Set the audio track for TTS output""" self.audio_track = audio_track if self.tts: self.tts.audio_track = audio_trackSet the audio track for TTS output
async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Start the speech generation component""" logger.info("SpeechGeneration started")Start the speech generation component
async def synthesize(self, response_gen: AsyncIterator[str] | str) ‑> None-
Expand source code
async def synthesize(self, response_gen: AsyncIterator[str] | str) -> None: """ Stream text to TTS and play audio. Args: response_gen: Text generator or string to synthesize """ async with self.tts_lock: # Prepare iterator and wrapper shared logic response_iterator: AsyncIterator[str] if isinstance(response_gen, str): async def string_to_iterator(text: str): yield text response_iterator = string_to_iterator(response_gen) else: response_iterator = response_gen self.full_transcript = "" tts_start_recorded = False async def character_counting_wrapper(text_iterator: AsyncIterator[str]): async for text_chunk in text_iterator: nonlocal tts_start_recorded logger.debug(f"[TTS DEBUG] Got text chunk: {len(text_chunk) if text_chunk else 0} chars") if text_chunk and metrics_collector: # Count characters in this chunk if not tts_start_recorded: metrics_collector.on_tts_start() tts_start_recorded = True logger.debug(f"[TTS DEBUG] Calling add_tts_characters({len(text_chunk)})") metrics_collector.add_tts_characters(len(text_chunk)) if text_chunk: self.full_transcript += text_chunk yield text_chunk # Wrap the iterator response_iterator = character_counting_wrapper(response_iterator) if self.hooks and self.hooks.has_tts_stream_hook(): if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): self.audio_track.enable_audio_input(manual_control=True) self.emit("synthesis_started", {}) if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: first_byte_emitted = False async for audio_chunk in self.hooks.process_tts_stream(response_iterator): if not first_byte_emitted: if self.agent and self.agent.session and 
self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) first_byte_emitted = True if self.audio_track: await self.audio_track.add_new_bytes(audio_chunk) if self.full_transcript and metrics_collector.current_turn: metrics_collector.set_agent_response(self.full_transcript) metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() logger.info("TTS stream synthesis complete") self.emit("last_audio_byte", {}) except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer() return if not self.tts: logger.warning("No TTS available for synthesis") return if self.agent and self.agent.session: self.agent.session._pause_wake_up_timer() if not self.audio_track: if self.agent and self.agent.session and hasattr(self.agent.session, "pipeline"): if hasattr(self.agent.session.pipeline, "audio_track"): self.audio_track = self.agent.session.pipeline.audio_track else: logger.warning("Audio track not found in pipeline - last audio callback will be skipped") if self.audio_track and hasattr(self.audio_track, "enable_audio_input"): 
self.audio_track.enable_audio_input(manual_control=True) first_byte_event = asyncio.Event() async def on_first_audio_byte(): """Called when first audio byte is ready""" first_byte_event.set() if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() metrics_collector.on_tts_first_byte() metrics_collector.on_agent_speech_start() self.emit("first_audio_byte", {}) if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.SPEAKING) self.agent.session._emit_user_state(UserState.LISTENING) async def on_last_audio_byte(): """Called when synthesis is complete""" if self.full_transcript and metrics_collector.current_turn: pass metrics_collector.on_agent_speech_end() metrics_collector.complete_turn() if self.agent and self.agent.session: self.agent.session._emit_agent_state(AgentState.IDLE) self.agent.session._emit_user_state(UserState.IDLE) if self.hooks and self.hooks.has_agent_turn_end_hooks(): await self.hooks.trigger_agent_turn_end() logger.info("TTS synthesis complete - Agent and User set to IDLE") self.emit("last_audio_byte", {}) self.tts.on_first_audio_byte(on_first_audio_byte) if self.audio_track: if hasattr(self.audio_track, "on_last_audio_byte"): self.audio_track.on_last_audio_byte(on_last_audio_byte) else: logger.warning(f"Audio track '{type(self.audio_track).__name__}' does not have 'on_last_audio_byte' method") else: logger.warning("Audio track not initialized - skipping last audio callback registration") self.tts.reset_first_audio_tracking() self.emit("synthesis_started", {}) metrics_collector.on_tts_start() # Trigger agent_turn_start hook if self.hooks and self.hooks.has_agent_turn_start_hooks(): await self.hooks.trigger_agent_turn_start() try: await self.tts.synthesize(response_iterator) # If text was generated but the TTS plugin returned before sending any audio # (e.g. 
non-blocking streaming plugins), wait for the first audio byte # to prevent mark_synthesis_complete() from firing immediately on an empty buffer. if self.full_transcript and not first_byte_event.is_set(): try: await asyncio.wait_for(first_byte_event.wait(), timeout=10.0) except asyncio.TimeoutError: logger.warning("Timeout waiting for first audio byte before marking synthesis complete") # Signal that TTS has finished sending all audio data. # The audio track will fire on_last_audio_byte only after # this flag is set AND the buffer is fully drained. if self.audio_track and hasattr(self.audio_track, "mark_synthesis_complete"): self.audio_track.mark_synthesis_complete() if self.avatar and hasattr(self.avatar, 'send_segment_end'): await self.avatar.send_segment_end() except asyncio.CancelledError: logger.info("Synthesis cancelled") self.emit("synthesis_interrupted", {}) raise except Exception as e: logger.error(f"Error during synthesis: {e}") self.emit("synthesis_error", {"error": str(e)}) raise finally: if self.agent and self.agent.session and self.agent.session.is_background_audio_enabled: await self.agent.session.stop_thinking_audio() if self.agent and self.agent.session: self.agent.session._reply_in_progress = False self.agent.session._reset_wake_up_timer()Stream text to TTS and play audio.
Args
response_gen- Text generator or string to synthesize
Inherited members