Module agents.tts

Sub-modules

agents.tts.fallback_tts
agents.tts.tts

Classes

class FallbackTTS (providers: List[TTS],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3)
Expand source code
class FallbackTTS(TTS, FallbackBase):
    def __init__(self, providers: List[TTS], temporary_disable_sec: float = 60.0, permanent_disable_after_attempts: int = 3):
        TTS.__init__(
            self,
            sample_rate=providers[0].sample_rate, 
            num_channels=providers[0].num_channels
        )
        FallbackBase.__init__(self, providers, "TTS", temporary_disable_sec=temporary_disable_sec, permanent_disable_after_attempts=permanent_disable_after_attempts)
        self._initializing = False
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        self.active_provider.on("error", self._on_provider_error)

    def _on_provider_error(self, error_msg):
        failed_p = self.active_provider
        asyncio.create_task(self._handle_async_error(str(error_msg), failed_p))

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        switched = await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider)
        if not switched:
            self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        provider_to_cleanup = failed_provider if failed_provider else self.active_provider
        try:
            provider_to_cleanup.off("error", self._on_provider_error)
        except: pass

        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        active_after = self.active_provider
        
        if switched:
            if active_before != active_after:
                self.active_provider.on("error", self._on_provider_error)
                if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
                    self._propagate_settings(self.active_provider)
            return True
        return False
    
    def _propagate_settings(self, provider):
        """Helper to set loop/audio_track on a provider."""
        try:
            name = "loop"
            value = self.loop
            if hasattr(provider, f"_set_{name}"):
                getattr(provider, f"_set_{name}")(value)
            else:
                setattr(provider, name, value)
                
            name = "audio_track"
            value = self.audio_track
            if hasattr(provider, f"_set_{name}"):
                getattr(provider, f"_set_{name}")(value)
            else:
                setattr(provider, name, value)
        except Exception as e:
            logger.warning(f"[TTS] Failed to propagate settings to {provider.label}: {e}")

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Intercept attribute assignments to propagate loop and audio_track to all providers.
        This allows FallbackTTS to work without CascadingPipeline needing to know about it.
        """
        object.__setattr__(self, name, value)
        
        if name in ("loop", "audio_track") and hasattr(self, "providers") and not getattr(self, "_initializing", False):
            logger.info(f"[TTS] FallbackTTS: {name} was set to {value}, propagating to all providers")
            for provider in self.providers:
                try:
                    if hasattr(provider, f"_set_{name}"):
                        getattr(provider, f"_set_{name}")(value)
                    else:
                        setattr(provider, name, value)
                    logger.info(f"[TTS] Set {name} on provider {provider.label}")
                except Exception as e:
                    logger.warning(f"[TTS] Failed to set {name} on provider {provider.label}: {e}")

    def _set_loop_and_audio_track(self, loop, audio_track):
        """
        Optional method for explicit setup (for compatibility).
        Setting these attributes will trigger __setattr__ which propagates to all providers.
        """
        logger.info(f"[TTS] _set_loop_and_audio_track called on FallbackTTS. loop={loop}, audio_track={audio_track}")
        self.loop = loop  
        self.audio_track = audio_track  

    async def synthesize(self, text, **kwargs) -> None:
        """
        Try active provider. If exception, switch and retry with same text.
        Checks for recovery of primary providers before starting.
        """
        if self.check_recovery():
             if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
                 self._propagate_settings(self.active_provider)

        while True:
            current_provider = self.active_provider
            try:
                logger.info(f"[TTS] Attempting synthesis with {current_provider.label}")
                await current_provider.synthesize(text, **kwargs)
                logger.info(f"[TTS] Synthesis successful with {current_provider.label}")
                return
            except Exception as e:
                logger.error(f"[TTS] Synthesis failed with {current_provider.label}: {e}")
                switched = await self._switch_provider(str(e), failed_provider=current_provider)
                if not switched:
                    logger.error(f"[TTS] All providers exhausted. Raising error.")
                    raise e
                logger.info(f"[TTS] Retrying with new provider: {self.active_provider.label}")

    async def interrupt(self):
        if self.active_provider:
            await self.active_provider.interrupt()

    async def aclose(self):
        for p in self.providers:
            await p.aclose()
        await super().aclose()

Base class for Text-to-Speech implementations

Ancestors

Methods

async def synthesize(self, text, **kwargs) ‑> None
Expand source code
async def synthesize(self, text, **kwargs) -> None:
    """
    Try active provider. If exception, switch and retry with same text.
    Checks for recovery of primary providers before starting.
    """
    if self.check_recovery():
         if hasattr(self, "loop") and self.loop and hasattr(self, "audio_track") and self.audio_track:
             self._propagate_settings(self.active_provider)

    while True:
        current_provider = self.active_provider
        try:
            logger.info(f"[TTS] Attempting synthesis with {current_provider.label}")
            await current_provider.synthesize(text, **kwargs)
            logger.info(f"[TTS] Synthesis successful with {current_provider.label}")
            return
        except Exception as e:
            logger.error(f"[TTS] Synthesis failed with {current_provider.label}: {e}")
            switched = await self._switch_provider(str(e), failed_provider=current_provider)
            if not switched:
                logger.error(f"[TTS] All providers exhausted. Raising error.")
                raise e
            logger.info(f"[TTS] Retrying with new provider: {self.active_provider.label}")

Try active provider. If exception, switch and retry with same text. Checks for recovery of primary providers before starting.

Inherited members

class TTS (sample_rate: int = 16000, num_channels: int = 1)
Expand source code
class TTS(EventEmitter[Literal["error"]]):
    """Base class for Text-to-Speech implementations"""
    
    def __init__(
        self,
        sample_rate: int = 16000,
        num_channels: int = 1
    ) -> None:
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._first_audio_callback: Optional[Callable[[], Awaitable[None]]] = None
        self.audio_track = None 

    @property
    def label(self) -> str:
        """Get the TTS provider label"""
        return self._label
    
    @property
    def sample_rate(self) -> int:
        """Get audio sample rate"""
        return self._sample_rate
    
    @property
    def num_channels(self) -> int:
        """Get number of audio channels"""
        return self._num_channels

    def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when first audio byte is produced"""
        self._first_audio_callback = callback

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        # To be overridden by implementations for TTFB metrics
        pass 

    async def pause(self) -> None:
        if self.audio_track and hasattr(self.audio_track, 'pause'):
            await self.audio_track.pause()
        else:
            await self.interrupt()

    async def resume(self) -> None:
        if self.audio_track and hasattr(self.audio_track, 'resume'):
            await self.audio_track.resume()

    @property
    def can_pause(self) -> bool:
        return self.audio_track and hasattr(self.audio_track, 'can_pause') and self.audio_track.can_pause

    @abstractmethod
    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Convert text to speech
        
        Args:
            text: Text to convert to speech (either string or async iterator of strings)
            voice_id: Optional voice identifier
            **kwargs: Additional provider-specific arguments
            
        Returns:
            None
        """
        raise NotImplementedError

    @abstractmethod
    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        raise NotImplementedError

    async def aclose(self) -> None:
        """Cleanup resources"""
        logger.info(f"Cleaning up TTS: {self.label}")
        self._first_audio_callback = None
        try:
            import gc
            gc.collect()
        except Exception as e:
            logger.error(f"Error during TTS garbage collection: {e}")
        
        logger.info(f"TTS cleanup completed: {self.label}")
    
    async def __aenter__(self) -> TTS:
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.aclose()

Base class for Text-to-Speech implementations

Ancestors

Subclasses

Instance variables

prop can_pause : bool
Expand source code
@property
def can_pause(self) -> bool:
    return self.audio_track and hasattr(self.audio_track, 'can_pause') and self.audio_track.can_pause
prop label : str
Expand source code
@property
def label(self) -> str:
    """Get the TTS provider label"""
    return self._label

Get the TTS provider label

prop num_channels : int
Expand source code
@property
def num_channels(self) -> int:
    """Get number of audio channels"""
    return self._num_channels

Get number of audio channels

prop sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    """Get audio sample rate"""
    return self._sample_rate

Get audio sample rate

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources"""
    logger.info(f"Cleaning up TTS: {self.label}")
    self._first_audio_callback = None
    try:
        import gc
        gc.collect()
    except Exception as e:
        logger.error(f"Error during TTS garbage collection: {e}")
    
    logger.info(f"TTS cleanup completed: {self.label}")

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
@abstractmethod
async def interrupt(self) -> None:
    """Interrupt the TTS process"""
    raise NotImplementedError

Interrupt the TTS process

def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None
Expand source code
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Set callback for when first audio byte is produced"""
    self._first_audio_callback = callback

Set callback for when first audio byte is produced

async def pause(self) ‑> None
Expand source code
async def pause(self) -> None:
    if self.audio_track and hasattr(self.audio_track, 'pause'):
        await self.audio_track.pause()
    else:
        await self.interrupt()
def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    # To be overridden by implementations for TTFB metrics
    pass 

Reset the first audio tracking state for next TTS task

async def resume(self) ‑> None
Expand source code
async def resume(self) -> None:
    if self.audio_track and hasattr(self.audio_track, 'resume'):
        await self.audio_track.resume()
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
@abstractmethod
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any
) -> None:
    """
    Convert text to speech
    
    Args:
        text: Text to convert to speech (either string or async iterator of strings)
        voice_id: Optional voice identifier
        **kwargs: Additional provider-specific arguments
        
    Returns:
        None
    """
    raise NotImplementedError

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None