Package videosdk.plugins.azure

Sub-modules

videosdk.plugins.azure.stt
videosdk.plugins.azure.tts
videosdk.plugins.azure.voice_live

Classes

class AzureSTT (*,
speech_key: Optional[str] = None,
speech_region: Optional[str] = None,
language: str = 'en-US',
sample_rate: int = 16000,
enable_phrase_list: bool = False,
phrase_list: Optional[List[str]] = None,
**kwargs: Any)
class AzureSTT(BaseSTT):
    def __init__(
        self,
        *,
        speech_key: Optional[str] = None,
        speech_region: Optional[str] = None,
        language: str = "en-US",
        sample_rate: int = 16000,
        enable_phrase_list: bool = False,
        phrase_list: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the Azure STT plugin.

        Args:
            speech_key (Optional[str]): Azure Speech API key. Uses AZURE_SPEECH_KEY environment variable if not provided.
            speech_region (Optional[str]): Azure Speech region. Uses AZURE_SPEECH_REGION environment variable if not provided.
            language (str): The language to use for the STT plugin. Defaults to "en-US".
            sample_rate (int): Sample rate to use for the STT plugin. Defaults to 16000.
            enable_phrase_list (bool): Whether to enable phrase list for better recognition. Defaults to False.
            phrase_list (Optional[List[str]]): List of phrases to boost recognition. Defaults to None.
        """
        super().__init__()

        if not SCIPY_AVAILABLE:
            raise ImportError(
                "scipy and numpy are required for Azure STT. Please install with 'pip install scipy numpy'"
            )

        self.speech_key = speech_key or os.getenv("AZURE_SPEECH_KEY")
        self.speech_region = speech_region or os.getenv("AZURE_SPEECH_REGION")

        if not self.speech_key or not self.speech_region:
            raise ValueError(
                "Azure Speech key and region must be provided either through parameters or "
                "AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environment variables"
            )

        self.config = AzureSTTConfig(
            speech_key=self.speech_key,
            speech_region=self.speech_region,
            language=language,
            sample_rate=sample_rate,
            enable_phrase_list=enable_phrase_list,
            phrase_list=phrase_list,
        )

        self.input_sample_rate = 48000
        self.target_sample_rate = sample_rate

        self._speech_processor: Optional[speechsdk.SpeechRecognizer] = None
        self._audio_stream: Optional[speechsdk.audio.PushAudioInputStream] = None
        self._is_speaking = False
        self._last_speech_time = 0.0

        self._loop = asyncio.get_running_loop()
        self._event_queue = asyncio.Queue()
        self._processing_task: Optional[asyncio.Task] = None

    async def process_audio(
        self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Process audio frames and send to Azure Speech Service"""
        try:
            if not self._speech_processor:
                await self._setup_speech_processor(language)

            if self._audio_stream and SCIPY_AVAILABLE:
                audio_data = np.frombuffer(audio_frames, dtype=np.int16)

                if len(audio_data) > 0:
                    stereo_data = audio_data.reshape(-1, 2)
                    mono_data = stereo_data.mean(axis=1)

                    resampled_data = signal.resample(
                        mono_data,
                        int(
                            len(mono_data)
                            * self.target_sample_rate
                            / self.input_sample_rate
                        ),
                    )
                    resampled_bytes = resampled_data.astype(np.int16).tobytes()
                    self._audio_stream.write(resampled_bytes)

        except Exception as e:
            logger.error(f"Error in process_audio: {str(e)}")
            self.emit("error", str(e))
            await self._cleanup_speech_processor()

    async def _setup_speech_processor(self, language: Optional[str] = None) -> None:
        """Setup Azure speech processor"""
        try:
            self._processing_task = self._loop.create_task(self._process_events())

            speech_config = speechsdk.SpeechConfig(
                subscription=self.config.speech_key, region=self.config.speech_region
            )
            speech_config.speech_recognition_language = language or self.config.language

            stream_format = speechsdk.audio.AudioStreamFormat(
                samples_per_second=self.config.sample_rate,
                bits_per_sample=16,
                channels=1,
            )
            self._audio_stream = speechsdk.audio.PushAudioInputStream(
                stream_format=stream_format
            )

            audio_config = speechsdk.audio.AudioConfig(stream=self._audio_stream)

            self._speech_processor = speechsdk.SpeechRecognizer(
                speech_config=speech_config, audio_config=audio_config
            )

            if self.config.enable_phrase_list and self.config.phrase_list:
                phrase_list_grammar = speechsdk.PhraseListGrammar.from_recognizer(
                    self._speech_processor
                )
                for phrase in self.config.phrase_list:
                    phrase_list_grammar.addPhrase(phrase)

            self._speech_processor.recognized.connect(self._on_final_transcript)
            self._speech_processor.recognizing.connect(self._on_interim_transcript)
            self._speech_processor.speech_start_detected.connect(self._on_user_started_speaking)
            self._speech_processor.speech_end_detected.connect(self._on_user_stopped_speaking)
            self._speech_processor.canceled.connect(self._on_speech_processing_error)

            self._speech_processor.start_continuous_recognition()
            logger.info("Azure STT speech processor started")

        except Exception as e:
            logger.error(f"Failed to setup speech processor: {str(e)}")
            raise

    def _on_final_transcript(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        """Handle final recognition results"""
        text = evt.result.text.strip()
        if not text:
            return

        if self._transcript_callback:
            response = STTResponse(
                event_type=SpeechEventType.FINAL,
                data=SpeechData(
                    text=text, language=self.config.language, confidence=1.0
                ),
                metadata={"provider": "azure", "result_reason": str(evt.result.reason)},
            )
            self._event_queue.put_nowait(response)

    def _on_interim_transcript(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        """Handle interim recognition results"""
        text = evt.result.text.strip()
        if not text:
            return

        if self._transcript_callback:
            response = STTResponse(
                event_type=SpeechEventType.INTERIM,
                data=SpeechData(
                    text=text, language=self.config.language, confidence=0.5
                ),
                metadata={"provider": "azure", "result_reason": str(evt.result.reason)},
            )
            self._event_queue.put_nowait(response)

    def _on_user_started_speaking(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        """Handle speech start detection"""
        if self._is_speaking:
            return

        self._is_speaking = True
        current_time = time.time()

        if self._last_speech_time == 0.0:
            self._last_speech_time = current_time
        else:
            if current_time - self._last_speech_time < 1.0:
                global_event_emitter.emit("speech_started")

        self._last_speech_time = current_time

    def _on_user_stopped_speaking(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        """Handle speech end detection"""
        if not self._is_speaking:
            return

        self._is_speaking = False
        global_event_emitter.emit("speech_stopped")

    def _on_speech_processing_error(self, evt: speechsdk.SpeechRecognitionCanceledEventArgs) -> None:
        """Handle speech processing errors and cancellations"""
        if evt.cancellation_details.reason == speechsdk.CancellationReason.Error:
            error_msg = f"Speech recognition canceled due to error: {evt.cancellation_details.error_details}"
            logger.error(error_msg)
            self.emit("error", error_msg)

    async def _process_events(self) -> None:
        """Process STT events from the queue"""
        while True:
            try:
                response = await self._event_queue.get()
                if self._transcript_callback:
                    await self._transcript_callback(response)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Error processing STT event: %s", str(e), exc_info=True)

    async def _cleanup_speech_processor(self) -> None:
        """Cleanup speech processor resources"""
        try:
            if self._speech_processor:
                self._speech_processor.stop_continuous_recognition()
                self._speech_processor = None

            if self._audio_stream:
                self._audio_stream.close()
                self._audio_stream = None

        except Exception as e:
            logger.error(f"Error during speech processor cleanup: {str(e)}")

    async def aclose(self) -> None:
        """Cleanup resources"""
        if self._processing_task:
            self._processing_task.cancel()
            await asyncio.gather(self._processing_task, return_exceptions=True)

        await self._cleanup_speech_processor()
        logger.info("Azure STT closed")
        await super().aclose()

Base class for Speech-to-Text implementations

Initialize the Azure STT plugin.

Args

speech_key : Optional[str]
Azure Speech API key. Uses AZURE_SPEECH_KEY environment variable if not provided.
speech_region : Optional[str]
Azure Speech region. Uses AZURE_SPEECH_REGION environment variable if not provided.
language : str
The language to use for the STT plugin. Defaults to "en-US".
sample_rate : int
Sample rate to use for the STT plugin. Defaults to 16000.
enable_phrase_list : bool
Whether to enable phrase list for better recognition. Defaults to False.
phrase_list : Optional[List[str]]
List of phrases to boost recognition. Defaults to None.
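
A minimal construction sketch (not taken from the library's own examples): it assumes AZURE_SPEECH_KEY and AZURE_SPEECH_REGION are exported in the environment and that scipy and numpy are installed, since __init__ raises otherwise. Note that __init__ calls asyncio.get_running_loop(), so the instance must be created inside a running event loop.

import asyncio

from videosdk.plugins.azure import AzureSTT

async def main() -> None:
    # Credentials fall back to AZURE_SPEECH_KEY / AZURE_SPEECH_REGION when omitted.
    stt = AzureSTT(
        language="en-US",
        sample_rate=16000,
        enable_phrase_list=True,
        phrase_list=["VideoSDK", "Azure Cognitive Services"],  # hypothetical boost phrases
    )
    # ... wire the instance into an agent pipeline that feeds process_audio() ...
    await stt.aclose()

asyncio.run(main())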

Ancestors

  • videosdk.agents.stt.stt.STT
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Cleanup resources"""
    if self._processing_task:
        self._processing_task.cancel()
        await asyncio.gather(self._processing_task, return_exceptions=True)

    await self._cleanup_speech_processor()
    logger.info("Azure STT closed")
    await super().aclose()

Cleanup resources

async def process_audio(self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any) ‑> None
async def process_audio(
    self, audio_frames: bytes, language: Optional[str] = None, **kwargs: Any
) -> None:
    """Process audio frames and send to Azure Speech Service"""
    try:
        if not self._speech_processor:
            await self._setup_speech_processor(language)

        if self._audio_stream and SCIPY_AVAILABLE:
            audio_data = np.frombuffer(audio_frames, dtype=np.int16)

            if len(audio_data) > 0:
                stereo_data = audio_data.reshape(-1, 2)
                mono_data = stereo_data.mean(axis=1)

                resampled_data = signal.resample(
                    mono_data,
                    int(
                        len(mono_data)
                        * self.target_sample_rate
                        / self.input_sample_rate
                    ),
                )
                resampled_bytes = resampled_data.astype(np.int16).tobytes()
                self._audio_stream.write(resampled_bytes)

    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        self.emit("error", str(e))
        await self._cleanup_speech_processor()

Process audio frames and send to Azure Speech Service
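
For reference, the per-frame transformation that process_audio applies can be reproduced standalone. This is a sketch under the plugin's fixed assumptions: input frames are 48 kHz, 16-bit, interleaved stereo PCM (input_sample_rate is hard-coded to 48000) and the target rate defaults to 16000.

import numpy as np
from scipy import signal

def downmix_and_resample(frame: bytes, src_rate: int = 48000, dst_rate: int = 16000) -> bytes:
    """Stereo 48 kHz int16 PCM -> mono int16 PCM at dst_rate, mirroring process_audio."""
    samples = np.frombuffer(frame, dtype=np.int16)
    mono = samples.reshape(-1, 2).mean(axis=1)        # average the L/R channels
    out_len = int(len(mono) * dst_rate / src_rate)    # e.g. a 3:1 reduction for 48k -> 16k
    resampled = signal.resample(mono, out_len)
    return resampled.astype(np.int16).tobytes()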

class AzureTTS (*,
voice: str = 'en-US-EmmaNeural',
language: str | None = None,
tuning: Optional[VoiceTuning] = None,
style: Optional[SpeakingStyle] = None,
speech_key: str | None = None,
speech_region: str | None = None,
speech_endpoint: str | None = None,
deployment_id: str | None = None,
speech_auth_token: str | None = None,
**kwargs: Any)
class AzureTTS(TTS):
    """
    Initialize the Azure TTS plugin.

    Args:
        voice (str): Name of the Azure neural voice to use (default: "en-US-EmmaNeural").
            For a full list of available voices, see:
            https://eastus2.tts.speech.microsoft.com/cognitiveservices/voices/list
            (Requires: curl --location --request GET with header 'Ocp-Apim-Subscription-Key')
        language (str, optional): Language code for the voice (e.g., "en-US"). If not provided, defaults to the voice's language.
        tuning (VoiceTuning, optional): VoiceTuning object to control speech rate, volume, and pitch.
        style (SpeakingStyle, optional): SpeakingStyle object for expressive speech synthesis.
        speech_key (str, optional): Azure Speech API key. If not provided, uses the AZURE_SPEECH_KEY environment variable.
        speech_region (str, optional): Azure Speech region. If not provided, uses the AZURE_SPEECH_REGION environment variable.
        speech_endpoint (str, optional): Custom endpoint URL. If not provided, uses the AZURE_SPEECH_ENDPOINT environment variable.
        deployment_id (str, optional): Custom deployment ID for model deployment scenarios.
        speech_auth_token (str, optional): Azure Speech authorization token for token-based authentication.

    """
    FIXED_SAMPLE_RATE = 24000
    AZURE_OUTPUT_FORMAT = "raw-24khz-16bit-mono-pcm"

    def __init__(
        self,
        *,
        voice: str = "en-US-EmmaNeural",
        language: str | None = None,
        tuning: Optional[VoiceTuning] = None,
        style: Optional[SpeakingStyle] = None,
        speech_key: str | None = None,
        speech_region: str | None = None,
        speech_endpoint: str | None = None,
        deployment_id: str | None = None,
        speech_auth_token: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            sample_rate=self.FIXED_SAMPLE_RATE,
            num_channels=1,
        )
        
        self.speech_key = speech_key or os.environ.get("AZURE_SPEECH_KEY")
        self.speech_region = speech_region or os.environ.get("AZURE_SPEECH_REGION")
        self.speech_endpoint = speech_endpoint or os.environ.get(
            "AZURE_SPEECH_ENDPOINT"
        )
        self.speech_auth_token = speech_auth_token
        self.deployment_id = deployment_id

        has_endpoint = bool(self.speech_endpoint)
        has_key_and_region = bool(self.speech_key and self.speech_region)
        has_token_and_region = bool(self.speech_auth_token and self.speech_region)

        if not (has_endpoint or has_key_and_region or has_token_and_region):
            raise ValueError(
                "Authentication requires one of: speech_endpoint, (speech_key & speech_region), or (speech_auth_token & speech_region)."
            )

        self.voice = voice
        self.language = language
        self.tuning = tuning
        self.style = style

        self._first_chunk_sent = False
        self._interrupted = False
        self._http_client: Optional[httpx.AsyncClient] = None
    
    
    def reset_first_audio_tracking(self) -> None:
        self._first_chunk_sent = False

    def _get_endpoint_url(self) -> str:
        if self.speech_endpoint:
            base = self.speech_endpoint.rstrip("/")
            if not base.endswith("/cognitiveservices/v1"):
                base = f"{base}/cognitiveservices/v1"
        else:
            base = f"https://{self.speech_region}.tts.speech.microsoft.com/cognitiveservices/v1"

        if self.deployment_id:
            return f"{base}?deploymentId={self.deployment_id}"
        return base

    def _get_http_client(self) -> httpx.AsyncClient:
        if not self._http_client:
            self._http_client = httpx.AsyncClient(
                timeout=httpx.Timeout(
                    connect=15.0, read=30.0, write=5.0, pool=5.0
                ),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            )
        return self._http_client

    async def synthesize(
        self,
        text: AsyncIterator[str] | str,
        voice_id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        try:
            if not self.audio_track or not self.loop:
                self.emit("error", "Audio track or event loop not set")
                return

            self._interrupted = False

            if isinstance(text, AsyncIterator):
                async for segment in segment_text(text):
                    if self._interrupted:
                        break
                    await self._synthesize_segment(segment, voice_id, **kwargs)
            else:
                if not self._interrupted:
                    await self._synthesize_segment(text, voice_id, **kwargs)

        except Exception as e:
            logger.error("Azure TTS synthesis failed: %s", str(e), exc_info=True)
            self.emit("error", f"Azure TTS synthesis failed: {str(e)}")

    async def _synthesize_segment(
        self, text: str, voice_id: Optional[str] = None, **kwargs: Any
    ) -> None:
        if not text.strip() or self._interrupted:
            return

        try:
            
            headers = {
                "Content-Type": "application/ssml+xml",
                "X-Microsoft-OutputFormat": self.AZURE_OUTPUT_FORMAT,
                "User-Agent": "VideoSDK Agents", 
            }

            if self.speech_auth_token:
                headers["Authorization"] = f"Bearer {self.speech_auth_token}"
            elif self.speech_key:
                headers["Ocp-Apim-Subscription-Key"] = self.speech_key

            ssml_data = self._build_ssml(text, voice_id or self.voice)

            response = await self._get_http_client().post(
                url=self._get_endpoint_url(),
                headers=headers,
                content=ssml_data,
            )
            response.raise_for_status()

            audio_data = b""
            async for chunk in response.aiter_bytes(chunk_size=8192):
                if self._interrupted: 
                    break
                if chunk: 
                    audio_data += chunk

            if audio_data and not self._interrupted:
                await self._stream_audio_chunks(audio_data)
                
        except httpx.TimeoutException:
            logger.error("Azure TTS request timeout")
            self.emit("error", "Azure TTS request timeout")
        except httpx.HTTPStatusError as e:
            logger.error("Azure TTS HTTP error: %s - %s", e.response.status_code, e.response.text)
            self.emit("error", f"Azure TTS HTTP error: {e.response.status_code} - {e.response.text}")
        except Exception as e:
            if not self._interrupted:
                logger.error("Azure TTS synthesis failed: %s", str(e), exc_info=True)
                self.emit("error", f"Azure TTS synthesis failed: {str(e)}")

    def _build_ssml(self, text: str, voice: str) -> str:
        lang = self.language or "en-US"
        ssml = (
            f'<speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis" '
            f'xmlns:mstts="http://www.w3.org/2001/mstts" xml:lang="{lang}">'
        )
        ssml += f'<voice name="{voice}">'

        if self.style:
            degree = f' styledegree="{self.style.degree}"' if self.style.degree else ""
            ssml += f'<mstts:express-as style="{self.style.style}"{degree}>'

        if self.tuning:
            t = self.tuning
            rate_attr = f' rate="{t.rate}"' if t.rate is not None else ""
            vol_attr = f' volume="{t.volume}"' if t.volume is not None else ""
            pitch_attr = f' pitch="{t.pitch}"' if t.pitch is not None else ""
            ssml += f"<prosody{rate_attr}{vol_attr}{pitch_attr}>{text}</prosody>"
        else:
            ssml += text

        if self.style:
            ssml += "</mstts:express-as>"

        ssml += "</voice></speak>"
        return ssml

    async def _stream_audio_chunks(self, audio_bytes: bytes) -> None:
        chunk_size = int(self.FIXED_SAMPLE_RATE * 2 * 20 / 1000)
        for i in range(0, len(audio_bytes), chunk_size):
            if self._interrupted: break
            chunk = audio_bytes[i : i + chunk_size]
            if len(chunk) < chunk_size and len(chunk) > 0:
                padding_needed = chunk_size - len(chunk)
                chunk += b"\x00" * padding_needed
            if len(chunk) == chunk_size:
                if not self._first_chunk_sent and self._first_audio_callback:
                    self._first_chunk_sent = True
                    await self._first_audio_callback()
                if self.audio_track:
                    asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                await asyncio.sleep(0.001)

    async def interrupt(self) -> None:
        self._interrupted = True
        if self.audio_track:
            self.audio_track.interrupt()

    async def aclose(self) -> None:
        if self._http_client:
            await self._http_client.aclose()
            self._http_client = None
        await super().aclose()

Initialize the Azure TTS plugin.

Args

voice : str
Name of the Azure neural voice to use (default: "en-US-EmmaNeural"). For a full list of available voices, see: https://eastus2.tts.speech.microsoft.com/cognitiveservices/voices/list (Requires: curl --location --request GET with header 'Ocp-Apim-Subscription-Key')
language : str, optional
Language code for the voice (e.g., "en-US"). If not provided, defaults to the voice's language.
tuning : VoiceTuning, optional
VoiceTuning object to control speech rate, volume, and pitch.
style : SpeakingStyle, optional
SpeakingStyle object for expressive speech synthesis.
speech_key : str, optional
Azure Speech API key. If not provided, uses the AZURE_SPEECH_KEY environment variable.
speech_region : str, optional
Azure Speech region. If not provided, uses the AZURE_SPEECH_REGION environment variable.
speech_endpoint : str, optional
Custom endpoint URL. If not provided, uses the AZURE_SPEECH_ENDPOINT environment variable.
deployment_id : str, optional
Custom deployment ID for model deployment scenarios.
speech_auth_token : str, optional
Azure Speech authorization token for token-based authentication.
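
A construction sketch combining the tuning and style options (illustrative values, not defaults). It assumes AZURE_SPEECH_KEY and AZURE_SPEECH_REGION are set, which satisfies the key-and-region authentication path, and that the chosen voice supports the "cheerful" speaking style.

from videosdk.plugins.azure import AzureTTS, SpeakingStyle, VoiceTuning

tts = AzureTTS(
    voice="en-US-EmmaNeural",
    language="en-US",
    tuning=VoiceTuning(rate="fast", volume=80.0, pitch="high"),
    style=SpeakingStyle(style="cheerful", degree=1.2),
)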

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Class variables

var AZURE_OUTPUT_FORMAT
var FIXED_SAMPLE_RATE

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    if self._http_client:
        await self._http_client.aclose()
        self._http_client = None
    await super().aclose()

Cleanup resources

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    self._interrupted = True
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt the TTS process

def reset_first_audio_tracking(self) ‑> None
def reset_first_audio_tracking(self) -> None:
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self,
text: AsyncIterator[str] | str,
voice_id: Optional[str] = None,
**kwargs: Any) ‑> None
async def synthesize(
    self,
    text: AsyncIterator[str] | str,
    voice_id: Optional[str] = None,
    **kwargs: Any,
) -> None:
    try:
        if not self.audio_track or not self.loop:
            self.emit("error", "Audio track or event loop not set")
            return

        self._interrupted = False

        if isinstance(text, AsyncIterator):
            async for segment in segment_text(text):
                if self._interrupted:
                    break
                await self._synthesize_segment(segment, voice_id, **kwargs)
        else:
            if not self._interrupted:
                await self._synthesize_segment(text, voice_id, **kwargs)

    except Exception as e:
        logger.error("Azure TTS synthesis failed: %s", str(e), exc_info=True)
        self.emit("error", f"Azure TTS synthesis failed: {str(e)}")

Convert text to speech

Args

text
Text to convert to speech (either string or async iterator of strings)
voice_id
Optional voice identifier
**kwargs
Additional provider-specific arguments

Returns

None
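
A usage sketch: synthesize accepts either a plain string or an async iterator of text segments. It assumes the surrounding agent pipeline has already attached tts.audio_track and tts.loop, since synthesize emits an error and returns early otherwise.

from typing import AsyncIterator

from videosdk.plugins.azure import AzureTTS

async def narrate(tts: AzureTTS) -> None:
    # Plain string input.
    await tts.synthesize("Hello from Azure TTS.")

    # Streaming input: segments are synthesized as they arrive.
    async def chunks() -> AsyncIterator[str]:
        yield "Streaming text "
        yield "is segmented and "
        yield "synthesized piece by piece."

    await tts.synthesize(chunks())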

class AzureVoiceLive (*,
api_key: str | None = None,
endpoint: str | None = None,
model: str,
config: AzureVoiceLiveConfig | None = None,
credential: Union[AzureKeyCredential, TokenCredential] | None = None)
class AzureVoiceLive(RealtimeBaseModel[AzureVoiceLiveEventTypes]):
    """Azure Voice Live realtime model implementation"""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        endpoint: str | None = None,
        model: str,
        config: AzureVoiceLiveConfig | None = None,
        credential: Union[AzureKeyCredential, TokenCredential] | None = None,
    ) -> None:
        """
        Initialize Azure Voice Live realtime model.

        Args:
            api_key: Azure Voice Live API key. If not provided, will attempt to read from AZURE_VOICE_LIVE_API_KEY env var
            endpoint: Azure Voice Live endpoint. If not provided, will attempt to read from AZURE_VOICE_LIVE_ENDPOINT env var
            model: The model identifier to use (e.g., 'gpt-4o-realtime-preview')
            config: Optional configuration object for customizing model behavior. Contains settings for:
                   - voice: Voice ID for audio output (Azure or OpenAI voices)
                   - modalities: List of enabled response types [TEXT, AUDIO]
                   - turn_detection: Voice activity detection settings
                   - temperature: Response randomness control
            credential: Azure credential object. If provided, takes precedence over api_key

        Raises:
            ValueError: If no API key or credential is provided and none found in environment variables
        """
        super().__init__()
        self.model = model
        self.endpoint = endpoint or os.getenv(
            "AZURE_VOICE_LIVE_ENDPOINT", "wss://api.voicelive.com/v1"
        )

        if credential:
            self.credential = credential
        elif api_key:
            self.credential = AzureKeyCredential(api_key)
        else:
            env_api_key = os.getenv("AZURE_VOICE_LIVE_API_KEY")
            if env_api_key:
                self.credential = AzureKeyCredential(env_api_key)
            else:
                try:
                    self.credential = DefaultAzureCredential()
                except Exception:
                    self.emit("error", "Azure Voice Live credentials required")
                    raise ValueError(
                        "Azure Voice Live credentials required. Provide api_key, credential, or set AZURE_VOICE_LIVE_API_KEY environment variable"
                    )

        self._session: Optional[AzureVoiceLiveSession] = None
        self._closing = False
        self._instructions: str = (
            "You are a helpful voice assistant that can answer questions and help with tasks."
        )
        self.loop = None
        self.audio_track: Optional[CustomAudioStreamTrack] = None
        self.config: AzureVoiceLiveConfig = config or AzureVoiceLiveConfig()
        self.input_sample_rate = VIDEOSDK_INPUT_SAMPLE_RATE
        self.target_sample_rate = AZURE_VOICE_LIVE_SAMPLE_RATE
        self._agent_speaking = False
        self._user_speaking = False
        self.session_ready = False
        self._session_ready_event = asyncio.Event()

    def set_agent(self, agent: Agent) -> None:
        """Set the agent configuration"""
        self._instructions = agent.instructions

    async def connect(self) -> None:
        """Connect to Azure Voice Live API"""
        if self._session:
            await self._cleanup_session(self._session)
            self._session = None

        self._closing = False

        try:
            if (
                not self.audio_track
                and self.loop
                and Modality.AUDIO in self.config.modalities
            ):
                self.audio_track = CustomAudioStreamTrack(self.loop)
            elif not self.loop and Modality.AUDIO in self.config.modalities:
                self.emit(
                    "error", "Event loop not initialized. Audio playback will not work."
                )
                raise RuntimeError(
                    "Event loop not initialized. Audio playback will not work."
                )

            session = await self._create_session()
            if session:
                self._session = session

            if self._session:
                asyncio.create_task(
                    self._process_events(), name="azure-voice-live-events"
                )
                try:
                    logger.info("Waiting for Azure Voice Live session to be ready...")
                    await asyncio.wait_for(
                        self._session_ready_event.wait(), timeout=10.0
                    )
                    logger.info("Azure Voice Live session is ready.")
                except asyncio.TimeoutError:
                    self.emit("error", "Azure Voice Live session ready timeout")
                    raise RuntimeError(
                        "Azure Voice Live session did not become ready in time"
                    )

        except Exception as e:
            self.emit("error", f"Error connecting to Azure Voice Live API: {e}")
            traceback.print_exc()
            raise

    async def _create_session(self) -> AzureVoiceLiveSession:
        """Create a new Azure Voice Live session"""
        try:
            connection_cm = connect(
                endpoint=self.endpoint,
                credential=self.credential,
                model=self.model,
                connection_options={
                    "max_msg_size": 10 * 1024 * 1024,
                    "heartbeat": 20,
                    "timeout": 20,
                },
            )

            connection = await connection_cm.__aenter__()

            await self._setup_session(connection)

            return AzureVoiceLiveSession(
                connection=connection, session_id=None, tasks=[]
            )

        except Exception as e:
            self.emit("error", f"Failed to create Azure Voice Live session: {e}")
            traceback.print_exc()
            raise

    async def _setup_session(self, connection) -> None:
        """Configure the Azure Voice Live session"""
        logger.info("Setting up Azure Voice Live session...")

        voice_config: Union[AzureStandardVoice, str]
        if (
            self.config.voice.startswith("en-US-")
            or self.config.voice.startswith("en-CA-")
            or "-" in self.config.voice
        ):
            voice_config = AzureStandardVoice(
                name=self.config.voice, type="azure-standard"
            )
        else:
            voice_config = self.config.voice

        turn_detection_config = ServerVad(
            threshold=self.config.turn_detection_threshold,
            prefix_padding_ms=self.config.turn_detection_prefix_padding_ms,
            silence_duration_ms=self.config.turn_detection_silence_duration_ms,
        )

        session_config = RequestSession(
            modalities=self.config.modalities,
            instructions=self._instructions,
            voice=voice_config,
            input_audio_format=self.config.input_audio_format,
            output_audio_format=self.config.output_audio_format,
            turn_detection=turn_detection_config,
        )

        if self.config.temperature is not None:
            session_config.temperature = self.config.temperature
        if self.config.max_completion_tokens is not None:
            session_config.max_completion_tokens = self.config.max_completion_tokens

        await connection.session.update(session=session_config)
        logger.info("Azure Voice Live session configuration sent")

    async def _process_events(self) -> None:
        """Process events from the Azure Voice Live connection"""
        try:
            if not self._session or not self._session.connection:
                return

            async for event in self._session.connection:
                if self._closing:
                    break
                await self._handle_event(event)

        except asyncio.CancelledError:
            logger.info("Event processing cancelled")
        except Exception as e:
            self.emit("error", f"Error processing events: {e}")
            traceback.print_exc()

    async def _handle_event(self, event) -> None:
        """Handle different types of events from Azure Voice Live"""
        try:
            logger.debug(f"Received event: {event.type}")

            if event.type == ServerEventType.SESSION_UPDATED:
                logger.info(f"Session ready: {event.session.id}")
                if self._session:
                    self._session.session_id = event.session.id
                self.session_ready = True
                self._session_ready_event.set()

            elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED:
                logger.info("User started speaking")
                if not self._user_speaking:
                    await realtime_metrics_collector.set_user_speech_start()
                    self._user_speaking = True
                self.emit("user_speech_started", {"type": "done"})

                if self.audio_track and Modality.AUDIO in self.config.modalities:
                    self.audio_track.interrupt()

                if self._session and self._session.connection:
                    try:
                        await self._session.connection.response.cancel()
                    except Exception as e:
                        logger.debug(f"No response to cancel: {e}")

            elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED:
                logger.info("User stopped speaking")
                if self._user_speaking:
                    await realtime_metrics_collector.set_user_speech_end()
                    self._user_speaking = False

            elif event.type == ServerEventType.RESPONSE_CREATED:
                logger.info("Assistant response created")

            elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA:
                logger.debug("Received audio delta")
                if Modality.AUDIO in self.config.modalities:
                    if not self._agent_speaking:
                        await realtime_metrics_collector.set_agent_speech_start()
                        self._agent_speaking = True

                    if self.audio_track and self.loop:
                        asyncio.create_task(self.audio_track.add_new_bytes(event.delta))

            elif event.type == ServerEventType.RESPONSE_AUDIO_DONE:
                logger.info("Assistant finished speaking")
                if self._agent_speaking:
                    await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
                    self._agent_speaking = False

            elif event.type == ServerEventType.RESPONSE_TEXT_DELTA:
                if hasattr(self, "_current_text_response"):
                    self._current_text_response += event.delta
                else:
                    self._current_text_response = event.delta

            elif event.type == ServerEventType.RESPONSE_TEXT_DONE:
                if hasattr(self, "_current_text_response"):
                    global_event_emitter.emit(
                        "text_response",
                        {"text": self._current_text_response, "type": "done"},
                    )
                    await realtime_metrics_collector.set_agent_response(
                        self._current_text_response
                    )
                    try:
                        self.emit(
                            "realtime_model_transcription",
                            {
                                "role": "agent",
                                "text": self._current_text_response,
                                "is_final": True,
                            },
                        )
                    except Exception:
                        pass
                    self._current_text_response = ""

            elif event.type == ServerEventType.RESPONSE_DONE:
                logger.info("Response complete")
                if self._agent_speaking:
                    await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
                    self._agent_speaking = False

            elif event.type == ServerEventType.ERROR:
                logger.error(f"Azure Voice Live error: {event.error.message}")
                self.emit("error", f"Azure Voice Live error: {event.error.message}")

            elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED:
                logger.debug(f"Conversation item created: {event.item.id}")

                if (
                    hasattr(event.item, "content")
                    and event.item.content
                    and hasattr(event.item.content[0], "transcript")
                ):
                    transcript = event.item.content[0].transcript
                    if transcript and event.item.role == "user":
                        await realtime_metrics_collector.set_user_transcript(transcript)
                        try:
                            self.emit(
                                "realtime_model_transcription",
                                {"role": "user", "text": transcript, "is_final": True},
                            )
                        except Exception:
                            pass

            else:
                logger.debug(f"Unhandled event type: {event.type}")

        except Exception as e:
            self.emit("error", f"Error handling event {event.type}: {e}")
            traceback.print_exc()

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming audio data from the user"""
        if not self._session or self._closing or not self.session_ready:
            return

        if Modality.AUDIO not in self.config.modalities:
            return

        try:
            audio_array = np.frombuffer(audio_data, dtype=np.int16)

            if len(audio_array) % 2 == 0:
                audio_array = audio_array.reshape(-1, 2)
                audio_array = np.mean(audio_array, axis=1).astype(np.int16)

            target_length = int(
                len(audio_array) * self.target_sample_rate / self.input_sample_rate
            )
            resampled_float = signal.resample(
                audio_array.astype(np.float32), target_length
            )
            resampled_int16 = np.clip(resampled_float, -32768, 32767).astype(np.int16)
            resampled_bytes = resampled_int16.tobytes()

            encoded_audio = base64.b64encode(resampled_bytes).decode("utf-8")

            await self._session.connection.input_audio_buffer.append(
                audio=encoded_audio
            )

        except Exception as e:
            self.emit("error", f"Error processing audio input: {e}")

    async def interrupt(self) -> None:
        """Interrupt current response"""
        if not self._session or self._closing:
            return

        try:
            if self._session.connection:
                await self._session.connection.response.cancel()

            if self.audio_track and Modality.AUDIO in self.config.modalities:
                self.audio_track.interrupt()

            await realtime_metrics_collector.set_interrupted()

            if self._agent_speaking:
                await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
                self._agent_speaking = False

        except Exception as e:
            self.emit("error", f"Interrupt error: {e}")

    async def send_message(self, message: str) -> None:
        """Send a text message to get audio response"""
        retry_count = 0
        max_retries = 5
        while not self._session or not self.session_ready:
            if retry_count >= max_retries:
                raise RuntimeError(
                    "No active Azure Voice Live session after maximum retries"
                )
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1

        try:
            await self._session.connection.conversation.item.create(
                item={
                    "type": "message",
                    "role": "assistant",
                    "content": [
                        {
                            "type": "text",
                            "text": f"Repeat the user's exact message back to them [DO NOT ADD ANYTHING ELSE]: {message}",
                        }
                    ],
                }
            )

            await self._session.connection.response.create()

        except Exception as e:
            self.emit("error", f"Error sending message: {e}")

    async def send_text_message(self, message: str) -> None:
        """Send a text message for text-only communication"""
        retry_count = 0
        max_retries = 5
        while not self._session or not self.session_ready:
            if retry_count >= max_retries:
                raise RuntimeError(
                    "No active Azure Voice Live session after maximum retries"
                )
            logger.debug("No active session, waiting for connection...")
            await asyncio.sleep(1)
            retry_count += 1

        try:
            await self._session.connection.conversation.item.create(
                item={
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": message}],
                }
            )

            await self._session.connection.response.create()

        except Exception as e:
            self.emit("error", f"Error sending text message: {e}")

    async def _cleanup_session(self, session: AzureVoiceLiveSession) -> None:
        """Clean up session resources"""
        for task in session.tasks:
            if not task.done():
                task.cancel()

        try:
            if session.connection:
                if hasattr(session.connection, "close"):
                    await session.connection.close()
        except Exception as e:
            self.emit("error", f"Error closing session: {e}")

    async def aclose(self) -> None:
        """Clean up all resources"""
        if self._closing:
            return

        self._closing = True

        if self._session:
            await self._cleanup_session(self._session)
            self._session = None

        if hasattr(self.audio_track, "cleanup") and self.audio_track:
            try:
                await self.audio_track.cleanup()
            except Exception as e:
                self.emit("error", f"Error cleaning up audio track: {e}")

Azure Voice Live realtime model implementation

Initialize Azure Voice Live realtime model.

Args

api_key
Azure Voice Live API key. If not provided, will attempt to read from AZURE_VOICE_LIVE_API_KEY env var
endpoint
Azure Voice Live endpoint. If not provided, will attempt to read from AZURE_VOICE_LIVE_ENDPOINT env var
model
The model identifier to use (e.g., 'gpt-4o-realtime-preview')
config
Optional configuration object for customizing model behavior. Contains settings for:
  • voice: Voice ID for audio output (Azure or OpenAI voices)
  • modalities: List of enabled response types [TEXT, AUDIO]
  • turn_detection: Voice activity detection settings
  • temperature: Response randomness control
credential
Azure credential object. If provided, takes precedence over api_key

Raises

ValueError
If no API key or credential is provided and none found in environment variables
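
A construction sketch; the model identifier is the illustrative one from the docstring, and credentials fall back to AZURE_VOICE_LIVE_API_KEY (or DefaultAzureCredential) when api_key is omitted.

from videosdk.plugins.azure import AzureVoiceLive, AzureVoiceLiveConfig

model = AzureVoiceLive(
    model="gpt-4o-realtime-preview",                 # illustrative model id
    # api_key / endpoint fall back to AZURE_VOICE_LIVE_API_KEY / AZURE_VOICE_LIVE_ENDPOINT
    config=AzureVoiceLiveConfig(voice="en-US-AvaNeural"),
)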

Ancestors

  • videosdk.agents.realtime_base_model.RealtimeBaseModel
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic
  • abc.ABC

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Clean up all resources"""
    if self._closing:
        return

    self._closing = True

    if self._session:
        await self._cleanup_session(self._session)
        self._session = None

    if hasattr(self.audio_track, "cleanup") and self.audio_track:
        try:
            await self.audio_track.cleanup()
        except Exception as e:
            self.emit("error", f"Error cleaning up audio track: {e}")

Clean up all resources

async def connect(self) ‑> None
async def connect(self) -> None:
    """Connect to Azure Voice Live API"""
    if self._session:
        await self._cleanup_session(self._session)
        self._session = None

    self._closing = False

    try:
        if (
            not self.audio_track
            and self.loop
            and Modality.AUDIO in self.config.modalities
        ):
            self.audio_track = CustomAudioStreamTrack(self.loop)
        elif not self.loop and Modality.AUDIO in self.config.modalities:
            self.emit(
                "error", "Event loop not initialized. Audio playback will not work."
            )
            raise RuntimeError(
                "Event loop not initialized. Audio playback will not work."
            )

        session = await self._create_session()
        if session:
            self._session = session

        if self._session:
            asyncio.create_task(
                self._process_events(), name="azure-voice-live-events"
            )
            try:
                logger.info("Waiting for Azure Voice Live session to be ready...")
                await asyncio.wait_for(
                    self._session_ready_event.wait(), timeout=10.0
                )
                logger.info("Azure Voice Live session is ready.")
            except asyncio.TimeoutError:
                self.emit("error", "Azure Voice Live session ready timeout")
                raise RuntimeError(
                    "Azure Voice Live session did not become ready in time"
                )

    except Exception as e:
        self.emit("error", f"Error connecting to Azure Voice Live API: {e}")
        traceback.print_exc()
        raise

Connect to Azure Voice Live API

async def handle_audio_input(self, audio_data: bytes) ‑> None
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Handle incoming audio data from the user"""
    if not self._session or self._closing or not self.session_ready:
        return

    if Modality.AUDIO not in self.config.modalities:
        return

    try:
        audio_array = np.frombuffer(audio_data, dtype=np.int16)

        if len(audio_array) % 2 == 0:
            audio_array = audio_array.reshape(-1, 2)
            audio_array = np.mean(audio_array, axis=1).astype(np.int16)

        target_length = int(
            len(audio_array) * self.target_sample_rate / self.input_sample_rate
        )
        resampled_float = signal.resample(
            audio_array.astype(np.float32), target_length
        )
        resampled_int16 = np.clip(resampled_float, -32768, 32767).astype(np.int16)
        resampled_bytes = resampled_int16.tobytes()

        encoded_audio = base64.b64encode(resampled_bytes).decode("utf-8")

        await self._session.connection.input_audio_buffer.append(
            audio=encoded_audio
        )

    except Exception as e:
        self.emit("error", f"Error processing audio input: {e}")

Handle incoming audio data from the user

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Interrupt current response"""
    if not self._session or self._closing:
        return

    try:
        if self._session.connection:
            await self._session.connection.response.cancel()

        if self.audio_track and Modality.AUDIO in self.config.modalities:
            self.audio_track.interrupt()

        await realtime_metrics_collector.set_interrupted()

        if self._agent_speaking:
            await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
            self._agent_speaking = False

    except Exception as e:
        self.emit("error", f"Interrupt error: {e}")

Interrupt current response

async def send_message(self, message: str) ‑> None
async def send_message(self, message: str) -> None:
    """Send a text message to get audio response"""
    retry_count = 0
    max_retries = 5
    while not self._session or not self.session_ready:
        if retry_count >= max_retries:
            raise RuntimeError(
                "No active Azure Voice Live session after maximum retries"
            )
        logger.debug("No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1

    try:
        await self._session.connection.conversation.item.create(
            item={
                "type": "message",
                "role": "assistant",
                "content": [
                    {
                        "type": "text",
                        "text": f"Repeat the user's exact message back to them [DO NOT ADD ANYTHING ELSE]: {message}",
                    }
                ],
            }
        )

        await self._session.connection.response.create()

    except Exception as e:
        self.emit("error", f"Error sending message: {e}")

Send a text message to get audio response

async def send_text_message(self, message: str) ‑> None
async def send_text_message(self, message: str) -> None:
    """Send a text message for text-only communication"""
    retry_count = 0
    max_retries = 5
    while not self._session or not self.session_ready:
        if retry_count >= max_retries:
            raise RuntimeError(
                "No active Azure Voice Live session after maximum retries"
            )
        logger.debug("No active session, waiting for connection...")
        await asyncio.sleep(1)
        retry_count += 1

    try:
        await self._session.connection.conversation.item.create(
            item={
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": message}],
            }
        )

        await self._session.connection.response.create()

    except Exception as e:
        self.emit("error", f"Error sending text message: {e}")

Send a text message for text-only communication
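
A rough lifecycle sketch, assuming the agent framework has already assigned model.loop (and an audio track when audio output is enabled): connect() waits up to 10 seconds for the session to become ready, after which messages can be sent.

from videosdk.plugins.azure import AzureVoiceLive

async def run(model: AzureVoiceLive) -> None:
    await model.connect()                                   # waits for SESSION_UPDATED
    await model.send_text_message("What is the weather like today?")
    await model.aclose()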

def set_agent(self, agent: Agent) ‑> None
def set_agent(self, agent: Agent) -> None:
    """Set the agent configuration"""
    self._instructions = agent.instructions

Set the agent configuration

class AzureVoiceLiveConfig (voice: str = 'en-US-AvaNeural',
modalities: List[Modality] = <factory>,
input_audio_format: InputAudioFormat = InputAudioFormat.PCM16,
output_audio_format: InputAudioFormat = InputAudioFormat.PCM16,
turn_detection_threshold: float = 0.5,
turn_detection_prefix_padding_ms: int = 300,
turn_detection_silence_duration_ms: int = 500,
temperature: Optional[float] = None,
max_completion_tokens: Optional[int] = None)
@dataclass
class AzureVoiceLiveConfig:
    """Configuration for Azure Voice Live API (Beta)

    Args:
        voice: Voice ID for audio output. Can be an Azure voice (e.g., 'en-US-AvaNeural') or an OpenAI voice ('alloy', 'echo', etc.). Default is 'en-US-AvaNeural'
        modalities: List of enabled response types. Options: [Modality.TEXT, Modality.AUDIO]. Default includes both
        input_audio_format: Audio format for input. Default is InputAudioFormat.PCM16
        output_audio_format: Audio format for output. Default is InputAudioFormat.PCM16
        turn_detection_threshold: Voice activity detection threshold (0.0-1.0). Default is 0.5
        turn_detection_prefix_padding_ms: Padding before speech start (ms). Default is 300
        turn_detection_silence_duration_ms: Silence duration to mark end (ms). Default is 500
        temperature: Controls randomness in response generation. Higher values make output more random. Default is None
        max_completion_tokens: Maximum number of tokens in response. Default is None
    """

    voice: str = "en-US-AvaNeural"
    modalities: List[Modality] = field(
        default_factory=lambda: [Modality.TEXT, Modality.AUDIO]
    )
    input_audio_format: InputAudioFormat = InputAudioFormat.PCM16
    output_audio_format: InputAudioFormat = InputAudioFormat.PCM16
    turn_detection_threshold: float = 0.5
    turn_detection_prefix_padding_ms: int = 300
    turn_detection_silence_duration_ms: int = 500
    temperature: Optional[float] = None
    max_completion_tokens: Optional[int] = None

Configuration for Azure Voice Live API (Beta)

Args

voice
Voice ID for audio output. Can be an Azure voice (e.g., 'en-US-AvaNeural') or an OpenAI voice ('alloy', 'echo', etc.). Default is 'en-US-AvaNeural'
modalities
List of enabled response types. Options: [Modality.TEXT, Modality.AUDIO]. Default includes both
input_audio_format
Audio format for input. Default is InputAudioFormat.PCM16
output_audio_format
Audio format for output. Default is InputAudioFormat.PCM16
turn_detection_threshold
Voice activity detection threshold (0.0-1.0). Default is 0.5
turn_detection_prefix_padding_ms
Padding before speech start (ms). Default is 300
turn_detection_silence_duration_ms
Silence duration to mark end (ms). Default is 500
temperature
Controls randomness in response generation. Higher values make output more random. Default is None
max_completion_tokens
Maximum number of tokens in response. Default is None
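
A configuration sketch; the import path for Modality follows the enum module shown under Instance variables below and should be treated as an assumption about the azure-ai-voicelive package layout.

from azure.ai.voicelive.models import Modality

from videosdk.plugins.azure import AzureVoiceLiveConfig

config = AzureVoiceLiveConfig(
    voice="alloy",                          # a bare name (no hyphen) is treated as an OpenAI voice
    modalities=[Modality.TEXT, Modality.AUDIO],
    turn_detection_threshold=0.6,
    turn_detection_silence_duration_ms=700,
    temperature=0.7,
)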

Instance variables

var input_audio_format : azure.ai.voicelive.models._enums.InputAudioFormat
var max_completion_tokens : int | None
var modalities : List[azure.ai.voicelive.models._enums.Modality]
var output_audio_format : azure.ai.voicelive.models._enums.InputAudioFormat
var temperature : float | None
var turn_detection_prefix_padding_ms : int
var turn_detection_silence_duration_ms : int
var turn_detection_threshold : float
var voice : str
class SpeakingStyle (style: str, degree: float | None = None)
@dataclass
class SpeakingStyle:
    """Configuration for speech expressive style."""
    style: str
    _degree: float | None = None

    @property
    def degree(self):
        return self._degree

    @degree.setter
    def degree(self, value: float | None):
        if value is not None and not 0.1 <= value <= 2.0:
            raise ValueError("Style degree must be between 0.1 and 2.0")
        self._degree = value
    
    def __init__(self, style: str, degree: float | None = None):
        self.style = style
        self.degree = degree

Configuration for speech expressive style.

Instance variables

prop degree
@property
def degree(self):
    return self._degree
var style : str
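
An illustrative sketch of the degree validation; "cheerful" stands in for any style supported by the chosen Azure voice.

from videosdk.plugins.azure import SpeakingStyle

style = SpeakingStyle(style="cheerful", degree=1.5)    # accepted: 0.1 <= degree <= 2.0
style.degree = 0.8                                     # the setter re-validates

try:
    SpeakingStyle(style="cheerful", degree=5.0)
except ValueError as err:
    print(err)                                         # Style degree must be between 0.1 and 2.0
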
class VoiceTuning (rate=None, volume=None, pitch=None)
@dataclass
class VoiceTuning:
    """Configuration for speech tuning (rate, volume, pitch)."""

    _rate: Literal["x-slow", "slow", "medium", "fast", "x-fast"] | float | None = None
    _volume: Literal["silent", "x-soft", "soft", "medium", "loud", "x-loud"] | float | None = None
    _pitch: Literal["x-low", "low", "medium", "high", "x-high"] | None = None

    @property
    def rate(self):
        return self._rate

    @rate.setter
    def rate(self, value):
        if value:
            if isinstance(value, float) and not 0.5 <= value <= 2.0:
                raise ValueError("Rate must be a float between 0.5 and 2.0")
            if isinstance(value, str) and value not in ["x-slow", "slow", "medium", "fast", "x-fast"]:
                raise ValueError("Rate must be one of 'x-slow', 'slow', 'medium', 'fast', 'x-fast'")
        self._rate = value

    @property
    def volume(self):
        return self._volume
    
    @volume.setter
    def volume(self, value):
        if value:
            if isinstance(value, float) and not 0 <= value <= 100.0:
                raise ValueError("Volume must be a float between 0 and 100")
            if isinstance(value, str) and value not in ["silent", "x-soft", "soft", "medium", "loud", "x-loud"]:
                raise ValueError("Volume must be one of 'silent', 'x-soft', 'soft', 'medium', 'loud', 'x-loud'")
        self._volume = value

    @property
    def pitch(self):
        return self._pitch

    @pitch.setter
    def pitch(self, value):
        if value and value not in ["x-low", "low", "medium", "high", "x-high"]:
            raise ValueError("Pitch must be one of 'x-low', 'low', 'medium', 'high', 'x-high'")
        self._pitch = value

    def __init__(self, rate=None, volume=None, pitch=None):
        self.rate = rate
        self.volume = volume
        self.pitch = pitch

Configuration for speech tuning (rate, volume, pitch).

Instance variables

prop pitch
@property
def pitch(self):
    return self._pitch
prop rate
@property
def rate(self):
    return self._rate
prop volume
Expand source code
@property
def volume(self):
    return self._volume
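
A sketch of the accepted value ranges, derived from the setters above; mixing a numeric rate with keyword volume and pitch values is purely illustrative.

from videosdk.plugins.azure import VoiceTuning

tuning = VoiceTuning(rate=1.25, volume="loud", pitch="x-high")   # float rate must lie in [0.5, 2.0]

try:
    VoiceTuning(rate=3.0)
except ValueError as err:
    print(err)                                                   # Rate must be a float between 0.5 and 2.0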