Module agents.stt
Sub-modules
- agents.stt.fallback_stt
- agents.stt.stt
Classes
class FallbackSTT (providers: List[STT],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3)-
Expand source code
class FallbackSTT(STT, FallbackBase):
    """STT wrapper that automatically fails over to backup providers on errors and attempts recovery of higher-priority ones.

    Provider ordering, disabling and recovery bookkeeping are delegated to
    ``FallbackBase``. This class keeps the transcript callback and the
    ``"error"`` listener attached to whichever provider is currently active.

    Args:
        providers: The STT providers to wrap, passed to ``FallbackBase``.
        temporary_disable_sec: Forwarded unchanged to ``FallbackBase``.
        permanent_disable_after_attempts: Forwarded unchanged to ``FallbackBase``.
    """

    def __init__(self, providers: List[STT], temporary_disable_sec: float = 60.0, permanent_disable_after_attempts: int = 3):
        STT.__init__(self)
        FallbackBase.__init__(
            self,
            providers,
            "STT",
            temporary_disable_sec=temporary_disable_sec,
            permanent_disable_after_attempts=permanent_disable_after_attempts,
        )
        self._transcript_callback = None
        # The event loop only keeps weak references to tasks, so fire-and-forget
        # error handlers are held here until they finish.
        self._error_tasks = set()
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        """Attach error listener to the currently active provider."""
        self.active_provider.on("error", self._on_provider_error)

    def _attach_to_active_provider(self) -> None:
        """Route the active provider's transcripts and errors to this wrapper."""
        if self._transcript_callback:
            self.active_provider.on_stt_transcript(self._transcript_callback)
        self.active_provider.on("error", self._on_provider_error)

    def _detach_error_listener(self, provider: Any) -> None:
        """Remove this wrapper's error listener from ``provider`` (best effort)."""
        try:
            provider.off("error", self._on_provider_error)
        except Exception:
            # The listener may never have been attached, or may already be gone.
            # Failing to detach must not block a provider switch.
            pass

    def _on_provider_error(self, error_msg):
        """Handle async errors (e.g. WebSocket disconnects).

        Schedules ``_handle_async_error`` for the provider that is active when
        the error is received.
        """
        failed_p = self.active_provider
        task = asyncio.create_task(self._handle_async_error(str(error_msg), failed_p))
        self._error_tasks.add(task)
        task.add_done_callback(self._error_tasks.discard)

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        """Switch away from ``failed_provider``, then re-emit the error."""
        await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider)
        self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        """Override switch to handle STT specific setup.

        Detaches the error listener from ``failed_provider`` (or from the active
        provider when none is given) and then delegates to ``FallbackBase``. If
        the active provider changed, the transcript callback and error listener
        are attached to the new one.

        Returns:
            True if the base class reported a switch, otherwise False.
        """
        provider_to_cleanup = failed_provider if failed_provider is not None else self.active_provider
        self._detach_error_listener(provider_to_cleanup)
        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        if not switched:
            return False
        if self.active_provider is not active_before:
            self._attach_to_active_provider()
        return True

    def on_stt_transcript(self, callback) -> None:
        """Capture the callback so we can re-apply it after switching."""
        self._transcript_callback = callback
        self.active_provider.on_stt_transcript(callback)

    async def process_audio(self, audio_frames: bytes, **kwargs) -> None:
        """
        Main entry point. If this fails, it's usually a connection error.
        We catch, switch, and retry immediately.

        Before sending, ``check_recovery()`` is consulted. If the active provider
        changes as a result, the error listener is moved from the provider being
        left to the recovered one, so a stale provider cannot trigger failovers.

        Raises:
            Exception: The original error when no switch was possible, or any
                error raised by the single retry on the new provider.
        """
        previous_provider = self.active_provider
        if self.check_recovery() and self.active_provider is not previous_provider:
            self._detach_error_listener(previous_provider)
            self._attach_to_active_provider()
        current_provider = self.active_provider
        try:
            await current_provider.process_audio(audio_frames, **kwargs)
        except Exception as e:
            switched = await self._switch_provider(str(e), failed_provider=current_provider)
            self.emit("error", str(e))
            if not switched:
                raise
            await self.active_provider.process_audio(audio_frames, **kwargs)

    async def aclose(self) -> None:
        """Close all providers.

        Every provider is closed and ``super().aclose()`` runs even if an
        earlier provider raises. The first such error is re-raised at the end.
        """
        first_error = None
        for provider in self.providers:
            try:
                await provider.aclose()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        await super().aclose()
        if first_error is not None:
            raise first_error
Ancestors
- STT
- EventEmitter
- typing.Generic
- FallbackBase
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Close all providers, then run the base-class cleanup.

    Every provider is closed even if an earlier one raises. The first such
    error is re-raised after the remaining providers and ``super().aclose()``
    have run, so callers still observe the failure.
    """
    first_error = None
    for provider in self.providers:
        try:
            await provider.aclose()
        except Exception as exc:
            if first_error is None:
                first_error = exc
    await super().aclose()
    if first_error is not None:
        raise first_error
def on_stt_transcript(self, callback) ‑> None-
Expand source code
def on_stt_transcript(self, callback) -> None:
    """Register ``callback`` for transcripts and forward it to the active provider.

    The callback is also kept on the wrapper so that it can be re-attached
    whenever a different provider becomes active.
    """
    remembered = callback
    self._transcript_callback = remembered
    target = self.active_provider
    target.on_stt_transcript(remembered)
async def process_audio(self, audio_frames: bytes, **kwargs) ‑> None-
Expand source code
async def process_audio(self, audio_frames: bytes, **kwargs) -> None:
    """
    Main entry point. If this fails, it's usually a connection error.
    We catch, switch, and retry immediately.

    Before sending, ``check_recovery()`` is consulted. If the active provider
    changes as a result, the error listener is detached from the provider being
    left, and the transcript callback and error listener are attached to the
    recovered one.

    On failure the wrapper switches providers, emits an ``"error"`` event with
    the exception text, and retries the same frames once on the new provider.

    Raises:
        Exception: The original error when no switch was possible, or any
            error raised by the retry on the new provider.
    """
    previous_provider = self.active_provider
    if self.check_recovery() and self.active_provider is not previous_provider:
        try:
            previous_provider.off("error", self._on_provider_error)
        except Exception:
            # Best effort: the listener may already have been removed.
            pass
        if self._transcript_callback:
            self.active_provider.on_stt_transcript(self._transcript_callback)
        self.active_provider.on("error", self._on_provider_error)
    current_provider = self.active_provider
    try:
        await current_provider.process_audio(audio_frames, **kwargs)
    except Exception as e:
        switched = await self._switch_provider(str(e), failed_provider=current_provider)
        self.emit("error", str(e))
        if not switched:
            raise
        await self.active_provider.process_audio(audio_frames, **kwargs)
Inherited members
class STT-
Expand source code
class STT(EventEmitter[Literal["error"]]):
    """Base class for Speech-to-Text implementations"""

    def __init__(
        self,
    ) -> None:
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._transcript_callback: Optional[Callable[[STTResponse], Awaitable[None]]] = None

    @property
    def label(self) -> str:
        """Get the STT provider label"""
        return self._label

    def on_stt_transcript(self, callback: Callable[[STTResponse], Awaitable[None]]) -> None:
        """Set callback for receiving STT transcripts"""
        self._transcript_callback = callback

    @abstractmethod
    async def process_audio(
        self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
    ) -> None:
        """
        Process audio frames and convert to text

        Results are not returned. Implementations deliver ``STTResponse``
        objects through the callback registered with ``on_stt_transcript``;
        ``stream_transcribe`` relies on this.

        Args:
            audio_frames: Audio bytes to process
            language: Optional language code for recognition
            **kwargs: Additional provider-specific arguments

        Returns:
            None
        """
        raise NotImplementedError

    async def stream_transcribe(
        self, audio_stream: AsyncIterator[bytes], **kwargs: Any
    ) -> AsyncIterator[STTResponse]:
        """
        Process audio stream and yield STT events (simulated via process_audio).

        This default implementation allows using the streaming hook pattern even if the STT subclass doesn't implement a native streaming method, by redirecting callbacks to a queue.

        The transcript callback is swapped for an internal one while the
        generator runs. It is restored afterwards through ``on_stt_transcript``
        when a previous callback existed, so subclasses that forward callbacks
        are restored too. Events already queued when the audio stream is
        exhausted are still yielded; responses delivered after that point are
        not captured.

        Args:
            audio_stream: Async iterator of audio bytes
            **kwargs: Additional provider-specific arguments

        Yields:
            STTResponse objects

        Raises:
            Exception: Whatever ``process_audio`` or ``audio_stream`` raised,
                re-raised after the already-queued events have been yielded.
        """
        event_queue = asyncio.Queue()

        async def capture_callback(response):
            await event_queue.put(response)

        original_callback = self._transcript_callback
        self.on_stt_transcript(capture_callback)

        async def feed_audio():
            async for chunk in audio_stream:
                await self.process_audio(chunk, **kwargs)

        feed_task = asyncio.create_task(feed_audio())
        get_task = None
        try:
            while not feed_task.done():
                # Keep exactly one outstanding get(). An orphaned getter from an
                # earlier iteration could dequeue an event that is never yielded.
                get_task = asyncio.create_task(event_queue.get())
                done, _ = await asyncio.wait(
                    {get_task, feed_task}, return_when=asyncio.FIRST_COMPLETED
                )
                finished_get, get_task = get_task, None
                if finished_get in done:
                    yield finished_get.result()
                else:
                    # The feeder finished first. Cancelling a pending get()
                    # leaves any item it was about to take in the queue.
                    finished_get.cancel()
            # Flush events queued before or while the feeder finished.
            while not event_queue.empty():
                yield event_queue.get_nowait()
            # Surface failures from process_audio instead of dropping them.
            feed_task.result()
        finally:
            if get_task is not None and not get_task.done():
                get_task.cancel()
            if not feed_task.done():
                feed_task.cancel()
            if original_callback is not None:
                self.on_stt_transcript(original_callback)
            else:
                self._transcript_callback = None

    async def aclose(self) -> None:
        """Cleanup resources"""
        logger.info(f"Cleaning up STT: {self.label}")
        self._transcript_callback = None
        try:
            import gc
            gc.collect()
            logger.info(f"STT garbage collection completed: {self.label}")
        except Exception as e:
            logger.error(f"Error during STT garbage collection: {e}")
        logger.info(f"STT cleanup completed: {self.label}")
Ancestors
- EventEmitter
- typing.Generic
Subclasses
Instance variables
prop label : str-
Expand source code
@property
def label(self) -> str:
    """Return the label that identifies this STT provider instance."""
    current_label = self._label
    return current_label
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Release resources held by this STT instance.

    Clears the stored transcript callback and runs a forced garbage-collection
    pass, logging progress along the way. A failure during that pass is
    logged as an error and not re-raised.
    """
    logger.info(f"Cleaning up STT: {self.label}")
    self._transcript_callback = None
    try:
        import gc as _gc
        _gc.collect()
        logger.info(f"STT garbage collection completed: {self.label}")
    except Exception as gc_error:
        logger.error(f"Error during STT garbage collection: {gc_error}")
    logger.info(f"STT cleanup completed: {self.label}")
def on_stt_transcript(self,
callback: Callable[[STTResponse], Awaitable[None]]) ‑> None-
Expand source code
def on_stt_transcript(self, callback: Callable[[STTResponse], Awaitable[None]]) -> None:
    """Remember ``callback`` as the coroutine function that receives transcripts.

    Only the reference is stored; the callback is not invoked here.
    """
    new_callback = callback
    self._transcript_callback = new_callback
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None-
Expand source code
@abstractmethod
async def process_audio(
    self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
) -> None:
    """
    Process audio frames and convert to text

    Results are not returned. Implementations deliver ``STTResponse`` objects
    through the callback registered with ``on_stt_transcript``.

    Args:
        audio_frames: Audio bytes to process
        language: Optional language code for recognition
        **kwargs: Additional provider-specific arguments

    Returns:
        None

    Raises:
        NotImplementedError: Always, in this abstract base implementation.
    """
    raise NotImplementedError
Args
audio_frames- Audio bytes to process
language- Optional language code for recognition
**kwargs- Additional provider-specific arguments
Returns
None. Transcripts are delivered through the callback registered with on_stt_transcript.
async def stream_transcribe(self, audio_stream: AsyncIterator[bytes], **kwargs: Any) ‑> AsyncIterator[STTResponse]-
Expand source code
async def stream_transcribe(
    self, audio_stream: AsyncIterator[bytes], **kwargs: Any
) -> AsyncIterator[STTResponse]:
    """
    Process audio stream and yield STT events (simulated via process_audio).

    This default implementation allows using the streaming hook pattern even if the STT subclass doesn't implement a native streaming method, by redirecting callbacks to a queue.

    The transcript callback is swapped for an internal one while the generator
    runs. It is restored afterwards through ``on_stt_transcript`` when a
    previous callback existed, so subclasses that forward callbacks are
    restored too. Events already queued when the audio stream is exhausted are
    still yielded; responses delivered after that point are not captured.

    Args:
        audio_stream: Async iterator of audio bytes
        **kwargs: Additional provider-specific arguments

    Yields:
        STTResponse objects

    Raises:
        Exception: Whatever ``process_audio`` or ``audio_stream`` raised,
            re-raised after the already-queued events have been yielded.
    """
    event_queue = asyncio.Queue()

    async def capture_callback(response):
        await event_queue.put(response)

    original_callback = self._transcript_callback
    self.on_stt_transcript(capture_callback)

    async def feed_audio():
        async for chunk in audio_stream:
            await self.process_audio(chunk, **kwargs)

    feed_task = asyncio.create_task(feed_audio())
    get_task = None
    try:
        while not feed_task.done():
            # Keep exactly one outstanding get(). An orphaned getter from an
            # earlier iteration could dequeue an event that is never yielded.
            get_task = asyncio.create_task(event_queue.get())
            done, _ = await asyncio.wait(
                {get_task, feed_task}, return_when=asyncio.FIRST_COMPLETED
            )
            finished_get, get_task = get_task, None
            if finished_get in done:
                yield finished_get.result()
            else:
                # The feeder finished first. Cancelling a pending get() leaves
                # any item it was about to take in the queue for the drain.
                finished_get.cancel()
        # Flush events queued before or while the feeder finished.
        while not event_queue.empty():
            yield event_queue.get_nowait()
        # Surface failures from process_audio instead of dropping them.
        feed_task.result()
    finally:
        if get_task is not None and not get_task.done():
            get_task.cancel()
        if not feed_task.done():
            feed_task.cancel()
        if original_callback is not None:
            self.on_stt_transcript(original_callback)
        else:
            self._transcript_callback = None
This default implementation allows using the streaming hook pattern even if the STT subclass doesn't implement a native streaming method, by redirecting callbacks to a queue.
Args
audio_stream- Async iterator of audio bytes
**kwargs- Additional provider-specific arguments
Yields
STTResponse objects
Inherited members
class STTResponse (**data: Any)-
Expand source code
class STTResponse(BaseModel):
    """Result produced by STT processing.

    Attributes:
        event_type: Kind of speech event this response represents.
        data: Recognition payload carried by the event.
        metadata: Optional extra information attached to the event.
    """

    event_type: SpeechEventType
    data: SpeechData
    metadata: Optional[dict[str, Any]] = None
Attributes
event_type- The type of speech event.
data- The data from the speech event.
metadata- Additional metadata from the speech event.
Create a new model by parsing and validating input data from keyword arguments.
Raises `ValidationError` (`pydantic_core.ValidationError`) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var data : SpeechData
var event_type : SpeechEventType
var metadata : dict[str, typing.Any] | None
var model_config
class SpeechData (text: str,
confidence: float = 0.0,
language: Optional[str] = None,
start_time: float = 0.0,
end_time: float = 0.0,
duration: float = 0.0)-
Expand source code
@dataclass
class SpeechData:
    """Data structure for speech recognition results

    Attributes:
        text: The recognized text.
        confidence: The confidence level of the recognition (defaults to 0.0).
        language: The language of the recognized text, or None when not provided.
        start_time: The start time of the speech.
        end_time: The end time of the speech.
        duration: The duration of the speech.
    """

    text: str
    confidence: float = 0.0
    language: Optional[str] = None
    start_time: float = 0.0
    end_time: float = 0.0
    duration: float = 0.0
Attributes
text- The recognized text.
confidence- The confidence level of the recognition.
language- The language of the recognized text.
start_time- The start time of the speech.
end_time- The end time of the speech.
duration- The duration of the speech.
Instance variables
var confidence : float
var duration : float
var end_time : float
var language : str | None
var start_time : float
var text : str
class SpeechEventType (*args, **kwds)-
Expand source code
class SpeechEventType(str, Enum):
    """Kinds of speech event an STT provider can report."""

    START = "start_of_speech"
    INTERIM = "interim_transcript"
    PREFLIGHT = "preflight_transcript"
    FINAL = "final_transcript"
    END = "end_of_speech"
Ancestors
- builtins.str
- enum.Enum
Class variables
var END
var FINAL
var INTERIM
var PREFLIGHT
var START