Module agents.tts.tts
Classes
class FlushMarker-
Expand source code
class FlushMarker:
    """Segment-boundary marker that travels through a TTS text iterator.

    Plugins with segmented WebSocket synthesis (e.g. Cartesia, ElevenLabs
    streaming) interpret it as "close the current segment, start a new
    one."  This lets the pipeline hand a complete sentence to the TTS
    provider without waiting for end-of-stream, which reduces TTFB for
    the next segment.

    Plugins that don't override ``flush()`` simply ignore it.
    """

    # The marker carries no data, so instances need no per-instance __dict__.
    __slots__ = ()
# Marker passed through a TTS text iterator to signal a segment boundary.
Plugins that support segmented WebSocket synthesis (e.g. Cartesia, ElevenLabs streaming) treat this as "close the current segment, start a new one." It lets the pipeline release a complete sentence to the TTS provider without waiting for end-of-stream, reducing TTFB for the next segment. Plugins that don't override `flush()` simply ignore it.
class TTS (sample_rate: int = 16000, num_channels: int = 1, word_timestamps: bool = False)-
Expand source code
class TTS(EventEmitter[Literal["error", "word_spoken"]]):
    """Base class for Text-to-Speech implementations.

    Subclasses must implement ``synthesize()`` and ``interrupt()``.  The
    base class stores the audio format, an optional first-audio callback,
    and the ``audio_track`` that synthesized audio is written to.
    """

    def __init__(
        self,
        sample_rate: int = 16000,
        num_channels: int = 1,
        word_timestamps: bool = False,
    ) -> None:
        """
        Args:
            sample_rate: Audio sample rate exposed via ``sample_rate``.
            num_channels: Channel count exposed via ``num_channels``.
            word_timestamps: Stored as ``supports_word_timestamps``.
        """
        super().__init__()
        # Fully qualified class name of the concrete provider.
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._first_audio_callback: Optional[Callable[[], Awaitable[None]]] = None
        self.audio_track = None
        self.supports_word_timestamps = word_timestamps

    @property
    def label(self) -> str:
        """Get the TTS provider label"""
        return self._label

    @property
    def sample_rate(self) -> int:
        """Get audio sample rate"""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Get number of audio channels"""
        return self._num_channels

    def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
        """Set callback for when first audio byte is produced"""
        self._first_audio_callback = callback

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        # To be overridden by implementations for TTFB metrics
        pass

    async def pause(self) -> None:
        """Pause audio playback if the audio track supports it, otherwise interrupt."""
        if self.audio_track and hasattr(self.audio_track, 'pause'):
            await self.audio_track.pause()
        else:
            await self.interrupt()

    async def resume(self) -> None:
        """Resume audio playback if the audio track supports it."""
        if self.audio_track and hasattr(self.audio_track, 'resume'):
            await self.audio_track.resume()

    @property
    def can_pause(self) -> bool:
        """Return whether the current audio track supports pausing.

        Always returns a real ``bool``; the unwrapped ``and`` chain would
        otherwise leak ``None`` or the track object to callers.
        """
        track = self.audio_track
        return bool(track and getattr(track, 'can_pause', False))

    @abstractmethod
    async def synthesize(
        self,
        text: AsyncIterator[Union[str, FlushMarker]] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Convert text to speech

        Args:
            text: Text to convert to speech. Either a plain string or an async
                iterator that may yield ``str`` chunks and ``FlushMarker``
                segment-boundary markers. Plugins that don't support
                per-segment flushing should drop the markers with an inline
                ``isinstance`` check (or rely on ``segment_text`` which
                already drops them).
            voice_id: Optional voice identifier
            **kwargs: Additional provider-specific arguments

        Returns:
            None
        """
        raise NotImplementedError

    @abstractmethod
    async def interrupt(self) -> None:
        """Interrupt the TTS process"""
        raise NotImplementedError

    async def prewarm(self) -> None:
        """Pre-establish provider connections so the first ``synthesize()``
        call doesn't pay startup cost (TLS, WebSocket handshake, model warm-up).

        Default is a no-op. Plugins with persistent connections (Cartesia,
        ElevenLabs WS, etc.) override this. Safe to call multiple times —
        must be idempotent.
        """
        pass

    async def stream_synthesize(
        self,
        text_stream: AsyncIterator[Union[str, FlushMarker]],
        **kwargs: Any
    ) -> AsyncIterator[bytes]:
        """
        Synthesize text stream to audio stream.

        This default implementation temporarily replaces ``audio_track`` with
        a queue-backed stand-in, runs ``synthesize()`` in a background task,
        and yields every chunk written to the stand-in.

        Args:
            text_stream: Async iterator of text
            **kwargs: Additional arguments forwarded to ``synthesize()``

        Yields:
            Audio bytes

        Raises:
            Exception: Whatever ``synthesize()`` raised, re-raised after the
                audio produced before the failure has been yielded.
        """
        original_track = self.audio_track
        original_loop = getattr(self, "loop", None)

        frame_queue: asyncio.Queue = asyncio.Queue()

        class QueueTrack:
            """Minimal audio-track stand-in that forwards bytes to the queue."""

            def __init__(self):
                self.hooks = None

            async def add_new_bytes(self, audio_data: bytes):
                await frame_queue.put(audio_data)

            def on_last_audio_byte(self, cb):
                pass

            def set_pipeline_hooks(self, hooks):
                self.hooks = hooks

            def enable_audio_input(self, manual_control=False):
                pass

            def interrupt(self):
                pass

        self.audio_track = QueueTrack()
        if original_loop is None:
            self.loop = asyncio.get_running_loop()

        async def synthesize_task():
            try:
                await self.synthesize(text_stream, **kwargs)
            finally:
                # Always enqueue the end-of-stream sentinel, even on failure,
                # so the consumer loop below can never wait forever.
                await frame_queue.put(None)

        task = asyncio.create_task(synthesize_task())

        try:
            while True:
                data = await frame_queue.get()
                if data is None:
                    break
                yield data
            # The sentinel is enqueued in the producer's ``finally``, so it is
            # seen both on success and on failure.  Awaiting the task here
            # re-raises a synthesis error instead of silently dropping it.
            await task
        finally:
            self.audio_track = original_track
            if original_loop is None:
                self.loop = None
            # Reached with a live task only when the consumer stops early.
            if not task.done():
                task.cancel()

    async def aclose(self) -> None:
        """Cleanup resources"""
        logger.info(f"Cleaning up TTS: {self.label}")

        self._first_audio_callback = None

        try:
            import gc
            gc.collect()
        except Exception as e:
            logger.error(f"Error during TTS garbage collection: {e}")

        logger.info(f"TTS cleanup completed: {self.label}")

    async def __aenter__(self) -> TTS:
        """Enter the async context; returns this instance."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the async context by calling ``aclose()``."""
        await self.aclose()
# Base class for Text-to-Speech implementations
Ancestors
- EventEmitter
- typing.Generic
Subclasses
Instance variables
prop can_pause : bool-
Expand source code
@property
def can_pause(self) -> bool:
    """Return whether the current audio track supports pausing.

    Always returns a real ``bool``; the unwrapped ``and`` chain would
    otherwise leak ``None`` or the track object to callers.
    """
    track = self.audio_track
    return bool(track and getattr(track, 'can_pause', False))
# Return whether the current audio track supports pausing.
prop label : str-
Expand source code
@property
def label(self) -> str:
    """Provider label of this TTS instance (read-only)."""
    return self._label
# Get the TTS provider label
prop num_channels : int-
Expand source code
@property
def num_channels(self) -> int:
    """Number of audio channels configured for this instance (read-only)."""
    return self._num_channels
# Get number of audio channels
prop sample_rate : int-
Expand source code
@property
def sample_rate(self) -> int:
    """Audio sample rate configured for this instance (read-only)."""
    return self._sample_rate
# Get audio sample rate
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Release resources held by this TTS instance.

    Clears the first-audio callback and triggers a garbage-collection
    pass, logging before and after.
    """
    logger.info(f"Cleaning up TTS: {self.label}")

    # Drop the callback reference.
    self._first_audio_callback = None

    # Best-effort GC pass: a failure is logged, never raised.
    try:
        import gc
        gc.collect()
    except Exception as e:
        logger.error(f"Error during TTS garbage collection: {e}")

    logger.info(f"TTS cleanup completed: {self.label}")
# Cleanup resources
async def interrupt(self) ‑> None-
Expand source code
@abstractmethod
async def interrupt(self) -> None:
    """Stop the TTS process; concrete providers must implement this.

    Raises:
        NotImplementedError: Always, in this abstract base version.
    """
    raise NotImplementedError
# Interrupt the TTS process
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) ‑> None-
Expand source code
def on_first_audio_byte(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Register the callback for the first produced audio byte.

    Args:
        callback: Zero-argument async callable; replaces any callback
            registered earlier.
    """
    self._first_audio_callback = callback
# Set callback for when first audio byte is produced
async def pause(self) ‑> None-
Expand source code
async def pause(self) -> None:
    """Pause playback on the audio track, or interrupt if it cannot pause.

    Falls back to ``interrupt()`` when there is no audio track or the
    track has no ``pause`` attribute.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'pause'):
        await self.interrupt()
        return
    await track.pause()
# Pause audio playback if the audio track supports it, otherwise interrupt.
async def prewarm(self) ‑> None-
Expand source code
async def prewarm(self) -> None:
    """Warm up provider connections ahead of the first ``synthesize()`` call.

    The goal is to avoid paying startup cost (TLS, WebSocket handshake,
    model warm-up) on the first request.

    The base implementation does nothing. Plugins with persistent
    connections (Cartesia, ElevenLabs WS, etc.) override this. Overrides
    must be idempotent, since it is safe to call multiple times.
    """
    return None
# Pre-establish provider connections so the first
`synthesize()` call doesn't pay startup cost (TLS, WebSocket handshake, model warm-up). Default is a no-op. Plugins with persistent connections (Cartesia, ElevenLabs WS, etc.) override this. Safe to call multiple times — must be idempotent.
def reset_first_audio_tracking(self) ‑> None-
Expand source code
def reset_first_audio_tracking(self) -> None:
    """Reset first-audio tracking state ahead of the next TTS task.

    No-op in the base class; implementations override it for TTFB metrics.
    """
    return None
# Reset the first audio tracking state for next TTS task
async def resume(self) ‑> None-
Expand source code
async def resume(self) -> None:
    """Resume playback on the audio track when it supports resuming.

    Does nothing when there is no audio track or the track has no
    ``resume`` attribute.
    """
    track = self.audio_track
    if not track or not hasattr(track, 'resume'):
        return
    await track.resume()
# Resume audio playback if the audio track supports it.
async def stream_synthesize(self,
text_stream: AsyncIterator[Union[str, FlushMarker]],
**kwargs: Any) ‑> AsyncIterator[bytes]-
Expand source code
async def stream_synthesize(
    self,
    text_stream: AsyncIterator[Union[str, FlushMarker]],
    **kwargs: Any
) -> AsyncIterator[bytes]:
    """
    Synthesize text stream to audio stream.

    This default implementation temporarily replaces ``audio_track`` with
    a queue-backed stand-in, runs ``synthesize()`` in a background task,
    and yields every chunk written to the stand-in.

    Args:
        text_stream: Async iterator of text
        **kwargs: Additional arguments forwarded to ``synthesize()``

    Yields:
        Audio bytes

    Raises:
        Exception: Whatever ``synthesize()`` raised, re-raised after the
            audio produced before the failure has been yielded.
    """
    original_track = self.audio_track
    original_loop = getattr(self, "loop", None)

    frame_queue: asyncio.Queue = asyncio.Queue()

    class QueueTrack:
        """Minimal audio-track stand-in that forwards bytes to the queue."""

        def __init__(self):
            self.hooks = None

        async def add_new_bytes(self, audio_data: bytes):
            await frame_queue.put(audio_data)

        def on_last_audio_byte(self, cb):
            pass

        def set_pipeline_hooks(self, hooks):
            self.hooks = hooks

        def enable_audio_input(self, manual_control=False):
            pass

        def interrupt(self):
            pass

    self.audio_track = QueueTrack()
    if original_loop is None:
        self.loop = asyncio.get_running_loop()

    async def synthesize_task():
        try:
            await self.synthesize(text_stream, **kwargs)
        finally:
            # Always enqueue the end-of-stream sentinel, even on failure,
            # so the consumer loop below can never wait forever.
            await frame_queue.put(None)

    task = asyncio.create_task(synthesize_task())

    try:
        while True:
            data = await frame_queue.get()
            if data is None:
                break
            yield data
        # The sentinel is enqueued in the producer's ``finally``, so it is
        # seen both on success and on failure.  Awaiting the task here
        # re-raises a synthesis error instead of silently dropping it.
        await task
    finally:
        self.audio_track = original_track
        if original_loop is None:
            self.loop = None
        # Reached with a live task only when the consumer stops early.
        if not task.done():
            task.cancel()
# Synthesize text stream to audio stream.
This default implementation mocks the audio track to capture frames.
Args
text_stream- Async iterator of text
**kwargs- Additional arguments
Yields
Audio bytes
async def synthesize(self,
text: AsyncIterator[Union[str, FlushMarker]] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None-
Expand source code
@abstractmethod
async def synthesize(
    self,
    text: AsyncIterator[Union[str, FlushMarker]] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any
) -> None:
    """
    Turn text into speech; concrete providers must implement this.

    Args:
        text: The text to speak: either a plain string, or an async
            iterator yielding ``str`` chunks interleaved with
            ``FlushMarker`` segment-boundary markers. Plugins without
            per-segment flushing should drop the markers with an inline
            ``isinstance`` check (or rely on ``segment_text`` which
            already drops them).
        voice_id: Optional voice identifier
        **kwargs: Additional provider-specific arguments

    Returns:
        None

    Raises:
        NotImplementedError: Always, in this abstract base version.
    """
    raise NotImplementedError
# Convert text to speech
Args
text- Text to convert to speech. Either a plain string or an async iterator that may yield `str` chunks and `FlushMarker` segment-boundary markers. Plugins that don't support per-segment flushing should drop the markers with an inline `isinstance` check (or rely on `segment_text` which already drops them).
voice_id- Optional voice identifier
**kwargs- Additional provider-specific arguments
Returns
None
Inherited members