Module agents.tts.tts

Classes

class FlushMarker
Expand source code
class FlushMarker:
    """Segment-boundary sentinel carried inside a TTS text iterator.

    A text stream handed to a TTS plugin may interleave ``str`` chunks with
    instances of this class. Plugins that support segmented WebSocket
    synthesis (e.g. Cartesia, ElevenLabs streaming) read a marker as "close
    the current segment, start a new one", so the pipeline can release a
    complete sentence to the provider without waiting for end-of-stream,
    which reduces TTFB for the next segment. Plugins that don't override
    ``flush()`` simply ignore it.
    """

    # The marker carries no data; its type alone is the signal.
    __slots__ = ()

Marker passed through a TTS text iterator to signal a segment boundary.

Plugins that support segmented WebSocket synthesis (e.g. Cartesia, ElevenLabs streaming) treat this as "close the current segment, start a new one." It lets the pipeline release a complete sentence to the TTS provider without waiting for end-of-stream, reducing TTFB for the next segment. Plugins that don't override flush() simply ignore it.

class TTS (sample_rate: int = 16000, num_channels: int = 1, word_timestamps: bool = False)
Expand source code
class TTS(EventEmitter[Literal["error", "word_spoken"]]):
    """Base class for Text-to-Speech implementations.

    Concrete providers override :meth:`synthesize` and :meth:`interrupt`
    (the base versions raise ``NotImplementedError``). Synthesized audio is
    delivered through ``self.audio_track``, which callers assign after
    construction; it starts out as ``None``.
    """

    def __init__(
        self,
        sample_rate: int = 16000,
        num_channels: int = 1,
        word_timestamps: bool = False,
    ) -> None:
        """
        Store the audio format and capability flags.

        Args:
            sample_rate: Audio sample rate reported by :attr:`sample_rate`.
            num_channels: Channel count reported by :attr:`num_channels`.
            word_timestamps: Stored as ``supports_word_timestamps``.
        """
        super().__init__()
        # Label is the fully qualified name of the concrete subclass.
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._first_audio_callback: Optional[Callable[[], Awaitable[None]]] = None
        self.audio_track = None
        self.supports_word_timestamps = word_timestamps

    @property
    def label(self) -> str:
        """Get the TTS provider label"""
        return self._label

    @property
    def sample_rate(self) -> int:
        """Get audio sample rate"""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Get number of audio channels"""
        return self._num_channels

    def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when first audio byte is produced"""
        self._first_audio_callback = callback

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        # To be overridden by implementations for TTFB metrics
        pass

    async def pause(self) -> None:
        """Pause audio playback if the audio track supports it, otherwise interrupt."""
        if self.audio_track and hasattr(self.audio_track, 'pause'):
            await self.audio_track.pause()
        else:
            await self.interrupt()

    async def resume(self) -> None:
        """Resume audio playback if the audio track supports it."""
        if self.audio_track and hasattr(self.audio_track, 'resume'):
            await self.audio_track.resume()

    @property
    def can_pause(self) -> bool:
        """Return whether the current audio track supports pausing.

        Always returns a real ``bool``: ``False`` when there is no audio
        track or the track has no ``can_pause`` attribute, otherwise the
        truthiness of ``audio_track.can_pause``.
        """
        track = self.audio_track
        # bool() keeps the declared return type; a bare ``and`` chain would
        # leak ``None`` (no track) or the track object itself to callers.
        return bool(track and getattr(track, 'can_pause', False))

    @abstractmethod
    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Convert text to speech

        Args:
            text: Text to convert to speech. Either a plain string or an async
                iterator that may yield ``str`` chunks and ``FlushMarker``
                segment-boundary markers. Plugins that don't support per-segment
                flushing should drop the markers with an inline ``isinstance``
                check (or rely on ``segment_text`` which already drops them).
            voice_id: Optional voice identifier
            **kwargs: Additional provider-specific arguments

        Returns:
            None
        """
        raise NotImplementedError

    @abstractmethod
    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        raise NotImplementedError

    async def prewarm(self) -> None:
        """Pre-establish provider connections so the first ``synthesize()`` call
        doesn't pay startup cost (TLS, WebSocket handshake, model warm-up).

        Default is a no-op. Plugins with persistent connections (Cartesia,
        ElevenLabs WS, etc.) override this. Safe to call multiple times — must
        be idempotent.
        """
        pass

    async def stream_synthesize(
        self,
        text_stream: AsyncIterator[Union[str, FlushMarker]],
        **kwargs: Any
    ) -> AsyncIterator[bytes]:
        """
        Synthesize text stream to audio stream.

        This default implementation temporarily replaces ``self.audio_track``
        with an in-memory track that queues every chunk passed to
        ``add_new_bytes``, runs :meth:`synthesize` in a background task, and
        yields the queued chunks in order. The original track is restored
        when the generator finishes or is closed.

        If the instance has no ``loop`` attribute (or it is ``None``), the
        running event loop is stored on ``self.loop`` for the duration of
        the call and reset to ``None`` afterwards.

        Args:
            text_stream: Async iterator of text
            **kwargs: Additional arguments forwarded to :meth:`synthesize`

        Yields:
            Audio bytes

        Raises:
            Exception: Whatever :meth:`synthesize` raised, re-raised after
                every chunk produced before the failure has been yielded.
        """
        original_track = self.audio_track
        original_loop = getattr(self, "loop", None)
        frame_queue: asyncio.Queue = asyncio.Queue()

        class QueueTrack:
            """Stand-in audio track that forwards audio bytes to the queue."""

            def __init__(self):
                self.hooks = None

            async def add_new_bytes(self, audio_data: bytes):
                await frame_queue.put(audio_data)

            def on_last_audio_byte(self, cb):
                pass

            def set_pipeline_hooks(self, hooks):
                self.hooks = hooks

            def enable_audio_input(self, manual_control=False):
                pass

            def interrupt(self):
                pass

        self.audio_track = QueueTrack()

        if original_loop is None:
            self.loop = asyncio.get_running_loop()

        async def synthesize_task() -> None:
            try:
                await self.synthesize(text_stream, **kwargs)
            finally:
                # End-of-stream sentinel. The queue is unbounded, so this
                # never blocks and is delivered even on error or cancel.
                frame_queue.put_nowait(None)

        task = asyncio.create_task(synthesize_task())

        try:
            # The sentinel is always enqueued after the last audio chunk, so
            # reading the queue alone is enough to detect completion. Racing
            # the queue against the task could swallow the task's exception
            # or drop chunks that were still queued.
            while True:
                data = await frame_queue.get()
                if data is None:
                    break
                yield data
            # Surface any synthesize() failure once all audio is delivered.
            await task
        finally:
            self.audio_track = original_track
            if original_loop is None:
                self.loop = None
            if not task.done():
                task.cancel()

    async def aclose(self) -> None:
        """Cleanup resources.

        Clears the first-audio callback and forces a garbage-collection
        pass; a failure during collection is logged, not raised.
        """
        logger.info(f"Cleaning up TTS: {self.label}")
        self._first_audio_callback = None
        try:
            import gc
            gc.collect()
        except Exception as e:
            logger.error(f"Error during TTS garbage collection: {e}")

        logger.info(f"TTS cleanup completed: {self.label}")

    async def __aenter__(self) -> TTS:
        """Enter the async context; returns this instance unchanged."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the async context by calling :meth:`aclose`."""
        await self.aclose()

Base class for Text-to-Speech implementations

Ancestors

Subclasses

Instance variables

prop can_pause : bool
Expand source code
@property
def can_pause(self) -> bool:
    """Return whether the current audio track supports pausing.

    Always returns a real ``bool``: ``False`` when there is no audio track
    or the track has no ``can_pause`` attribute, otherwise the truthiness
    of ``audio_track.can_pause``.
    """
    track = self.audio_track
    # bool() keeps the declared return type; a bare ``and`` chain would
    # leak ``None`` (no track) or the track object itself to callers.
    return bool(track and getattr(track, 'can_pause', False))

Return whether the current audio track supports pausing.

prop label : str
Expand source code
@property
def label(self) -> str:
    """Provider label of this TTS instance (read-only).

    The constructor builds it from the concrete class's module and class
    name.
    """
    return self._label

Get the TTS provider label

prop num_channels : int
Expand source code
@property
def num_channels(self) -> int:
    """Number of audio channels configured at construction (read-only)."""
    return self._num_channels

Get number of audio channels

prop sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    """Audio sample rate configured at construction (read-only)."""
    return self._sample_rate

Get audio sample rate

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cleanup resources.

    Logs the start of cleanup, clears the first-audio callback, forces a
    garbage-collection pass, and logs completion. A failure during the
    collection step is logged as an error and not re-raised.
    """
    logger.info(f"Cleaning up TTS: {self.label}")
    # Drop the callback reference so this instance no longer holds it.
    self._first_audio_callback = None
    try:
        import gc
        gc.collect()
    except Exception as e:
        # Best-effort: a failed collection must not abort cleanup.
        logger.error(f"Error during TTS garbage collection: {e}")
    
    logger.info(f"TTS cleanup completed: {self.label}")

Cleanup resources

async def interrupt(self) ‑> None
Expand source code
@abstractmethod
async def interrupt(self) -> None:
    """Interrupt the TTS process.

    Abstract hook: the base version always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

Interrupt the TTS process

def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None
Expand source code
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Register the callback for when the first audio byte is produced.

    The callback is stored on the instance, replacing any earlier one.
    """
    self._first_audio_callback = callback

Set callback for when first audio byte is produced

async def pause(self) ‑> None
Expand source code
async def pause(self) -> None:
    """Pause playback on the audio track, or interrupt when it cannot pause.

    Falls back to ``interrupt()`` when there is no audio track or the track
    has no ``pause`` attribute.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'pause'):
        await self.interrupt()
        return
    await track.pause()

Pause audio playback if the audio track supports it, otherwise interrupt.

async def prewarm(self) ‑> None
Expand source code
async def prewarm(self) -> None:
    """Pre-establish provider connections ahead of the first ``synthesize()``.

    Lets the first call avoid startup cost (TLS, WebSocket handshake, model
    warm-up). The default does nothing; plugins with persistent connections
    (Cartesia, ElevenLabs WS, etc.) override it. Overrides must be
    idempotent, since this may be called multiple times.
    """
    return None

Pre-establish provider connections so the first synthesize() call doesn't pay startup cost (TLS, WebSocket handshake, model warm-up).

Default is a no-op. Plugins with persistent connections (Cartesia, ElevenLabs WS, etc.) override this. Safe to call multiple times — must be idempotent.

def reset_first_audio_tracking(self) ‑> None
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset the first-audio tracking state before the next TTS task.

    The base version is a no-op; implementations override it for TTFB
    metrics.
    """
    return None

Reset the first audio tracking state for next TTS task

async def resume(self) ‑> None
Expand source code
async def resume(self) -> None:
    """Resume playback on the audio track when it supports resuming.

    Does nothing when there is no audio track or the track has no
    ``resume`` attribute.
    """
    track = self.audio_track
    if not track:
        return
    if hasattr(track, 'resume'):
        await track.resume()

Resume audio playback if the audio track supports it.

async def stream_synthesize(self,
text_stream: AsyncIterator[Union[str, FlushMarker]],
**kwargs: Any) ‑> AsyncIterator[bytes]
Expand source code
async def stream_synthesize(
    self,
    text_stream: AsyncIterator[Union[str, FlushMarker]],
    **kwargs: Any
) -> AsyncIterator[bytes]:
    """
    Synthesize text stream to audio stream.

    This default implementation temporarily replaces ``self.audio_track``
    with an in-memory track that queues every chunk passed to
    ``add_new_bytes``, runs ``synthesize()`` in a background task, and
    yields the queued chunks in order. The original track is restored when
    the generator finishes or is closed.

    If the instance has no ``loop`` attribute (or it is ``None``), the
    running event loop is stored on ``self.loop`` for the duration of the
    call and reset to ``None`` afterwards.

    Args:
        text_stream: Async iterator of text
        **kwargs: Additional arguments forwarded to ``synthesize()``

    Yields:
        Audio bytes

    Raises:
        Exception: Whatever ``synthesize()`` raised, re-raised after every
            chunk produced before the failure has been yielded.
    """
    original_track = self.audio_track
    original_loop = getattr(self, "loop", None)
    frame_queue: asyncio.Queue = asyncio.Queue()

    class QueueTrack:
        """Stand-in audio track that forwards audio bytes to the queue."""

        def __init__(self):
            self.hooks = None

        async def add_new_bytes(self, audio_data: bytes):
            await frame_queue.put(audio_data)

        def on_last_audio_byte(self, cb):
            pass

        def set_pipeline_hooks(self, hooks):
            self.hooks = hooks

        def enable_audio_input(self, manual_control=False):
            pass

        def interrupt(self):
            pass

    self.audio_track = QueueTrack()

    if original_loop is None:
        self.loop = asyncio.get_running_loop()

    async def synthesize_task() -> None:
        try:
            await self.synthesize(text_stream, **kwargs)
        finally:
            # End-of-stream sentinel. The queue is unbounded, so this never
            # blocks and is delivered even on error or cancellation.
            frame_queue.put_nowait(None)

    task = asyncio.create_task(synthesize_task())

    try:
        # The sentinel is always enqueued after the last audio chunk, so
        # reading the queue alone is enough to detect completion. Racing the
        # queue against the task could swallow the task's exception or drop
        # chunks that were still queued.
        while True:
            data = await frame_queue.get()
            if data is None:
                break
            yield data
        # Surface any synthesize() failure once all audio is delivered.
        await task
    finally:
        self.audio_track = original_track
        if original_loop is None:
            self.loop = None
        if not task.done():
            task.cancel()

Synthesize text stream to audio stream.

This default implementation mocks the audio track to capture frames.

Args

text_stream
Async iterator of text
**kwargs
Additional arguments

Yields

Audio bytes

async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
Expand source code
@abstractmethod
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any
) -> None:
    """
    Convert text to speech

    Args:
        text: Text to convert to speech. Either a plain string or an async
            iterator that may yield ``str`` chunks and ``FlushMarker``
            segment-boundary markers. Plugins that don't support per-segment
            flushing should drop the markers with an inline ``isinstance``
            check (or rely on ``segment_text`` which already drops them).
        voice_id: Optional voice identifier
        **kwargs: Additional provider-specific arguments

    Returns:
        None
    """
    raise NotImplementedError

Convert text to speech

Args

text
Text to convert to speech. Either a plain string or an async iterator that may yield str chunks and FlushMarker segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline isinstance check (or rely on segment_text which already drops them).
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None

Inherited members