Module agents.tts
Sub-modules
agents.tts.fallback_tts
agents.tts.tts
Classes
class FallbackTTS (providers: List[TTS],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3)-
Expand source code
class FallbackTTS(TTS, FallbackBase):
    """TTS provider that fails over across an ordered list of TTS providers.

    Synthesis runs on ``active_provider``. When that provider raises, or emits
    an asynchronous ``"error"`` event, ``FallbackBase._switch_provider`` is
    asked to move to another provider; synchronous failures are then retried
    with the same text. Values assigned to ``loop`` and ``audio_track`` on this
    object are forwarded to the providers (see ``__setattr__``), so callers can
    treat it like a single TTS.
    """

    def __init__(
        self,
        providers: List[TTS],
        temporary_disable_sec: float = 60.0,
        permanent_disable_after_attempts: int = 3,
    ):
        """Create the fallback wrapper.

        Args:
            providers: Providers handed to FallbackBase; the first one supplies
                the sample rate and channel count this wrapper reports.
            temporary_disable_sec: Forwarded to FallbackBase.
            permanent_disable_after_attempts: Forwarded to FallbackBase.

        Raises:
            ValueError: If ``providers`` is empty.
        """
        if not providers:
            raise ValueError("FallbackTTS requires at least one provider")
        TTS.__init__(
            self,
            sample_rate=providers[0].sample_rate,
            num_channels=providers[0].num_channels,
        )
        FallbackBase.__init__(
            self,
            providers,
            "TTS",
            temporary_disable_sec=temporary_disable_sec,
            permanent_disable_after_attempts=permanent_disable_after_attempts,
        )
        # __setattr__ skips provider propagation while this flag is truthy.
        self._initializing = False
        # Strong references to pending error-handling tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected early.
        self._error_tasks = set()
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        """Register the error listener on the currently active provider."""
        self.active_provider.on("error", self._on_provider_error)

    def _on_provider_error(self, error_msg):
        """Schedule asynchronous fail-over after the active provider emits "error"."""
        failed_p = self.active_provider
        task = asyncio.create_task(self._handle_async_error(str(error_msg), failed_p))
        self._error_tasks.add(task)
        task.add_done_callback(self._error_tasks.discard)

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        """Try to switch away from ``failed_provider``; re-emit "error" if that fails."""
        switched = await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider)
        if not switched:
            self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        """Detach the failing provider's listener and ask FallbackBase to switch.

        Args:
            reason: Human-readable reason forwarded to FallbackBase.
            failed_provider: Provider that failed; defaults to the active one.

        Returns:
            True if FallbackBase reported a switch, False otherwise. On success
            the "error" listener is registered on the now-active provider and,
            when both ``loop`` and ``audio_track`` are set, they are pushed to it.
        """
        provider_to_cleanup = failed_provider if failed_provider else self.active_provider
        try:
            provider_to_cleanup.off("error", self._on_provider_error)
        except Exception as e:
            # Best effort: the listener may not be registered on this provider.
            logger.debug(
                f"[TTS] Could not detach error listener from "
                f"{getattr(provider_to_cleanup, 'label', provider_to_cleanup)}: {e}"
            )
        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        if not switched:
            return False
        active_after = self.active_provider
        # Re-register when the provider changed, and also when the active
        # provider is the one just detached above; otherwise it would keep
        # running with no error listener.
        if active_before != active_after or active_after is provider_to_cleanup:
            active_after.on("error", self._on_provider_error)
        if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
            self._propagate_settings(active_after)
        return True

    @staticmethod
    def _apply_setting(provider, name: str, value: Any) -> None:
        """Set ``name`` on ``provider`` via its ``_set_<name>`` hook if present, else setattr."""
        setter_name = f"_set_{name}"
        if hasattr(provider, setter_name):
            getattr(provider, setter_name)(value)
        else:
            setattr(provider, name, value)

    def _propagate_settings(self, provider):
        """Push this wrapper's ``loop`` and ``audio_track`` onto ``provider``.

        Failures are logged, not raised. ``loop`` is applied first; an error
        while applying it skips ``audio_track``.
        """
        try:
            for name in ("loop", "audio_track"):
                self._apply_setting(provider, name, getattr(self, name))
        except Exception as e:
            logger.warning(f"[TTS] Failed to propagate settings to {provider.label}: {e}")

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Intercept attribute assignments to propagate loop and audio_track to all providers.
        This allows FallbackTTS to work without CascadingPipeline needing to know about it.
        """
        object.__setattr__(self, name, value)
        if name not in ("loop", "audio_track"):
            return
        # Before FallbackBase has set up ``providers`` there is nothing to update.
        if not hasattr(self, "providers") or getattr(self, "_initializing", False):
            return
        logger.info(f"[TTS] FallbackTTS: {name} was set to {value}, propagating to all providers")
        for provider in self.providers:
            try:
                self._apply_setting(provider, name, value)
                logger.info(f"[TTS] Set {name} on provider {provider.label}")
            except Exception as e:
                logger.warning(f"[TTS] Failed to set {name} on provider {provider.label}: {e}")

    def _set_loop_and_audio_track(self, loop, audio_track):
        """
        Optional method for explicit setup (for compatibility).
        Setting these attributes will trigger __setattr__ which propagates to all providers.
        """
        logger.info(f"[TTS] _set_loop_and_audio_track called on FallbackTTS. loop={loop}, audio_track={audio_track}")
        self.loop = loop
        self.audio_track = audio_track

    async def synthesize(self, text, **kwargs) -> None:
        """
        Try active provider. If exception, switch and retry with same text.
        Checks for recovery of primary providers before starting.

        If recovery changes the active provider, the "error" listener is moved
        to the new provider so its asynchronous errors are handled.

        Args:
            text: Passed unchanged to each provider's ``synthesize``.
            **kwargs: Passed unchanged to each provider's ``synthesize``.

        Raises:
            Exception: The provider's exception once ``_switch_provider``
                reports that no other provider could be switched to.
        """
        provider_before_recovery = self.active_provider
        if self.check_recovery():
            recovered_provider = self.active_provider
            if recovered_provider is not provider_before_recovery:
                # check_recovery() is called synchronously, so it cannot await the
                # async _switch_provider that normally moves the listener. Move it
                # here; otherwise the recovered provider's async errors are ignored
                # and the old provider's errors are attributed to the recovered one.
                try:
                    provider_before_recovery.off("error", self._on_provider_error)
                except Exception as e:
                    logger.debug(
                        f"[TTS] Could not detach error listener from "
                        f"{getattr(provider_before_recovery, 'label', provider_before_recovery)}: {e}"
                    )
                recovered_provider.on("error", self._on_provider_error)
            if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
                self._propagate_settings(recovered_provider)
        # NOTE(review): if ``text`` is an async iterator, a retry receives only what
        # the failed provider did not consume — confirm this is acceptable.
        while True:
            current_provider = self.active_provider
            try:
                logger.info(f"[TTS] Attempting synthesis with {current_provider.label}")
                await current_provider.synthesize(text, **kwargs)
                logger.info(f"[TTS] Synthesis successful with {current_provider.label}")
                return
            except Exception as e:
                logger.error(f"[TTS] Synthesis failed with {current_provider.label}: {e}")
                switched = await self._switch_provider(str(e), failed_provider=current_provider)
                if not switched:
                    logger.error("[TTS] All providers exhausted. Raising error.")
                    raise e
                logger.info(f"[TTS] Retrying with new provider: {self.active_provider.label}")

    async def interrupt(self):
        """Interrupt the active provider, if any."""
        if self.active_provider:
            await self.active_provider.interrupt()

    async def aclose(self):
        """Close every provider, then run the base-class cleanup.

        Every provider is closed even if an earlier one fails. Failures are
        logged, and the first one is re-raised after cleanup completes.
        """
        first_error = None
        for p in self.providers:
            try:
                await p.aclose()
            except Exception as e:
                logger.error(f"[TTS] Failed to close provider {p.label}: {e}")
                if first_error is None:
                    first_error = e
        await super().aclose()
        if first_error is not None:
            raise first_error
Ancestors
- TTS
- EventEmitter
- typing.Generic
- FallbackBase
Methods
async def synthesize(self, text, **kwargs) ‑> None-
Expand source code
async def synthesize(self, text, **kwargs) -> None:
    """
    Try active provider. If exception, switch and retry with same text.
    Checks for recovery of primary providers before starting.

    If recovery changes the active provider, the "error" listener is moved
    to the new provider so its asynchronous errors are handled.

    Args:
        text: Passed unchanged to each provider's ``synthesize``.
        **kwargs: Passed unchanged to each provider's ``synthesize``.

    Raises:
        Exception: The provider's exception once ``_switch_provider``
            reports that no other provider could be switched to.
    """
    provider_before_recovery = self.active_provider
    if self.check_recovery():
        recovered_provider = self.active_provider
        if recovered_provider is not provider_before_recovery:
            # check_recovery() is called synchronously, so it cannot await the
            # async _switch_provider that normally moves the listener. Move it
            # here; otherwise the recovered provider's async errors are ignored
            # and the old provider's errors are attributed to the recovered one.
            try:
                provider_before_recovery.off("error", self._on_provider_error)
            except Exception as e:
                logger.debug(
                    f"[TTS] Could not detach error listener from "
                    f"{getattr(provider_before_recovery, 'label', provider_before_recovery)}: {e}"
                )
            recovered_provider.on("error", self._on_provider_error)
        if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
            self._propagate_settings(recovered_provider)
    # NOTE(review): if ``text`` is an async iterator, a retry receives only what
    # the failed provider did not consume — confirm this is acceptable.
    while True:
        current_provider = self.active_provider
        try:
            logger.info(f"[TTS] Attempting synthesis with {current_provider.label}")
            await current_provider.synthesize(text, **kwargs)
            logger.info(f"[TTS] Synthesis successful with {current_provider.label}")
            return
        except Exception as e:
            logger.error(f"[TTS] Synthesis failed with {current_provider.label}: {e}")
            switched = await self._switch_provider(str(e), failed_provider=current_provider)
            if not switched:
                logger.error("[TTS] All providers exhausted. Raising error.")
                raise e
            logger.info(f"[TTS] Retrying with new provider: {self.active_provider.label}")
Inherited members
class TTS (sample_rate: int = 16000, num_channels: int = 1)-
Expand source code
class TTS(EventEmitter[Literal["error"]]):
    """Base class for Text-to-Speech implementations"""

    def __init__(
        self,
        sample_rate: int = 16000,
        num_channels: int = 1
    ) -> None:
        """Initialize state shared by all TTS implementations.

        Args:
            sample_rate: Value exposed through the ``sample_rate`` property.
            num_channels: Value exposed through the ``num_channels`` property.
        """
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._first_audio_callback: Optional[Callable[[], Awaitable[None]]] = None
        # Starts as None; presumably assigned later by the owning pipeline.
        self.audio_track = None

    @property
    def label(self) -> str:
        """Get the TTS provider label"""
        return self._label

    @property
    def sample_rate(self) -> int:
        """Get audio sample rate"""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Get number of audio channels"""
        return self._num_channels

    def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when first audio byte is produced"""
        self._first_audio_callback = callback

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        # To be overridden by implementations for TTFB metrics
        pass

    async def pause(self) -> None:
        """Pause via the audio track if it supports it, otherwise interrupt synthesis."""
        if self.audio_track and hasattr(self.audio_track, 'pause'):
            await self.audio_track.pause()
        else:
            await self.interrupt()

    async def resume(self) -> None:
        """Resume via the audio track if it supports it; otherwise do nothing."""
        if self.audio_track and hasattr(self.audio_track, 'resume'):
            await self.audio_track.resume()

    @property
    def can_pause(self) -> bool:
        """True only when an audio track is attached and reports ``can_pause``."""
        # bool() so the annotated return type holds; the bare ``and`` chain
        # could otherwise return None or the audio_track object itself.
        return bool(
            self.audio_track
            and hasattr(self.audio_track, 'can_pause')
            and self.audio_track.can_pause
        )

    @abstractmethod
    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Convert text to speech

        Args:
            text: Text to convert to speech (either string or async iterator of strings)
            voice_id: Optional voice identifier
            **kwargs: Additional provider-specific arguments

        Returns:
            None
        """
        raise NotImplementedError

    @abstractmethod
    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        raise NotImplementedError

    async def aclose(self) -> None:
        """Cleanup resources"""
        logger.info(f"Cleaning up TTS: {self.label}")
        self._first_audio_callback = None
        try:
            import gc
            gc.collect()
        except Exception as e:
            logger.error(f"Error during TTS garbage collection: {e}")
        logger.info(f"TTS cleanup completed: {self.label}")

    async def __aenter__(self) -> TTS:
        """Return self so the instance can be used in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the instance when leaving an ``async with`` block."""
        await self.aclose()
Ancestors
- EventEmitter
- typing.Generic
Subclasses
Instance variables
prop can_pause : bool-
Expand source code
@property
def can_pause(self) -> bool:
    """Report whether the attached audio track supports pausing.

    Returns:
        True only when ``audio_track`` is set, has a ``can_pause`` attribute,
        and that attribute is truthy; False otherwise.
    """
    # bool() so the annotated return type holds; the bare ``and`` chain
    # could otherwise return None or the audio_track object itself.
    return bool(
        self.audio_track
        and hasattr(self.audio_track, 'can_pause')
        and self.audio_track.can_pause
    )
# prop label : str
Expand source code
@property
def label(self) -> str:
    """Return the provider label stored in ``_label``."""
    provider_label = self._label
    return provider_label
prop num_channels : int-
Expand source code
@property
def num_channels(self) -> int:
    """Return the channel count stored in ``_num_channels``."""
    channels = self._num_channels
    return channels
prop sample_rate : int-
Expand source code
@property
def sample_rate(self) -> int:
    """Return the audio sample rate stored in ``_sample_rate``."""
    rate = self._sample_rate
    return rate
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Release per-instance resources.

    Clears the first-audio callback and runs a garbage-collection pass.
    An exception from the collection step is logged instead of raised.
    """
    logger.info(f"Cleaning up TTS: {self.label}")
    self._first_audio_callback = None
    try:
        from gc import collect as run_gc
        run_gc()
    except Exception as exc:
        logger.error(f"Error during TTS garbage collection: {exc}")
    logger.info(f"TTS cleanup completed: {self.label}")
async def interrupt(self) ‑> None-
Expand source code
@abstractmethod
async def interrupt(self) -> None:
    """Stop the in-progress TTS work; concrete providers must override this."""
    raise NotImplementedError()
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None-
Expand source code
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Store the coroutine callback to run when the first audio byte is produced.

    Args:
        callback: Zero-argument async callable, kept in ``_first_audio_callback``.
    """
    setattr(self, "_first_audio_callback", callback)
async def pause(self) ‑> None-
Expand source code
async def pause(self) -> None:
    """Pause playback, falling back to interrupting synthesis.

    Awaits ``audio_track.pause()`` when an audio track is attached and has a
    ``pause`` attribute; otherwise awaits ``self.interrupt()``.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'pause'):
        # No pausable track: stopping synthesis is the only option.
        await self.interrupt()
        return
    await track.pause()
# def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Hook for clearing first-audio (TTFB) tracking before the next TTS task.

    The base implementation does nothing; subclasses that record TTFB
    metrics override it.
    """
    return None
async def resume(self) ‑> None-
Expand source code
async def resume(self) -> None:
    """Resume playback through the audio track when it supports resuming.

    Does nothing when no audio track is attached or it lacks ``resume``.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'resume'):
        return
    await track.resume()
# async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
@abstractmethod
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """
    Turn text into speech; concrete providers must override this.

    Args:
        text: A whole string, or an async iterator yielding string chunks.
        voice_id: Optional identifier of the voice to use.
        **kwargs: Provider-specific options.

    Returns:
        None
    """
    raise NotImplementedError()
Args
text: Text to convert to speech (either string or async iterator of strings)
voice_id: Optional voice identifier
**kwargs: Additional provider-specific arguments
Returns
None