Module agents.tts
Sub-modules
- agents.tts.fallback_tts
- agents.tts.tts
Classes
class FallbackTTS (providers: List[TTS],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3,
latency_threshold_ms: float | None = None,
consecutive_latency_hits: int = 3)-
Expand source code
class FallbackTTS(TTS, FallbackBase):
    """TTS wrapper that automatically fails over to backup providers on errors,
    latency degradation, and attempts recovery of higher-priority ones.

    The wrapper reports the sample rate and channel count of the first
    provider, forwards ``loop`` / ``audio_track`` assignments to every
    provider, and keeps both the ``"error"`` listener and the "first audio
    byte" callback attached to whichever provider is currently active.
    """

    def __init__(
        self,
        providers: List[TTS],
        temporary_disable_sec: float = 60.0,
        permanent_disable_after_attempts: int = 3,
        latency_threshold_ms: Optional[float] = None,
        consecutive_latency_hits: int = 3,
    ):
        """Initialise both base classes and attach listeners.

        Args:
            providers: TTS providers; ``providers[0]`` supplies the sample
                rate and channel count advertised by this wrapper.
            temporary_disable_sec: Forwarded unchanged to ``FallbackBase``.
            permanent_disable_after_attempts: Forwarded unchanged to
                ``FallbackBase``.
            latency_threshold_ms: Forwarded to ``FallbackBase``. When it is
                ``None`` no ``COMPONENT_METRIC`` listener is registered.
            consecutive_latency_hits: Forwarded unchanged to ``FallbackBase``.
        """
        TTS.__init__(
            self,
            sample_rate=providers[0].sample_rate,
            num_channels=providers[0].num_channels,
        )
        FallbackBase.__init__(
            self,
            providers,
            "TTS",
            temporary_disable_sec=temporary_disable_sec,
            permanent_disable_after_attempts=permanent_disable_after_attempts,
            latency_threshold_ms=latency_threshold_ms,
            consecutive_latency_hits=consecutive_latency_hits,
        )
        self._initializing = False
        # Strong references to fire-and-forget tasks: the event loop only
        # keeps weak references, so an unreferenced task can be collected
        # before it finishes.
        self._background_tasks = set()
        self._setup_event_listeners()
        self._setup_latency_listener()

    def _spawn_background(self, coro) -> None:
        """Schedule ``coro`` as a task and hold a reference until it completes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def _setup_event_listeners(self):
        """Subscribe to ``"error"`` events of the currently active provider."""
        self.active_provider.on("error", self._on_provider_error)

    def on_first_audio_byte(self, callback) -> None:
        """Capture the callback so we can re-apply it after switching providers.

        Without this override, the callback would only be stored on the
        FallbackTTS wrapper and the underlying active provider would never
        invoke it — which breaks the metrics_collector.on_tts_first_byte()
        hook and any TTFB tracking (including this class's own latency-based
        fallback path).
        """
        super().on_first_audio_byte(callback)
        try:
            self.active_provider.on_first_audio_byte(callback)
        except Exception as e:
            logger.warning(f"[TTS] Failed to set first_audio_byte callback on {self.active_provider.label}: {e}")

    def reset_first_audio_tracking(self) -> None:
        """Forward the per-turn reset to the active provider.

        speech_generation calls this before every synthesize() to clear the
        provider's "first chunk sent" flag. The base TTS implementation is a
        no-op, so without this override the callback would fire only on the
        very first turn (when the flag is still False from init) and never
        again on the same provider.
        """
        try:
            self.active_provider.reset_first_audio_tracking()
        except Exception as e:
            logger.warning(f"[TTS] Failed to reset first_audio_tracking on {self.active_provider.label}: {e}")

    def _setup_latency_listener(self):
        """Listen for ``COMPONENT_METRIC`` events when a latency threshold is set."""
        if self.latency_threshold_ms is None:
            return
        global_event_emitter.on("COMPONENT_METRIC", self._on_component_metric)

    def _on_component_metric(self, event: dict):
        """Record the ``ttfb`` metric of events whose ``component`` is ``"tts"``.

        Events for other components, or without a ``ttfb`` value, are ignored.
        """
        if event.get("component") != "tts":
            return
        metrics = event.get("metrics") or {}
        ttfb = metrics.get("ttfb")
        if ttfb is None:
            return
        self._spawn_background(self._record_latency(float(ttfb)))

    def _on_provider_error(self, error_msg):
        """Handle a provider ``"error"`` event by scheduling an async fail-over."""
        # Capture the provider now: by the time the task runs, the active
        # provider may already have changed.
        failed_p = self.active_provider
        self._spawn_background(self._handle_async_error(str(error_msg), failed_p))

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        """Switch away from ``failed_provider`` and re-emit the error on this wrapper."""
        await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider)
        self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        """Switch providers and re-wire listeners on the newly active one.

        Args:
            reason: Description of the failure, passed to the base implementation.
            failed_provider: Provider that failed; defaults to the active provider.

        Returns:
            ``True`` when the base implementation reports a switch, else ``False``.
        """
        provider_to_cleanup = failed_provider if failed_provider else self.active_provider
        try:
            provider_to_cleanup.off("error", self._on_provider_error)
        except Exception as e:
            # Best effort only. ``Exception`` rather than a bare ``except`` so
            # task cancellation and interpreter-exit signals still propagate.
            logger.debug(f"[TTS] Could not detach error listener during switch: {e}")

        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        active_after = self.active_provider

        if not switched:
            return False

        if active_before != active_after:
            self.active_provider.on("error", self._on_provider_error)
            if self._first_audio_callback is not None:
                try:
                    self.active_provider.on_first_audio_byte(self._first_audio_callback)
                except Exception as e:
                    logger.warning(f"[TTS] Failed to re-apply first_audio_byte callback after switch: {e}")
            if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
                self._propagate_settings(self.active_provider)
        return True

    @staticmethod
    def _apply_setting(provider, name: str, value: Any) -> None:
        """Set ``name`` on ``provider`` via ``_set_<name>`` if present, else by assignment."""
        if hasattr(provider, f"_set_{name}"):
            getattr(provider, f"_set_{name}")(value)
        else:
            setattr(provider, name, value)

    def _propagate_settings(self, provider):
        """Helper to set loop/audio_track on a provider."""
        try:
            for name in ("loop", "audio_track"):
                self._apply_setting(provider, name, getattr(self, name))
        except Exception as e:
            logger.warning(f"[TTS] Failed to propagate settings to {provider.label}: {e}")

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Intercept attribute assignments to propagate loop and audio_track to all providers.
        This allows FallbackTTS to work without CascadingPipeline needing to know about it.
        """
        object.__setattr__(self, name, value)
        # ``providers`` does not exist yet while the base initialisers run, so
        # assignments made during construction are not propagated.
        if name in ("loop", "audio_track") and hasattr(self, "providers") and not getattr(self, "_initializing", False):
            logger.info(f"[TTS] FallbackTTS: {name} was set to {value}, propagating to all providers")
            for provider in self.providers:
                try:
                    self._apply_setting(provider, name, value)
                    logger.info(f"[TTS] Set {name} on provider {provider.label}")
                except Exception as e:
                    logger.warning(f"[TTS] Failed to set {name} on provider {provider.label}: {e}")

    def _set_loop_and_audio_track(self, loop, audio_track):
        """
        Optional method for explicit setup (for compatibility).
        Setting these attributes will trigger __setattr__ which propagates to all providers.
        """
        logger.info(f"[TTS] _set_loop_and_audio_track called on FallbackTTS. loop={loop}, audio_track={audio_track}")
        self.loop = loop
        self.audio_track = audio_track

    async def synthesize(self, text, **kwargs) -> None:
        """
        Try active provider. If exception, switch and retry with same text.
        Checks for recovery of primary providers before starting.

        Raises:
            Exception: The last provider error, once no further switch is possible.
        """
        if self.check_recovery():
            if self._first_audio_callback is not None:
                try:
                    self.active_provider.on_first_audio_byte(self._first_audio_callback)
                except Exception as e:
                    logger.warning(f"[TTS] Failed to re-apply first_audio_byte callback after recovery: {e}")
            if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
                self._propagate_settings(self.active_provider)

        while True:
            current_provider = self.active_provider
            try:
                logger.info(f"[TTS] Attempting synthesis with {current_provider.label}")
                await current_provider.synthesize(text, **kwargs)
                logger.info(f"[TTS] Synthesis successful with {current_provider.label}")
                return
            except Exception as e:
                logger.error(f"[TTS] Synthesis failed with {current_provider.label}: {e}")
                self.emit("error", str(e))
                switched = await self._switch_provider(str(e), failed_provider=current_provider)
                if not switched:
                    logger.error("[TTS] All providers exhausted. Raising error.")
                    # Bare ``raise`` keeps the original traceback intact.
                    raise
                logger.info(f"[TTS] Retrying with new provider: {self.active_provider.label}")

    async def interrupt(self):
        """Interrupt the active provider, if there is one."""
        if self.active_provider:
            await self.active_provider.interrupt()

    async def aclose(self):
        """Detach the metric listener, close every provider, then run base cleanup.

        A provider that fails to close is logged and skipped so the remaining
        providers and the base cleanup still run.
        """
        if self.latency_threshold_ms is not None:
            try:
                global_event_emitter.off("COMPONENT_METRIC", self._on_component_metric)
            except Exception:
                # Deliberately best effort: detaching must not block shutdown.
                pass
        for p in self.providers:
            try:
                await p.aclose()
            except Exception as e:
                logger.warning(f"[TTS] Failed to close provider {p.label}: {e}")
        await super().aclose()
Ancestors
- TTS
- EventEmitter
- typing.Generic
- FallbackBase
Methods
def on_first_audio_byte(self, callback) ‑> None-
Expand source code
def on_first_audio_byte(self, callback) -> None:
    """Capture the callback so we can re-apply it after switching providers.

    The callback is stored on the wrapper through the base implementation and
    then handed to the active provider as well. Storing it only on the
    wrapper would leave the underlying provider with nothing to invoke, which
    breaks the metrics_collector.on_tts_first_byte() hook and any TTFB
    tracking (including this class's own latency-based fallback path).

    A failure while registering on the provider is logged, not raised.
    """
    # Remember the callback on the wrapper first so it survives a later switch.
    super().on_first_audio_byte(callback)

    try:
        provider = self.active_provider
        provider.on_first_audio_byte(callback)
    except Exception as e:
        logger.warning(f"[TTS] Failed to set first_audio_byte callback on {self.active_provider.label}: {e}")
Without this override, the callback would only be stored on the FallbackTTS wrapper and the underlying active provider would never invoke it — which breaks the metrics_collector.on_tts_first_byte() hook and any TTFB tracking (including this class's own latency-based fallback path).
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Forward the per-turn reset to the active provider.

    speech_generation calls this before every synthesize() so the provider's
    "first chunk sent" flag is cleared. The base TTS implementation does
    nothing, so without forwarding the callback would fire on the very first
    turn only and never again on the same provider.

    A failure inside the provider is logged, not raised.
    """
    try:
        provider = self.active_provider
        provider.reset_first_audio_tracking()
    except Exception as e:
        logger.warning(f"[TTS] Failed to reset first_audio_tracking on {self.active_provider.label}: {e}")
speech_generation calls this before every synthesize() to clear the provider's "first chunk sent" flag. The base TTS implementation is a no-op, so without this override the callback would fire only on the very first turn (when the flag is still False from init) and never again on the same provider.
async def synthesize(self, text, **kwargs) ‑> None-
Expand source code
async def synthesize(self, text, **kwargs) -> None:
    """
    Try active provider. If exception, switch and retry with same text.
    Checks for recovery of primary providers before starting.

    When recovery is reported, the first-audio callback and the
    loop/audio_track settings are re-applied to the active provider before
    synthesis starts. Each provider failure is logged and emitted as an
    ``"error"`` event; the last failure is re-raised once no switch is possible.
    """
    recovered = self.check_recovery()
    if recovered:
        if self._first_audio_callback is not None:
            try:
                self.active_provider.on_first_audio_byte(self._first_audio_callback)
            except Exception as e:
                logger.warning(f"[TTS] Failed to re-apply first_audio_byte callback after recovery: {e}")
        if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
            self._propagate_settings(self.active_provider)

    while True:
        # Pin the provider for this attempt; a switch may replace the active one.
        attempt_provider = self.active_provider
        try:
            logger.info(f"[TTS] Attempting synthesis with {attempt_provider.label}")
            await attempt_provider.synthesize(text, **kwargs)
            logger.info(f"[TTS] Synthesis successful with {attempt_provider.label}")
            return
        except Exception as e:
            logger.error(f"[TTS] Synthesis failed with {attempt_provider.label}: {e}")
            self.emit("error", str(e))
            has_next = await self._switch_provider(str(e), failed_provider=attempt_provider)
            if has_next:
                logger.info(f"[TTS] Retrying with new provider: {self.active_provider.label}")
                continue
            logger.error(f"[TTS] All providers exhausted. Raising error.")
            raise e
Inherited members
class FlushMarker-
Expand source code
class FlushMarker:
    """Marker passed through a TTS text iterator to signal a segment boundary.

    Plugins with segmented WebSocket synthesis (e.g. Cartesia, ElevenLabs
    streaming) read it as "close the current segment, start a new one." This
    lets the pipeline hand a complete sentence to the TTS provider without
    waiting for end-of-stream, reducing TTFB for the next segment.

    Plugins that don't override ``flush()`` simply ignore it.
    """

    # The marker carries no data, so instances get no per-instance ``__dict__``.
    __slots__ = ()
Plugins that support segmented WebSocket synthesis (e.g. Cartesia, ElevenLabs streaming) treat this as "close the current segment, start a new one." It lets the pipeline release a complete sentence to the TTS provider without waiting for end-of-stream, reducing TTFB for the next segment. Plugins that don't override `flush()` simply ignore it.
class TTS (sample_rate: int = 16000, num_channels: int = 1, word_timestamps: bool = False)-
Expand source code
class TTS(EventEmitter[Literal["error", "word_spoken"]]):
    """Base class for Text-to-Speech implementations"""

    def __init__(
        self,
        sample_rate: int = 16000,
        num_channels: int = 1,
        word_timestamps: bool = False,
    ) -> None:
        """Store the audio format and reset callback and track state.

        Args:
            sample_rate: Value exposed through the ``sample_rate`` property.
            num_channels: Value exposed through the ``num_channels`` property.
            word_timestamps: Stored as ``supports_word_timestamps``.
        """
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._first_audio_callback: Optional[Callable[[], Awaitable[None]]] = None
        self.audio_track = None
        self.supports_word_timestamps = word_timestamps

    @property
    def label(self) -> str:
        """Get the TTS provider label"""
        return self._label

    @property
    def sample_rate(self) -> int:
        """Get audio sample rate"""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Get number of audio channels"""
        return self._num_channels

    def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when first audio byte is produced"""
        self._first_audio_callback = callback

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        # To be overridden by implementations for TTFB metrics
        pass

    async def pause(self) -> None:
        """Pause audio playback if the audio track supports it, otherwise interrupt."""
        if self.audio_track and hasattr(self.audio_track, 'pause'):
            await self.audio_track.pause()
        else:
            await self.interrupt()

    async def resume(self) -> None:
        """Resume audio playback if the audio track supports it."""
        if self.audio_track and hasattr(self.audio_track, 'resume'):
            await self.audio_track.resume()

    @property
    def can_pause(self) -> bool:
        """Return whether the current audio track supports pausing.

        Always a real ``bool``: ``False`` when there is no track, when the
        track has no ``can_pause`` attribute, or when that attribute is falsy.
        """
        track = self.audio_track
        return bool(track) and bool(getattr(track, 'can_pause', False))

    @abstractmethod
    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Convert text to speech

        Args:
            text: Text to convert to speech. Either a plain string or an async
                iterator that may yield ``str`` chunks and ``FlushMarker``
                segment-boundary markers. Plugins that don't support
                per-segment flushing should drop the markers with an inline
                ``isinstance`` check (or rely on ``segment_text`` which
                already drops them).
            voice_id: Optional voice identifier
            **kwargs: Additional provider-specific arguments

        Returns:
            None
        """
        raise NotImplementedError

    @abstractmethod
    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        raise NotImplementedError

    async def prewarm(self) -> None:
        """Pre-establish provider connections so the first ``synthesize()`` call
        doesn't pay startup cost (TLS, WebSocket handshake, model warm-up).

        Default is a no-op. Plugins with persistent connections (Cartesia,
        ElevenLabs WS, etc.) override this. Safe to call multiple times —
        must be idempotent.
        """
        pass

    async def stream_synthesize(
        self,
        text_stream: AsyncIterator[Union[str, FlushMarker]],
        **kwargs: Any
    ) -> AsyncIterator[bytes]:
        """
        Synthesize text stream to audio stream.
        This default implementation mocks the audio track to capture frames.

        Args:
            text_stream: Async iterator of text
            **kwargs: Additional arguments

        Yields:
            Audio bytes

        Raises:
            Exception: Whatever ``synthesize()`` raised, re-raised after the
                frames captured before the failure have been yielded.
        """
        original_track = self.audio_track
        original_loop = getattr(self, "loop", None)
        frame_queue = asyncio.Queue()

        class QueueTrack:
            """Stand-in audio track that pushes received audio onto ``frame_queue``."""

            def __init__(self):
                self.hooks = None

            async def add_new_bytes(self, audio_data: bytes):
                await frame_queue.put(audio_data)

            def on_last_audio_byte(self, cb):
                pass

            def set_pipeline_hooks(self, hooks):
                self.hooks = hooks

            def enable_audio_input(self, manual_control=False):
                pass

            def interrupt(self):
                pass

        self.audio_track = QueueTrack()
        if original_loop is None:
            self.loop = asyncio.get_running_loop()

        async def synthesize_task():
            try:
                await self.synthesize(text_stream, **kwargs)
            finally:
                # ``None`` is the end-of-stream sentinel; it is enqueued on
                # success and on failure alike.
                await frame_queue.put(None)

        task = asyncio.create_task(synthesize_task())
        try:
            while True:
                data = await frame_queue.get()
                if data is None:
                    break
                yield data
            # The sentinel only comes from synthesize_task's ``finally``, so
            # awaiting the task here surfaces a synthesize() failure instead
            # of silently ending the stream.
            await task
        finally:
            self.audio_track = original_track
            if original_loop is None:
                self.loop = None
            if not task.done():
                task.cancel()

    async def aclose(self) -> None:
        """Cleanup resources"""
        logger.info(f"Cleaning up TTS: {self.label}")
        self._first_audio_callback = None
        try:
            import gc
            gc.collect()
        except Exception as e:
            logger.error(f"Error during TTS garbage collection: {e}")
        logger.info(f"TTS cleanup completed: {self.label}")

    async def __aenter__(self) -> TTS:
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.aclose()
Ancestors
- EventEmitter
- typing.Generic
Subclasses
Instance variables
prop can_pause : bool-
Expand source code
@property
def can_pause(self) -> bool:
    """Return whether the current audio track supports pausing.

    Always a real ``bool``: ``False`` when there is no track, when the track
    has no ``can_pause`` attribute, or when that attribute is falsy.
    """
    track = self.audio_track
    return bool(track) and bool(getattr(track, 'can_pause', False))
prop label : str-
Expand source code
@property
def label(self) -> str:
    """Provider label stored in ``_label``, exposed read-only."""
    return self._label
prop num_channels : int-
Expand source code
@property
def num_channels(self) -> int:
    """Audio channel count stored in ``_num_channels``, exposed read-only."""
    return self._num_channels
prop sample_rate : int-
Expand source code
@property
def sample_rate(self) -> int:
    """Audio sample rate stored in ``_sample_rate``, exposed read-only."""
    return self._sample_rate
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Drops the stored first-audio callback and requests a garbage-collection
    pass. A failure during collection is logged, not raised.
    """
    logger.info(f"Cleaning up TTS: {self.label}")

    # Release the callback so this object no longer references it.
    self._first_audio_callback = None

    try:
        import gc
        gc.collect()
    except Exception as e:
        logger.error(f"Error during TTS garbage collection: {e}")

    logger.info(f"TTS cleanup completed: {self.label}")
async def interrupt(self) ‑> None-
Expand source code
@abstractmethod
async def interrupt(self) -> None:
    """Interrupt the TTS process.

    Abstract: subclasses supply the behaviour. This base body always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None-
Expand source code
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Set callback for when first audio byte is produced.

    The callback is only stored here, in ``_first_audio_callback``; nothing
    is invoked by this method.
    """
    self._first_audio_callback = callback
async def pause(self) ‑> None-
Expand source code
async def pause(self) -> None:
    """Pause audio playback if the audio track supports it, otherwise interrupt.

    "Supports it" means a truthy ``audio_track`` that has a ``pause``
    attribute. In every other case ``interrupt()`` is awaited.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'pause'):
        await self.interrupt()
        return
    await track.pause()
async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None:
    """Pre-establish provider connections so the first ``synthesize()`` call
    doesn't pay startup cost (TLS, WebSocket handshake, model warm-up).

    Default is a no-op. Plugins with persistent connections (Cartesia,
    ElevenLabs WS, etc.) override this. Safe to call multiple times — must
    be idempotent.
    """
    return None
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task.

    The base implementation does nothing; implementations that track TTFB
    metrics are expected to override it.
    """
    return None
async def resume(self) ‑> None-
Expand source code
async def resume(self) -> None:
    """Resume audio playback if the audio track supports it.

    Does nothing when ``audio_track`` is falsy or has no ``resume`` attribute.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'resume'):
        return
    await track.resume()
async def stream_synthesize(self,
text_stream: AsyncIterator[Union[str, FlushMarker]],
**kwargs: Any) ‑> AsyncIterator[bytes]-
Expand source code
async def stream_synthesize(
    self,
    text_stream: AsyncIterator[Union[str, FlushMarker]],
    **kwargs: Any
) -> AsyncIterator[bytes]:
    """
    Synthesize text stream to audio stream.
    This default implementation mocks the audio track to capture frames.

    While the generator runs, ``audio_track`` is replaced by a queue-backed
    stand-in (and ``loop`` is set to the running loop when it was unset).
    Both are restored when the generator finishes or is closed.

    Args:
        text_stream: Async iterator of text
        **kwargs: Additional arguments

    Yields:
        Audio bytes

    Raises:
        Exception: Whatever ``synthesize()`` raised, re-raised after the
            frames captured before the failure have been yielded.
    """
    original_track = self.audio_track
    original_loop = getattr(self, "loop", None)
    frame_queue = asyncio.Queue()

    class QueueTrack:
        """Stand-in audio track that pushes received audio onto ``frame_queue``."""

        def __init__(self):
            self.hooks = None

        async def add_new_bytes(self, audio_data: bytes):
            await frame_queue.put(audio_data)

        def on_last_audio_byte(self, cb):
            pass

        def set_pipeline_hooks(self, hooks):
            self.hooks = hooks

        def enable_audio_input(self, manual_control=False):
            pass

        def interrupt(self):
            pass

    self.audio_track = QueueTrack()
    if original_loop is None:
        self.loop = asyncio.get_running_loop()

    async def synthesize_task():
        try:
            await self.synthesize(text_stream, **kwargs)
        finally:
            # ``None`` is the end-of-stream sentinel; it is enqueued on
            # success and on failure alike.
            await frame_queue.put(None)

    task = asyncio.create_task(synthesize_task())
    try:
        while True:
            data = await frame_queue.get()
            if data is None:
                break
            yield data
        # The sentinel only comes from synthesize_task's ``finally``, so
        # awaiting the task here surfaces a synthesize() failure instead of
        # silently ending the stream.
        await task
    finally:
        self.audio_track = original_track
        if original_loop is None:
            self.loop = None
        if not task.done():
            task.cancel()
This default implementation mocks the audio track to capture frames.
Args
text_stream- Async iterator of text
**kwargs- Additional arguments
Yields
Audio bytes
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
@abstractmethod
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any
) -> None:
    """
    Convert text to speech

    Abstract: subclasses supply the behaviour. This base body always raises
    ``NotImplementedError``.

    Args:
        text: Text to convert to speech. Either a plain string or an async
            iterator that may yield ``str`` chunks and ``FlushMarker``
            segment-boundary markers. Plugins that don't support per-segment
            flushing should drop the markers with an inline ``isinstance``
            check (or rely on ``segment_text`` which already drops them).
        voice_id: Optional voice identifier
        **kwargs: Additional provider-specific arguments

    Returns:
        None
    """
    raise NotImplementedError
Args
text- Text to convert to speech. Either a plain string or an async iterator that may yield `str` chunks and `FlushMarker` segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline `isinstance` check (or rely on `segment_text` which already drops them).
voice_id- Optional voice identifier
**kwargs- Additional provider-specific arguments
Returns
None
Inherited members