Package videosdk.plugins.azure
Sub-modules
- videosdk.plugins.azure.stt
- videosdk.plugins.azure.tts
- videosdk.plugins.azure.voice_live
Classes
class AzureSTT (*,
speech_key: Optional[str] = None,
speech_region: Optional[str] = None,
language: str = 'en-US',
sample_rate: int = 16000,
enable_phrase_list: bool = False,
phrase_list: Optional[List[str]] = None,
**kwargs: Any)
Expand source code
class AzureSTT(BaseSTT): def __init__( self, *, speech_key: Optional[str] = None, speech_region: Optional[str] = None, language: str = "en-US", sample_rate: int = 16000, enable_phrase_list: bool = False, phrase_list: Optional[List[str]] = None, **kwargs: Any, ) -> None: """Initialize the Azure STT plugin. Args: speech_key (Optional[str]): Azure Speech API key. Uses AZURE_SPEECH_KEY environment variable if not provided. speech_region (Optional[str]): Azure Speech region. Uses AZURE_SPEECH_REGION environment variable if not provided. language (str): The language to use for the STT plugin. Defaults to "en-US". sample_rate (int): Sample rate to use for the STT plugin. Defaults to 16000. enable_phrase_list (bool): Whether to enable phrase list for better recognition. Defaults to False. phrase_list (Optional[List[str]]): List of phrases to boost recognition. Defaults to None. """ super().__init__() if not SCIPY_AVAILABLE: raise ImportError( "scipy and numpy are required for Azure STT. Please install with 'pip install scipy numpy'" ) self.speech_key = speech_key or os.getenv("AZURE_SPEECH_KEY") self.speech_region = speech_region or os.getenv("AZURE_SPEECH_REGION") if not self.speech_key or not self.speech_region: raise ValueError( "Azure Speech key and region must be provided either through parameters or " "AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environment variables" ) self.config = AzureSTTConfig( speech_key=self.speech_key, speech_region=self.speech_region, language=language, sample_rate=sample_rate, enable_phrase_list=enable_phrase_list, phrase_list=phrase_list, ) self.input_sample_rate = 48000 self.target_sample_rate = sample_rate self._speech_processor: Optional[speechsdk.SpeechRecognizer] = None self._audio_stream: Optional[speechsdk.audio.PushAudioInputStream] = None self._is_speaking = False self._last_speech_time = 0.0 self._loop = asyncio.get_running_loop() self._event_queue = asyncio.Queue() self._processing_task: Optional[asyncio.Task] = None async def process_audio( self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any ) -> None: """Process audio frames and send to Azure Speech Service""" try: if not self._speech_processor: await self._setup_speech_processor(language) if self._audio_stream and SCIPY_AVAILABLE: audio_data = np.frombuffer(audio_frames, dtype=np.int16) if len(audio_data) > 0: stereo_data = audio_data.reshape(-1, 2) mono_data = stereo_data.mean(axis=1) resampled_data = signal.resample( mono_data, int( len(mono_data) * self.target_sample_rate / self.input_sample_rate ), ) resampled_bytes = resampled_data.astype(np.int16).tobytes() self._audio_stream.write(resampled_bytes) except Exception as e: logger.error(f"Error in process_audio: {str(e)}") self.emit("error", str(e)) await self._cleanup_speech_processor() async def _setup_speech_processor(self, language: Optional[str] = None) -> None: """Setup Azure speech processor""" try: self._processing_task = self._loop.create_task(self._process_events()) speech_config = speechsdk.SpeechConfig( subscription=self.config.speech_key, region=self.config.speech_region ) speech_config.speech_recognition_language = language or self.config.language stream_format = speechsdk.audio.AudioStreamFormat( samples_per_second=self.config.sample_rate, bits_per_sample=16, channels=1, ) self._audio_stream = speechsdk.audio.PushAudioInputStream( stream_format=stream_format ) audio_config = speechsdk.audio.AudioConfig(stream=self._audio_stream) self._speech_processor = speechsdk.SpeechRecognizer( speech_config=speech_config, 
audio_config=audio_config ) if self.config.enable_phrase_list and self.config.phrase_list: phrase_list_grammar = speechsdk.PhraseListGrammar.from_recognizer( self._speech_processor ) for phrase in self.config.phrase_list: phrase_list_grammar.addPhrase(phrase) self._speech_processor.recognized.connect(self._on_final_transcript) self._speech_processor.recognizing.connect(self._on_interim_transcript) self._speech_processor.speech_start_detected.connect(self._on_user_started_speaking) self._speech_processor.speech_end_detected.connect(self._on_user_stopped_speaking) self._speech_processor.canceled.connect(self._on_speech_processing_error) self._speech_processor.start_continuous_recognition() logger.info("Azure STT speech processor started") except Exception as e: logger.error(f"Failed to setup speech processor: {str(e)}") raise def _on_final_transcript(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None: """Handle final recognition results""" text = evt.result.text.strip() if not text: return if self._transcript_callback: response = STTResponse( event_type=SpeechEventType.FINAL, data=SpeechData( text=text, language=self.config.language, confidence=1.0 ), metadata={"provider": "azure", "result_reason": str(evt.result.reason)}, ) self._event_queue.put_nowait(response) def _on_interim_transcript(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None: """Handle interim recognition results""" text = evt.result.text.strip() if not text: return if self._transcript_callback: response = STTResponse( event_type=SpeechEventType.INTERIM, data=SpeechData( text=text, language=self.config.language, confidence=0.5 ), metadata={"provider": "azure", "result_reason": str(evt.result.reason)}, ) self._event_queue.put_nowait(response) def _on_user_started_speaking(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None: """Handle speech start detection""" if self._is_speaking: return self._is_speaking = True current_time = time.time() if self._last_speech_time == 0.0: self._last_speech_time = current_time else: if current_time - self._last_speech_time < 1.0: global_event_emitter.emit("speech_started") self._last_speech_time = current_time def _on_user_stopped_speaking(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None: """Handle speech end detection""" if not self._is_speaking: return self._is_speaking = False global_event_emitter.emit("speech_stopped") def _on_speech_processing_error(self, evt: speechsdk.SpeechRecognitionCanceledEventArgs) -> None: """Handle speech processing errors and cancellations""" if evt.cancellation_details.reason == speechsdk.CancellationReason.Error: error_msg = f"Speech recognition canceled due to error: {evt.cancellation_details.error_details}" logger.error(error_msg) self.emit("error", error_msg) async def _process_events(self) -> None: """Process STT events from the queue""" while True: try: response = await self._event_queue.get() if self._transcript_callback: await self._transcript_callback(response) except asyncio.CancelledError: break except Exception as e: logger.error("Error processing STT event: %s", str(e), exc_info=True) async def _cleanup_speech_processor(self) -> None: """Cleanup speech processor resources""" try: if self._speech_processor: self._speech_processor.stop_continuous_recognition() self._speech_processor = None if self._audio_stream: self._audio_stream.close() self._audio_stream = None except Exception as e: logger.error(f"Error during speech processor cleanup: {str(e)}") async def aclose(self) -> None: """Cleanup resources""" if 
self._processing_task: self._processing_task.cancel() await asyncio.gather(self._processing_task, return_exceptions=True) await self._cleanup_speech_processor() logger.info("Azure STT closed") await super().aclose()
Base class for Speech-to-Text implementations
Initialize the Azure STT plugin.
Args
speech_key (Optional[str]): Azure Speech API key. Uses the AZURE_SPEECH_KEY environment variable if not provided.
speech_region (Optional[str]): Azure Speech region. Uses the AZURE_SPEECH_REGION environment variable if not provided.
language (str): The language to use for the STT plugin. Defaults to "en-US".
sample_rate (int): Sample rate to use for the STT plugin, in Hz. Defaults to 16000.
enable_phrase_list (bool): Whether to enable a phrase list for better recognition. Defaults to False.
phrase_list (Optional[List[str]]): List of phrases to boost recognition. Defaults to None.
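A minimal construction sketch, assuming the class is importable from the package root as this page lists it, and that scipy and numpy are installed (the constructor raises ImportError without them). Credential values are placeholders:

```python
import asyncio

from videosdk.plugins.azure import AzureSTT


async def main() -> None:
    # AzureSTT calls asyncio.get_running_loop() in __init__, so construct it
    # inside a running event loop. Credentials may also be supplied via the
    # AZURE_SPEECH_KEY / AZURE_SPEECH_REGION environment variables.
    stt = AzureSTT(
        speech_key="<azure-speech-key>",
        speech_region="<azure-region>",
        language="en-US",
        sample_rate=16000,
        enable_phrase_list=True,
        phrase_list=["VideoSDK", "Azure"],
    )
    await stt.aclose()


asyncio.run(main())
```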
Ancestors
- videosdk.agents.stt.stt.STT
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
Cleanup resources
async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) -> None
Process audio frames and send to Azure Speech Service
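process_audio expects 48 kHz interleaved stereo int16 PCM and converts it to mono at the configured target rate before writing to Azure's push stream. A standalone sketch of that conversion using numpy and scipy (the helper name is illustrative, not part of the plugin):

```python
import numpy as np
from scipy import signal


def downmix_and_resample(frame: bytes,
                         input_rate: int = 48000,
                         target_rate: int = 16000) -> bytes:
    """Convert 48 kHz stereo int16 PCM to mono PCM at the target rate."""
    samples = np.frombuffer(frame, dtype=np.int16)
    mono = samples.reshape(-1, 2).mean(axis=1)          # average L/R channels
    out_len = int(len(mono) * target_rate / input_rate)
    resampled = signal.resample(mono, out_len)          # FFT-based resampling
    return resampled.astype(np.int16).tobytes()


# 20 ms of silent stereo audio at 48 kHz: 960 frames x 2 channels x 2 bytes
silent = (b"\x00\x00" * 2) * 960
assert len(downmix_and_resample(silent)) == 320 * 2    # 320 mono samples at 16 kHz
```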
class AzureTTS (*,
voice: str = 'en-US-EmmaNeural',
language: str | None = None,
tuning: Optional[VoiceTuning] = None,
style: Optional[SpeakingStyle] = None,
speech_key: str | None = None,
speech_region: str | None = None,
speech_endpoint: str | None = None,
deployment_id: str | None = None,
speech_auth_token: str | None = None,
**kwargs: Any)
Expand source code
class AzureTTS(TTS): """ Initialize the Azure TTS plugin. Args: voice (str): Name of the Azure neural voice to use (default: "en-US-EmmaNeural"). For a full list of available voices, see: https://eastus2.tts.speech.microsoft.com/cognitiveservices/voices/list (Requires: curl --location --request GET with header 'Ocp-Apim-Subscription-Key') language (str, optional): Language code for the voice (e.g., "en-US"). If not provided, defaults to the voice's language. tuning (VoiceTuning, optional): VoiceTuning object to control speech rate, volume, and pitch. style (SpeakingStyle, optional): SpeakingStyle object for expressive speech synthesis. speech_key (str, optional): Azure Speech API key. If not provided, uses the AZURE_SPEECH_KEY environment variable. speech_region (str, optional): Azure Speech region. If not provided, uses the AZURE_SPEECH_REGION environment variable. speech_endpoint (str, optional): Custom endpoint URL. If not provided, uses the AZURE_SPEECH_ENDPOINT environment variable. deployment_id (str, optional): Custom deployment ID for model deployment scenarios. speech_auth_token (str, optional): Azure Speech authorization token for token-based authentication. """ FIXED_SAMPLE_RATE = 24000 AZURE_OUTPUT_FORMAT = "raw-24khz-16bit-mono-pcm" def __init__( self, *, voice: str = "en-US-EmmaNeural", language: str | None = None, tuning: Optional[VoiceTuning] = None, style: Optional[SpeakingStyle] = None, speech_key: str | None = None, speech_region: str | None = None, speech_endpoint: str | None = None, deployment_id: str | None = None, speech_auth_token: str | None = None, **kwargs: Any, ) -> None: super().__init__( sample_rate=self.FIXED_SAMPLE_RATE, num_channels=1, ) self.speech_key = speech_key or os.environ.get("AZURE_SPEECH_KEY") self.speech_region = speech_region or os.environ.get("AZURE_SPEECH_REGION") self.speech_endpoint = speech_endpoint or os.environ.get( "AZURE_SPEECH_ENDPOINT" ) self.speech_auth_token = speech_auth_token self.deployment_id = deployment_id has_endpoint = bool(self.speech_endpoint) has_key_and_region = bool(self.speech_key and self.speech_region) has_token_and_region = bool(self.speech_auth_token and self.speech_region) if not (has_endpoint or has_key_and_region or has_token_and_region): raise ValueError( "Authentication requires one of: speech_endpoint, (speech_key & speech_region), or (speech_auth_token & speech_region)." 
) self.voice = voice self.language = language self.tuning = tuning self.style = style self._first_chunk_sent = False self._interrupted = False self._http_client: Optional[httpx.AsyncClient] = None def reset_first_audio_tracking(self) -> None: self._first_chunk_sent = False def _get_endpoint_url(self) -> str: if self.speech_endpoint: base = self.speech_endpoint.rstrip("/") if not base.endswith("/cognitiveservices/v1"): base = f"{base}/cognitiveservices/v1" else: base = f"https://{self.speech_region}.tts.speech.microsoft.com/cognitiveservices/v1" if self.deployment_id: return f"{base}?deploymentId={self.deployment_id}" return base def _get_http_client(self) -> httpx.AsyncClient: if not self._http_client: self._http_client = httpx.AsyncClient( timeout=httpx.Timeout( connect=15.0, read=30.0, write=5.0, pool=5.0 ), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ) return self._http_client async def synthesize( self, text: AsyncIterator[str] | str, voice_id: Optional[str] = None, **kwargs: Any, ) -> None: try: if not self.audio_track or not self.loop: self.emit("error", "Audio track or event loop not set") return self._interrupted = False if isinstance(text, AsyncIterator): async for segment in segment_text(text): if self._interrupted: break await self._synthesize_segment(segment, voice_id, **kwargs) else: if not self._interrupted: await self._synthesize_segment(text, voice_id, **kwargs) except Exception as e: logger.error("Azure TTS synthesis failed: %s", str(e), exc_info=True) self.emit("error", f"Azure TTS synthesis failed: {str(e)}") async def _synthesize_segment( self, text: str, voice_id: Optional[str] = None, **kwargs: Any ) -> None: if not text.strip() or self._interrupted: return try: headers = { "Content-Type": "application/ssml+xml", "X-Microsoft-OutputFormat": self.AZURE_OUTPUT_FORMAT, "User-Agent": "VideoSDK Agents", } if self.speech_auth_token: headers["Authorization"] = f"Bearer {self.speech_auth_token}" elif self.speech_key: headers["Ocp-Apim-Subscription-Key"] = self.speech_key ssml_data = self._build_ssml(text, voice_id or self.voice) response = await self._get_http_client().post( url=self._get_endpoint_url(), headers=headers, content=ssml_data, ) response.raise_for_status() audio_data = b"" async for chunk in response.aiter_bytes(chunk_size=8192): if self._interrupted: break if chunk: audio_data += chunk if audio_data and not self._interrupted: await self._stream_audio_chunks(audio_data) except httpx.TimeoutException: logger.error("Azure TTS request timeout") self.emit("error", "Azure TTS request timeout") except httpx.HTTPStatusError as e: logger.error("Azure TTS HTTP error: %s - %s", e.response.status_code, e.response.text) self.emit("error", f"Azure TTS HTTP error: {e.response.status_code} - {e.response.text}") except Exception as e: if not self._interrupted: logger.error("Azure TTS synthesis failed: %s", str(e), exc_info=True) self.emit("error", f"Azure TTS synthesis failed: {str(e)}") def _build_ssml(self, text: str, voice: str) -> str: lang = self.language or "en-US" ssml = ( f'<speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis" ' f'xmlns:mstts="http://www.w3.org/2001/mstts" xml:lang="{lang}">' ) ssml += f'<voice name="{voice}">' if self.style: degree = f' styledegree="{self.style.degree}"' if self.style.degree else "" ssml += f'<mstts:express-as style="{self.style.style}"{degree}>' if self.tuning: t = self.tuning rate_attr = f' rate="{t.rate}"' if t.rate is not None else "" vol_attr = f' 
volume="{t.volume}"' if t.volume is not None else "" pitch_attr = f' pitch="{t.pitch}"' if t.pitch is not None else "" ssml += f"<prosody{rate_attr}{vol_attr}{pitch_attr}>{text}</prosody>" else: ssml += text if self.style: ssml += "</mstts:express-as>" ssml += "</voice></speak>" return ssml async def _stream_audio_chunks(self, audio_bytes: bytes) -> None: chunk_size = int(self.FIXED_SAMPLE_RATE * 2 * 20 / 1000) for i in range(0, len(audio_bytes), chunk_size): if self._interrupted: break chunk = audio_bytes[i : i + chunk_size] if len(chunk) < chunk_size and len(chunk) > 0: padding_needed = chunk_size - len(chunk) chunk += b"\x00" * padding_needed if len(chunk) == chunk_size: if not self._first_chunk_sent and self._first_audio_callback: self._first_chunk_sent = True await self._first_audio_callback() if self.audio_track: asyncio.create_task(self.audio_track.add_new_bytes(chunk)) await asyncio.sleep(0.001) async def interrupt(self) -> None: self._interrupted = True if self.audio_track: self.audio_track.interrupt() async def aclose(self) -> None: if self._http_client: await self._http_client.aclose() self._http_client = None await super().aclose()Initialize the Azure TTS plugin.
Args
voice (str): Name of the Azure neural voice to use. Defaults to "en-US-EmmaNeural". For the full list of available voices, send a GET request to https://eastus2.tts.speech.microsoft.com/cognitiveservices/voices/list with the 'Ocp-Apim-Subscription-Key' header (e.g. curl --location --request GET).
language (str, optional): Language code for the voice (e.g., "en-US"). If not provided, defaults to the voice's language.
tuning (VoiceTuning, optional): VoiceTuning object to control speech rate, volume, and pitch.
style (SpeakingStyle, optional): SpeakingStyle object for expressive speech synthesis.
speech_key (str, optional): Azure Speech API key. If not provided, uses the AZURE_SPEECH_KEY environment variable.
speech_region (str, optional): Azure Speech region. If not provided, uses the AZURE_SPEECH_REGION environment variable.
speech_endpoint (str, optional): Custom endpoint URL. If not provided, uses the AZURE_SPEECH_ENDPOINT environment variable.
deployment_id (str, optional): Custom deployment ID for model deployment scenarios.
speech_auth_token (str, optional): Azure Speech authorization token for token-based authentication.
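A construction sketch with optional prosody tuning and expressive style (credentials are placeholders; imports assume the package-root classes listed on this page):

```python
from videosdk.plugins.azure import AzureTTS, SpeakingStyle, VoiceTuning

# Prosody and style settings are optional; the values shown fall within the
# ranges that the VoiceTuning / SpeakingStyle setters validate (see below).
tuning = VoiceTuning(rate="fast", volume=80.0, pitch="high")
style = SpeakingStyle(style="cheerful", degree=1.2)

tts = AzureTTS(
    voice="en-US-EmmaNeural",
    language="en-US",
    tuning=tuning,
    style=style,
    speech_key="<azure-speech-key>",       # or set AZURE_SPEECH_KEY
    speech_region="<azure-region>",        # or set AZURE_SPEECH_REGION
)
```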
Ancestors
- videosdk.agents.tts.tts.TTS
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
Class variables
var AZURE_OUTPUT_FORMAT
var FIXED_SAMPLE_RATE
Methods
async def aclose(self) -> None
Cleanup resources
async def interrupt(self) -> None
Interrupt the TTS process
def reset_first_audio_tracking(self) -> None
Reset the first audio tracking state for the next TTS task
async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) -> None
Convert text to speech
Args
text: Text to convert to speech (either a string or an async iterator of strings)
voice_id: Optional voice identifier; overrides the configured voice when provided
**kwargs: Additional provider-specific arguments
Returns
None
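Because synthesize accepts an async iterator, it can be fed directly from a streaming LLM response. A hedged usage sketch; in a real agent pipeline the framework assigns tts.audio_track and tts.loop before this is called, and without them synthesize only emits an "error" event and returns:

```python
from typing import AsyncIterator


async def llm_tokens() -> AsyncIterator[str]:
    # Stand-in for a streaming LLM response.
    for chunk in ["Hello", " from", " Azure", " neural", " voices."]:
        yield chunk


async def speak(tts) -> None:
    await tts.synthesize(llm_tokens())                  # streaming input
    await tts.synthesize("A plain string also works.")  # single string
    await tts.interrupt()                               # stop any in-flight audio
```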
class AzureVoiceLive (*,
api_key: str | None = None,
endpoint: str | None = None,
model: str,
config: AzureVoiceLiveConfig | None = None,
credential: Union[AzureKeyCredential, TokenCredential] | None = None)
Expand source code
class AzureVoiceLive(RealtimeBaseModel[AzureVoiceLiveEventTypes]): """Azure Voice Live realtime model implementation""" def __init__( self, *, api_key: str | None = None, endpoint: str | None = None, model: str, config: AzureVoiceLiveConfig | None = None, credential: Union[AzureKeyCredential, TokenCredential] | None = None, ) -> None: """ Initialize Azure Voice Live realtime model. Args: api_key: Azure Voice Live API key. If not provided, will attempt to read from AZURE_VOICE_LIVE_API_KEY env var endpoint: Azure Voice Live endpoint. If not provided, will attempt to read from AZURE_VOICE_LIVE_ENDPOINT env var model: The model identifier to use (e.g., 'gpt-4o-realtime-preview') config: Optional configuration object for customizing model behavior. Contains settings for: - voice: Voice ID for audio output (Azure or OpenAI voices) - modalities: List of enabled response types [TEXT, AUDIO] - turn_detection: Voice activity detection settings - temperature: Response randomness control credential: Azure credential object. If provided, takes precedence over api_key Raises: ValueError: If no API key or credential is provided and none found in environment variables """ super().__init__() self.model = model self.endpoint = endpoint or os.getenv( "AZURE_VOICE_LIVE_ENDPOINT", "wss://api.voicelive.com/v1" ) if credential: self.credential = credential elif api_key: self.credential = AzureKeyCredential(api_key) else: env_api_key = os.getenv("AZURE_VOICE_LIVE_API_KEY") if env_api_key: self.credential = AzureKeyCredential(env_api_key) else: try: self.credential = DefaultAzureCredential() except Exception: self.emit("error", "Azure Voice Live credentials required") raise ValueError( "Azure Voice Live credentials required. Provide api_key, credential, or set AZURE_VOICE_LIVE_API_KEY environment variable" ) self._session: Optional[AzureVoiceLiveSession] = None self._closing = False self._instructions: str = ( "You are a helpful voice assistant that can answer questions and help with tasks." ) self.loop = None self.audio_track: Optional[CustomAudioStreamTrack] = None self.config: AzureVoiceLiveConfig = config or AzureVoiceLiveConfig() self.input_sample_rate = VIDEOSDK_INPUT_SAMPLE_RATE self.target_sample_rate = AZURE_VOICE_LIVE_SAMPLE_RATE self._agent_speaking = False self._user_speaking = False self.session_ready = False self._session_ready_event = asyncio.Event() def set_agent(self, agent: Agent) -> None: """Set the agent configuration""" self._instructions = agent.instructions async def connect(self) -> None: """Connect to Azure Voice Live API""" if self._session: await self._cleanup_session(self._session) self._session = None self._closing = False try: if ( not self.audio_track and self.loop and Modality.AUDIO in self.config.modalities ): self.audio_track = CustomAudioStreamTrack(self.loop) elif not self.loop and Modality.AUDIO in self.config.modalities: self.emit( "error", "Event loop not initialized. Audio playback will not work." ) raise RuntimeError( "Event loop not initialized. Audio playback will not work." 
) session = await self._create_session() if session: self._session = session if self._session: asyncio.create_task( self._process_events(), name="azure-voice-live-events" ) try: logger.info("Waiting for Azure Voice Live session to be ready...") await asyncio.wait_for( self._session_ready_event.wait(), timeout=10.0 ) logger.info("Azure Voice Live session is ready.") except asyncio.TimeoutError: self.emit("error", "Azure Voice Live session ready timeout") raise RuntimeError( "Azure Voice Live session did not become ready in time" ) except Exception as e: self.emit("error", f"Error connecting to Azure Voice Live API: {e}") traceback.print_exc() raise async def _create_session(self) -> AzureVoiceLiveSession: """Create a new Azure Voice Live session""" try: connection_cm = connect( endpoint=self.endpoint, credential=self.credential, model=self.model, connection_options={ "max_msg_size": 10 * 1024 * 1024, "heartbeat": 20, "timeout": 20, }, ) connection = await connection_cm.__aenter__() await self._setup_session(connection) return AzureVoiceLiveSession( connection=connection, session_id=None, tasks=[] ) except Exception as e: self.emit("error", f"Failed to create Azure Voice Live session: {e}") traceback.print_exc() raise async def _setup_session(self, connection) -> None: """Configure the Azure Voice Live session""" logger.info("Setting up Azure Voice Live session...") voice_config: Union[AzureStandardVoice, str] if ( self.config.voice.startswith("en-US-") or self.config.voice.startswith("en-CA-") or "-" in self.config.voice ): voice_config = AzureStandardVoice( name=self.config.voice, type="azure-standard" ) else: voice_config = self.config.voice turn_detection_config = ServerVad( threshold=self.config.turn_detection_threshold, prefix_padding_ms=self.config.turn_detection_prefix_padding_ms, silence_duration_ms=self.config.turn_detection_silence_duration_ms, ) session_config = RequestSession( modalities=self.config.modalities, instructions=self._instructions, voice=voice_config, input_audio_format=self.config.input_audio_format, output_audio_format=self.config.output_audio_format, turn_detection=turn_detection_config, ) if self.config.temperature is not None: session_config.temperature = self.config.temperature if self.config.max_completion_tokens is not None: session_config.max_completion_tokens = self.config.max_completion_tokens await connection.session.update(session=session_config) logger.info("Azure Voice Live session configuration sent") async def _process_events(self) -> None: """Process events from the Azure Voice Live connection""" try: if not self._session or not self._session.connection: return async for event in self._session.connection: if self._closing: break await self._handle_event(event) except asyncio.CancelledError: logger.info("Event processing cancelled") except Exception as e: self.emit("error", f"Error processing events: {e}") traceback.print_exc() async def _handle_event(self, event) -> None: """Handle different types of events from Azure Voice Live""" try: logger.debug(f"Received event: {event.type}") if event.type == ServerEventType.SESSION_UPDATED: logger.info(f"Session ready: {event.session.id}") if self._session: self._session.session_id = event.session.id self.session_ready = True self._session_ready_event.set() elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: logger.info("User started speaking") if not self._user_speaking: await realtime_metrics_collector.set_user_speech_start() self._user_speaking = True self.emit("user_speech_started", 
{"type": "done"}) if self.audio_track and Modality.AUDIO in self.config.modalities: self.audio_track.interrupt() if self._session and self._session.connection: try: await self._session.connection.response.cancel() except Exception as e: logger.debug(f"No response to cancel: {e}") elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: logger.info("User stopped speaking") if self._user_speaking: await realtime_metrics_collector.set_user_speech_end() self._user_speaking = False elif event.type == ServerEventType.RESPONSE_CREATED: logger.info("Assistant response created") elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA: logger.debug("Received audio delta") if Modality.AUDIO in self.config.modalities: if not self._agent_speaking: await realtime_metrics_collector.set_agent_speech_start() self._agent_speaking = True if self.audio_track and self.loop: asyncio.create_task(self.audio_track.add_new_bytes(event.delta)) elif event.type == ServerEventType.RESPONSE_AUDIO_DONE: logger.info("Assistant finished speaking") if self._agent_speaking: await realtime_metrics_collector.set_agent_speech_end(timeout=1.0) self._agent_speaking = False elif event.type == ServerEventType.RESPONSE_TEXT_DELTA: if hasattr(self, "_current_text_response"): self._current_text_response += event.delta else: self._current_text_response = event.delta elif event.type == ServerEventType.RESPONSE_TEXT_DONE: if hasattr(self, "_current_text_response"): global_event_emitter.emit( "text_response", {"text": self._current_text_response, "type": "done"}, ) await realtime_metrics_collector.set_agent_response( self._current_text_response ) try: self.emit( "realtime_model_transcription", { "role": "agent", "text": self._current_text_response, "is_final": True, }, ) except Exception: pass self._current_text_response = "" elif event.type == ServerEventType.RESPONSE_DONE: logger.info("Response complete") if self._agent_speaking: await realtime_metrics_collector.set_agent_speech_end(timeout=1.0) self._agent_speaking = False elif event.type == ServerEventType.ERROR: logger.error(f"Azure Voice Live error: {event.error.message}") self.emit("error", f"Azure Voice Live error: {event.error.message}") elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED: logger.debug(f"Conversation item created: {event.item.id}") if ( hasattr(event.item, "content") and event.item.content and hasattr(event.item.content[0], "transcript") ): transcript = event.item.content[0].transcript if transcript and event.item.role == "user": await realtime_metrics_collector.set_user_transcript(transcript) try: self.emit( "realtime_model_transcription", {"role": "user", "text": transcript, "is_final": True}, ) except Exception: pass else: logger.debug(f"Unhandled event type: {event.type}") except Exception as e: self.emit("error", f"Error handling event {event.type}: {e}") traceback.print_exc() async def handle_audio_input(self, audio_data: bytes) -> None: """Handle incoming audio data from the user""" if not self._session or self._closing or not self.session_ready: return if Modality.AUDIO not in self.config.modalities: return try: audio_array = np.frombuffer(audio_data, dtype=np.int16) if len(audio_array) % 2 == 0: audio_array = audio_array.reshape(-1, 2) audio_array = np.mean(audio_array, axis=1).astype(np.int16) target_length = int( len(audio_array) * self.target_sample_rate / self.input_sample_rate ) resampled_float = signal.resample( audio_array.astype(np.float32), target_length ) resampled_int16 = np.clip(resampled_float, -32768, 
32767).astype(np.int16) resampled_bytes = resampled_int16.tobytes() encoded_audio = base64.b64encode(resampled_bytes).decode("utf-8") await self._session.connection.input_audio_buffer.append( audio=encoded_audio ) except Exception as e: self.emit("error", f"Error processing audio input: {e}") async def interrupt(self) -> None: """Interrupt current response""" if not self._session or self._closing: return try: if self._session.connection: await self._session.connection.response.cancel() if self.audio_track and Modality.AUDIO in self.config.modalities: self.audio_track.interrupt() await realtime_metrics_collector.set_interrupted() if self._agent_speaking: await realtime_metrics_collector.set_agent_speech_end(timeout=1.0) self._agent_speaking = False except Exception as e: self.emit("error", f"Interrupt error: {e}") async def send_message(self, message: str) -> None: """Send a text message to get audio response""" retry_count = 0 max_retries = 5 while not self._session or not self.session_ready: if retry_count >= max_retries: raise RuntimeError( "No active Azure Voice Live session after maximum retries" ) logger.debug("No active session, waiting for connection...") await asyncio.sleep(1) retry_count += 1 try: await self._session.connection.conversation.item.create( item={ "type": "message", "role": "assistant", "content": [ { "type": "text", "text": f"Repeat the user's exact message back to them [DO NOT ADD ANYTHING ELSE]: {message}", } ], } ) await self._session.connection.response.create() except Exception as e: self.emit("error", f"Error sending message: {e}") async def send_text_message(self, message: str) -> None: """Send a text message for text-only communication""" retry_count = 0 max_retries = 5 while not self._session or not self.session_ready: if retry_count >= max_retries: raise RuntimeError( "No active Azure Voice Live session after maximum retries" ) logger.debug("No active session, waiting for connection...") await asyncio.sleep(1) retry_count += 1 try: await self._session.connection.conversation.item.create( item={ "type": "message", "role": "user", "content": [{"type": "input_text", "text": message}], } ) await self._session.connection.response.create() except Exception as e: self.emit("error", f"Error sending text message: {e}") async def _cleanup_session(self, session: AzureVoiceLiveSession) -> None: """Clean up session resources""" for task in session.tasks: if not task.done(): task.cancel() try: if session.connection: if hasattr(session.connection, "close"): await session.connection.close() except Exception as e: self.emit("error", f"Error closing session: {e}") async def aclose(self) -> None: """Clean up all resources""" if self._closing: return self._closing = True if self._session: await self._cleanup_session(self._session) self._session = None if hasattr(self.audio_track, "cleanup") and self.audio_track: try: await self.audio_track.cleanup() except Exception as e: self.emit("error", f"Error cleaning up audio track: {e}")Azure Voice Live realtime model implementation
Initialize Azure Voice Live realtime model.
Args
api_key: Azure Voice Live API key. If not provided, read from the AZURE_VOICE_LIVE_API_KEY env var.
endpoint: Azure Voice Live endpoint. If not provided, read from the AZURE_VOICE_LIVE_ENDPOINT env var.
model: The model identifier to use (e.g., 'gpt-4o-realtime-preview').
config: Optional configuration object for customizing model behavior. Covers voice (Azure or OpenAI voice ID for audio output), modalities (enabled response types, [TEXT, AUDIO]), turn detection (voice activity detection settings), and temperature (response randomness control).
credential: Azure credential object. If provided, takes precedence over api_key.
Raises
ValueError: If no API key or credential is provided and none is found in the environment variables.
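A construction sketch (model name, key, and endpoint are placeholders; AzureKeyCredential comes from azure-core and may be passed instead of a raw api_key):

```python
from azure.core.credentials import AzureKeyCredential

from videosdk.plugins.azure import AzureVoiceLive, AzureVoiceLiveConfig

model = AzureVoiceLive(
    model="gpt-4o-realtime-preview",
    # Either pass api_key / rely on AZURE_VOICE_LIVE_API_KEY, or pass a credential:
    credential=AzureKeyCredential("<azure-voice-live-key>"),
    endpoint="wss://api.voicelive.com/v1",    # or set AZURE_VOICE_LIVE_ENDPOINT
    config=AzureVoiceLiveConfig(voice="en-US-AvaNeural", temperature=0.7),
)
```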
Ancestors
- videosdk.agents.realtime_base_model.RealtimeBaseModel
- videosdk.agents.event_emitter.EventEmitter
- typing.Generic
- abc.ABC
Methods
async def aclose(self) -> None
Clean up all resources
async def connect(self) -> None
Connect to the Azure Voice Live API
async def handle_audio_input(self, audio_data: bytes) -> None
Handle incoming audio data from the user
async def interrupt(self) -> None
Interrupt the current response
async def send_message(self, message: str) -> None
Send a text message to get an audio response
async def send_text_message(self, message: str) -> None
Send a text message for text-only communication
def set_agent(self, agent: Agent) -> None
Set the agent configuration
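A minimal session-flow sketch, assuming a model instance constructed as above and an event loop and audio track managed by the agent runtime:

```python
import asyncio


async def run(model) -> None:
    # connect() waits up to 10 seconds for the session to report ready,
    # then text can be sent; audio/text responses arrive via the event handlers.
    await model.connect()
    await model.send_text_message("Summarize today's agenda in one sentence.")
    await asyncio.sleep(5)       # give the realtime session time to respond
    await model.interrupt()      # cancel any response still in flight
    await model.aclose()
```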
class AzureVoiceLiveConfig (voice: str = 'en-US-AvaNeural',
modalities: List[Modality] = <factory>,
input_audio_format: InputAudioFormat = InputAudioFormat.PCM16,
output_audio_format: InputAudioFormat = InputAudioFormat.PCM16,
turn_detection_threshold: float = 0.5,
turn_detection_prefix_padding_ms: int = 300,
turn_detection_silence_duration_ms: int = 500,
temperature: Optional[float] = None,
max_completion_tokens: Optional[int] = None)
Expand source code
@dataclass
class AzureVoiceLiveConfig:
    """Configuration for Azure Voice Live API (Beta)

    Args:
        voice: Voice ID for audio output. Can be Azure voice (e.g., 'en-US-AvaNeural')
            or OpenAI voice ('alloy', 'echo', etc.). Default is 'en-US-AvaNeural'
        modalities: List of enabled response types. Options: [Modality.TEXT, Modality.AUDIO]. Default includes both
        input_audio_format: Audio format for input. Default is AudioFormat.PCM16
        output_audio_format: Audio format for output. Default is AudioFormat.PCM16
        turn_detection_threshold: Voice activity detection threshold (0.0-1.0). Default is 0.5
        turn_detection_prefix_padding_ms: Padding before speech start (ms). Default is 300
        turn_detection_silence_duration_ms: Silence duration to mark end (ms). Default is 500
        temperature: Controls randomness in response generation. Higher values make output more random. Default is None
        max_completion_tokens: Maximum number of tokens in response. Default is None
    """

    voice: str = "en-US-AvaNeural"
    modalities: List[Modality] = field(
        default_factory=lambda: [Modality.TEXT, Modality.AUDIO]
    )
    input_audio_format: InputAudioFormat = InputAudioFormat.PCM16
    output_audio_format: InputAudioFormat = InputAudioFormat.PCM16
    turn_detection_threshold: float = 0.5
    turn_detection_prefix_padding_ms: int = 300
    turn_detection_silence_duration_ms: int = 500
    temperature: Optional[float] = None
    max_completion_tokens: Optional[int] = None

Configuration for Azure Voice Live API (Beta)
Args
voice: Voice ID for audio output. Can be an Azure voice (e.g., 'en-US-AvaNeural') or an OpenAI voice ('alloy', 'echo', etc.). Default is 'en-US-AvaNeural'.
modalities: List of enabled response types. Options: [Modality.TEXT, Modality.AUDIO]. Default includes both.
input_audio_format: Audio format for input. Default is InputAudioFormat.PCM16.
output_audio_format: Audio format for output. Default is InputAudioFormat.PCM16.
turn_detection_threshold: Voice activity detection threshold (0.0-1.0). Default is 0.5.
turn_detection_prefix_padding_ms: Padding before speech start (ms). Default is 300.
turn_detection_silence_duration_ms: Silence duration that marks the end of speech (ms). Default is 500.
temperature: Controls randomness in response generation; higher values make output more random. Default is None.
max_completion_tokens: Maximum number of tokens in a response. Default is None.
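For example, a text-only configuration with stricter turn detection might look like the sketch below (the Modality import path is assumed from the field types listed under Instance variables):

```python
from azure.ai.voicelive.models import Modality  # assumed public path for the enum

from videosdk.plugins.azure import AzureVoiceLiveConfig

config = AzureVoiceLiveConfig(
    voice="alloy",                           # OpenAI-style voice id
    modalities=[Modality.TEXT],              # text-only responses
    turn_detection_threshold=0.6,
    turn_detection_silence_duration_ms=700,
    max_completion_tokens=512,
)
```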
Instance variables
var input_audio_format : azure.ai.voicelive.models._enums.InputAudioFormat
var max_completion_tokens : int | None
var modalities : List[azure.ai.voicelive.models._enums.Modality]
var output_audio_format : azure.ai.voicelive.models._enums.InputAudioFormat
var temperature : float | None
var turn_detection_prefix_padding_ms : int
var turn_detection_silence_duration_ms : int
var turn_detection_threshold : float
var voice : str
class SpeakingStyle (style: str, degree: float | None = None)
Expand source code
@dataclass
class SpeakingStyle:
    """Configuration for speech expressive style."""

    style: str
    _degree: float | None = None

    @property
    def degree(self):
        return self._degree

    @degree.setter
    def degree(self, value: float | None):
        if value is not None and not 0.1 <= value <= 2.0:
            raise ValueError("Style degree must be between 0.1 and 2.0")
        self._degree = value

    def __init__(self, style: str, degree: float | None = None):
        self.style = style
        self.degree = degree

Configuration for speech expressive style.
Instance variables
prop degree
var style : str
class VoiceTuning (rate=None, volume=None, pitch=None)
Expand source code
@dataclass
class VoiceTuning:
    """Configuration for speech tuning (rate, volume, pitch)."""

    _rate: Literal["x-slow", "slow", "medium", "fast", "x-fast"] | float | None = None
    _volume: Literal["silent", "x-soft", "soft", "medium", "loud", "x-loud"] | float | None = None
    _pitch: Literal["x-low", "low", "medium", "high", "x-high"] | None = None

    @property
    def rate(self):
        return self._rate

    @rate.setter
    def rate(self, value):
        if value:
            if isinstance(value, float) and not 0.5 <= value <= 2.0:
                raise ValueError("Rate must be a float between 0.5 and 2.0")
            if isinstance(value, str) and value not in ["x-slow", "slow", "medium", "fast", "x-fast"]:
                raise ValueError("Rate must be one of 'x-slow', 'slow', 'medium', 'fast', 'x-fast'")
        self._rate = value

    @property
    def volume(self):
        return self._volume

    @volume.setter
    def volume(self, value):
        if value:
            if isinstance(value, float) and not 0 <= value <= 100.0:
                raise ValueError("Volume must be a float between 0 and 100")
            if isinstance(value, str) and value not in ["silent", "x-soft", "soft", "medium", "loud", "x-loud"]:
                raise ValueError("Volume must be one of 'silent', 'x-soft', 'soft', 'medium', 'loud', 'x-loud'")
        self._volume = value

    @property
    def pitch(self):
        return self._pitch

    @pitch.setter
    def pitch(self, value):
        if value and value not in ["x-low", "low", "medium", "high", "x-high"]:
            raise ValueError("Pitch must be one of 'x-low', 'low', 'medium', 'high', 'x-high'")
        self._pitch = value

    def __init__(self, rate=None, volume=None, pitch=None):
        self.rate = rate
        self.volume = volume
        self.pitch = pitch

Configuration for speech tuning (rate, volume, pitch).
Instance variables
prop pitch
prop rate
prop volume
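The rate, volume, and pitch setters shown in the class source enforce the documented ranges, so invalid prosody or style values fail fast at construction time. A small sketch of that behaviour (package-root imports assumed):

```python
from videosdk.plugins.azure import SpeakingStyle, VoiceTuning

VoiceTuning(rate=1.2, volume="soft", pitch="low")   # accepted: float rate within 0.5-2.0
SpeakingStyle(style="empathetic", degree=0.8)       # accepted: degree within 0.1-2.0

try:
    VoiceTuning(rate=3.0)                           # float rates outside 0.5-2.0 are rejected
except ValueError as exc:
    print(exc)                                      # "Rate must be a float between 0.5 and 2.0"

try:
    SpeakingStyle(style="cheerful", degree=5.0)     # degree outside 0.1-2.0 is rejected
except ValueError as exc:
    print(exc)                                      # "Style degree must be between 0.1 and 2.0"
```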