Package videosdk.plugins.aws

Sub-modules

videosdk.plugins.aws.aws_nova_sonic_api
videosdk.plugins.aws.tts

Classes

class AWSPollyTTS (*,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
region: str = 'us-east-1',
voice: str = 'Joanna',
engine: str = 'neural',
speed: float = 1.0,
pitch: float = 0.0,
**kwargs: Any)
class AWSPollyTTS(TTS):
    """
    AWS Polly TTS implementation (plug-and-play for VideoSDK Agents).
    Usage:
        tts = AWSPollyTTS()  # All config from env, or override via kwargs
    """

    def __init__(
        self,
        *,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        region: str = "us-east-1",
        voice: str = "Joanna",
        engine: str = "neural",
        speed: float = 1.0,
        pitch: float = 0.0,
        **kwargs: Any,
    ):
        """Initialize the AWS Polly TTS plugin.

        Args:
            aws_secret_access_key (Optional[str], optional): AWS secret access key. Defaults to None.
            aws_session_token (Optional[str], optional): AWS session token. Defaults to None.
            aws_access_key_id (Optional[str], optional): AWS access key ID. Defaults to None.
            region (str): AWS region for the Polly client. Defaults to "us-east-1".
            voice (str): Polly voice ID, e.g. "Joanna". Defaults to "Joanna".
            engine (str): Polly engine to use (e.g. "standard" or "neural"). Defaults to "neural".
            speed (float): Speaking-rate multiplier applied via SSML prosody (1.0 = normal). Defaults to 1.0.
            pitch (float): Pitch adjustment applied via SSML prosody (0.0 = no change). Defaults to 0.0.
            **kwargs (Any): Additional keyword arguments to pass to the TTS plugin.
        """
        super().__init__(sample_rate=VIDEOSDK_TTS_SAMPLE_RATE,
                         num_channels=VIDEOSDK_TTS_CHANNELS)
        if not BOTO3_AVAILABLE:
            raise ImportError(
                "boto3 is not installed. Please install it with 'pip install boto3'")
        self.voice = voice
        self.engine = engine
        self.region = region or os.getenv("AWS_DEFAULT_REGION")
        self.aws_access_key_id = aws_access_key_id or os.getenv(
            "AWS_ACCESS_KEY_ID")
        self.aws_secret_access_key = aws_secret_access_key or os.getenv(
            "AWS_SECRET_ACCESS_KEY")
        self.aws_session_token = aws_session_token or os.getenv(
            "AWS_SESSION_TOKEN")
        self.speed = speed
        self.pitch = pitch
        self._first_chunk_sent = False
        if not self.region:
            raise ValueError(
                "AWS region must be specified via parameter or AWS_DEFAULT_REGION env var")
        if not self.aws_access_key_id or not self.aws_secret_access_key:
            raise ValueError(
                "AWS credentials must be provided or set as environment variables.")
        client_kwargs = {
            "service_name": "polly",
            "region_name": self.region,
            "aws_access_key_id": self.aws_access_key_id,
            "aws_secret_access_key": self.aws_secret_access_key,
        }
        if self.aws_session_token:
            client_kwargs["aws_session_token"] = self.aws_session_token
        self._client = boto3.client(**client_kwargs)

    def reset_first_audio_tracking(self) -> None:
        """Reset the first audio tracking state for next TTS task"""
        self._first_chunk_sent = False

    async def synthesize(self, text_or_generator: Union[str, AsyncIterator[str]], **kwargs) -> None:
        if not self.audio_track or not self.loop:
            logger.error("Audio track or event loop not initialized.")
            return

        try:
            if isinstance(text_or_generator, str):
                await self._process_text_segment(text_or_generator)
            else:
                async for segment in segment_text(text_or_generator):
                    await self._process_text_segment(segment)

        except (BotoCoreError, ClientError) as e:
            logger.error(f"AWS Polly API error: {e}")
        except Exception as e:
            logger.error(f"Error in AWSPollyTTS synthesis: {e}")

    async def _process_text_segment(self, text_segment: str) -> None:
        """Process individual text segments for streaming TTS"""
        if not text_segment.strip():
            return

        ssml_text = self._build_ssml(text_segment)

        response = await asyncio.to_thread(
            self._client.synthesize_speech,
            Text=ssml_text,
            TextType="ssml",
            OutputFormat="pcm",
            VoiceId=self.voice,
            SampleRate="16000",
            Engine=self.engine
        )

        audio_stream = response.get("AudioStream")
        if audio_stream:
            audio_data = await asyncio.to_thread(audio_stream.read)
            await self._stream_audio(audio_data)

    async def _stream_audio(self, audio_data: bytes):
        if not audio_data:
            return

        try:
            if SCIPY_AVAILABLE:
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                target_length = int(len(audio_array) * 24000 / 16000)
                resampled_audio = signal.resample(audio_array, target_length)

                resampled_audio = np.clip(
                    resampled_audio, -32768, 32767).astype(np.int16)
                audio_data = resampled_audio.tobytes()

                logger.debug(
                    f"Resampled audio from {len(audio_array)} to {len(resampled_audio)} samples")
            else:
                logger.warning(
                    "scipy not available, using original audio without resampling")

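            # 20 ms chunks: sample_rate * num_channels * 2 bytes/sample * 20/1000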
            chunk_size = int(self.sample_rate *
                             self.num_channels * 2 * 20 / 1000)

            for i in range(0, len(audio_data), chunk_size):
                chunk = audio_data[i: i + chunk_size]
                if len(chunk) < chunk_size:
                    chunk += b'\x00' * (chunk_size - len(chunk))

                if self.audio_track and self.loop:
                    if not self._first_chunk_sent and self._first_audio_callback:
                        self._first_chunk_sent = True
                        await self._first_audio_callback()

                    asyncio.create_task(self.audio_track.add_new_bytes(chunk))
                    await asyncio.sleep(0.01)

        except Exception as e:
            logger.error(f"Error in audio streaming: {e}")
            chunk_size = int(self.sample_rate *
                             self.num_channels * 2 * 20 / 1000)

            for i in range(0, len(audio_data), chunk_size):
                chunk = audio_data[i: i + chunk_size]
                if len(chunk) < chunk_size:
                    chunk += b'\x00' * (chunk_size - len(chunk))

                if self.audio_track and self.loop:
                    if not self._first_chunk_sent and self._first_audio_callback:
                        self._first_chunk_sent = True
                        await self._first_audio_callback()

                    self.loop.create_task(
                        self.audio_track.add_new_bytes(chunk))
                    await asyncio.sleep(0.001)

    def _build_ssml(self, text: str) -> str:
        """Build SSML for AWS Polly with speed and pitch controls"""
        text = text.replace("&", "&amp;").replace(
            "<", "&lt;").replace(">", "&gt;")

        ssml_parts = ["<speak>"]

        if self.speed != 1.0:
            rate_percent = f"{int(self.speed * 100)}%"
            ssml_parts.append(f'<prosody rate="{rate_percent}">')

        if self.pitch != 0.0:
            pitch_value = f"{int(self.pitch * 100)}%"
            ssml_parts.append(f'<prosody pitch="{pitch_value}">')

        ssml_parts.append(text)

        if self.pitch != 0.0:
            ssml_parts.append("</prosody>")
        if self.speed != 1.0:
            ssml_parts.append("</prosody>")

        ssml_parts.append("</speak>")

        return "".join(ssml_parts)

    async def aclose(self):
        """Close the TTS connection"""
        pass

    async def interrupt(self) -> None:
        """Interrupt the TTS audio stream"""
        if self.audio_track:
            self.audio_track.interrupt()

AWS Polly TTS implementation (plug-and-play for VideoSDK Agents).

Usage

tts = AWSPollyTTS() # All config from env, or override via kwargs

Initialize the AWS Polly TTS plugin.

Args

aws_secret_access_key : Optional[str], optional
AWS secret access key. Defaults to None.
aws_session_token : Optional[str], optional
AWS session token. Defaults to None.
aws_access_key_id : Optional[str], optional
AWS access key ID. Defaults to None.
region : str
AWS region for the Polly client. Defaults to "us-east-1".
voice : str
Polly voice ID, e.g. "Joanna". Defaults to "Joanna".
engine : str
Polly engine to use (e.g. "standard" or "neural"). Defaults to "neural".
speed : float
Speaking-rate multiplier applied via SSML prosody (1.0 = normal). Defaults to 1.0.
pitch : float
Pitch adjustment applied via SSML prosody (0.0 = no change). Defaults to 0.0.
**kwargs : Any
Additional keyword arguments to pass to the TTS plugin.
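
A minimal construction sketch (the region, voice, and credential handling below are illustrative, not requirements):

import os
from videosdk.plugins.aws import AWSPollyTTS

# Option 1: read everything from AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
# and AWS_DEFAULT_REGION environment variables.
tts = AWSPollyTTS()

# Option 2: override explicitly, including the SSML prosody controls.
tts = AWSPollyTTS(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region="us-west-2",
    voice="Matthew",
    engine="neural",
    speed=1.2,   # emitted as <prosody rate="120%">
    pitch=0.1,   # emitted as <prosody pitch="10%">
)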

Ancestors

  • videosdk.agents.tts.tts.TTS
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic

Methods

async def aclose(self)
async def aclose(self):
    """Close the TTS connection"""
    pass

Close the TTS connection

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Interrupt the TTS audio stream"""
    if self.audio_track:
        self.audio_track.interrupt()

Interrupt the TTS audio stream

def reset_first_audio_tracking(self) ‑> None
def reset_first_audio_tracking(self) -> None:
    """Reset the first audio tracking state for next TTS task"""
    self._first_chunk_sent = False

Reset the first audio tracking state for next TTS task

async def synthesize(self, text_or_generator: Union[str, AsyncIterator[str]], **kwargs) ‑> None
async def synthesize(self, text_or_generator: Union[str, AsyncIterator[str]], **kwargs) -> None:
    if not self.audio_track or not self.loop:
        logger.error("Audio track or event loop not initialized.")
        return

    try:
        if isinstance(text_or_generator, str):
            await self._process_text_segment(text_or_generator)
        else:
            async for segment in segment_text(text_or_generator):
                await self._process_text_segment(segment)

    except (BotoCoreError, ClientError) as e:
        logger.error(f"AWS Polly API error: {e}")
    except Exception as e:
        logger.error(f"Error in AWSPollyTTS synthesis: {e}")

Convert text to speech

Args

text_or_generator
Text to convert to speech (either a string or an async iterator of strings)
**kwargs
Additional provider-specific arguments

Returns

None
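
A hedged usage sketch: synthesize() expects the agent pipeline to have already attached audio_track and loop (it returns early otherwise), so outside a pipeline this only illustrates the two accepted input shapes:

import asyncio

async def speak(tts: AWSPollyTTS) -> None:
    # Plain string input
    await tts.synthesize("Hello from Polly!")

    # Async-iterator input, e.g. tokens streamed from an LLM
    async def token_stream():
        for part in ["Hello ", "from ", "a ", "streaming ", "source."]:
            yield part

    await tts.synthesize(token_stream())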

class NovaSonicConfig (model_id: str = 'amazon.nova-sonic-v1:0',
voice: str = 'tiffany',
temperature: float = 0.7,
top_p: float = 0.9,
max_tokens: int = 1024)
@dataclass
class NovaSonicConfig:
    """Configuration for Nova Sonic API

    Args:
        model_id: The Nova Sonic model ID to use. Default is 'amazon.nova-sonic-v1:0'
        voice: Voice ID for audio output. Default is 'tiffany'
        temperature: Controls randomness in responses. Default is 0.7
        top_p: Nucleus sampling parameter. Default is 0.9
        max_tokens: Maximum tokens in response. Default is 1024
    """

    model_id: str = "amazon.nova-sonic-v1:0"
    voice: str = "tiffany"
    temperature: float = 0.7
    top_p: float = 0.9
    max_tokens: int = 1024

Configuration for Nova Sonic API

Args

model_id
The Nova Sonic model ID to use. Default is 'amazon.nova-sonic-v1:0'
voice
Voice ID for audio output. Default is 'tiffany'
temperature
Controls randomness in responses. Default is 0.7
top_p
Nucleus sampling parameter. Default is 0.9
max_tokens
Maximum tokens in response. Default is 1024
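
For example, a configuration biased toward short, more deterministic replies (the values are illustrative):

config = NovaSonicConfig(
    voice="matthew",   # any voice ID supported by Nova Sonic
    temperature=0.3,
    top_p=0.8,
    max_tokens=512,
)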

Instance variables

var max_tokens : int
var model_id : str
var temperature : float
var top_p : float
var voice : str
class NovaSonicRealtime (*,
aws_secret_access_key: str | None = None,
aws_access_key_id: str | None = None,
region: str | None = None,
model: str,
config: NovaSonicConfig | None = None)
class NovaSonicRealtime(RealtimeBaseModel[NovaSonicEventTypes]):
    """Nova Sonic's realtime model implementation"""

    def __init__(
        self,
        *,
        aws_secret_access_key: str | None = None,
        aws_access_key_id: str | None = None,
        region: str | None = None,
        model: str,
        config: NovaSonicConfig | None = None,
    ) -> None:
        """
        Initialize Nova Sonic realtime model.

        Args:
            aws_access_key_id (str | None, optional): AWS access key ID. Defaults to None.
            aws_secret_access_key (str | None, optional): AWS secret access key. Defaults to None.
            region (str | None, optional): AWS region for Bedrock. Defaults to None.
            model (str): The Nova Sonic model identifier.
            config (NovaSonicConfig | None, optional): Optional configuration object for customizing model behavior. Defaults to None.
        """
        super().__init__()
        self.model = model
        self.config = config or NovaSonicConfig()
        self.region = region or os.getenv("AWS_DEFAULT_REGION")
        self.aws_access_key_id = aws_access_key_id or os.getenv(
            "AWS_ACCESS_KEY_ID")
        self.aws_secret_access_key = aws_secret_access_key or os.getenv(
            "AWS_SECRET_ACCESS_KEY"
        )

        if not self.region:
            self.emit(
                "error",
                "AWS region is required (pass as parameter or set AWS_DEFAULT_REGIONenvironment variable)",
            )
            raise ValueError(
                "AWS region is required (pass as parameter or set AWS_DEFAULT_REGIONenvironment variable)"
            )
        if not self.aws_access_key_id or not self.aws_secret_access_key:
            self.emit(
                "error",
                "AWS credentials required (pass as parameters or set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables)",
            )
            raise ValueError(
                "AWS credentials required (pass as parameters or set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables)"
            )

        self.bedrock_client = None
        self.stream = None
        self._closing = False
        self._instructions = "You are a helpful assistant. The user and you will engage in a spoken dialog exchanging the transcripts of a natural real-time conversation. Keep your responses short, generally two or three sentences for chatty scenarios."
        self._tools = []
        self.tools_formatted = []
        self.loop = asyncio.get_event_loop()
        self.audio_track = None
        self.prompt_name = str(uuid.uuid4())
        self.system_content_name = f"system_{str(uuid.uuid4())}"
        self.audio_content_name = f"audio_{str(uuid.uuid4())}"
        self.is_active = False
        self.response_task = None
        self._agent_speaking = False
        self._initialize_bedrock_client()
        self.input_sample_rate = 48000
        self.target_sample_rate = 16000

    def set_agent(self, agent: Agent) -> None:
        self._instructions = agent.instructions
        self._tools = agent.tools
        self.tools_formatted = [
            build_nova_sonic_schema(tool)
            for tool in self._tools
            if is_function_tool(tool)
        ]
        self.formatted_tools = self.tools_formatted

    def _initialize_bedrock_client(self):
        """Initialize the Bedrock client with manual credential handling"""
        try:
            if self.region:
                os.environ["AWS_REGION"] = self.region
            if self.aws_access_key_id:
                os.environ["AWS_ACCESS_KEY_ID"] = self.aws_access_key_id
            if self.aws_secret_access_key:
                os.environ["AWS_SECRET_ACCESS_KEY"] = self.aws_secret_access_key

            config = Config(
                endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
                region=self.region,
                aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
                http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
                http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()},
            )
            self.bedrock_client = BedrockRuntimeClient(config=config)

        except Exception as e:
            self.emit("error", f"Error initializing Bedrock client: {e}")
            raise

    async def connect(self) -> None:
        """Initialize connection to Nova Sonic"""
        if self.is_active:
            await self._cleanup()

        self._closing = False

        try:
            self.loop = asyncio.get_event_loop()
            self.stream = (
                await self.bedrock_client.invoke_model_with_bidirectional_stream(
                    InvokeModelWithBidirectionalStreamOperationInput(
                        model_id=self.config.model_id
                    )
                )
            )
            self.is_active = True

            session_start_payload = {
                "event": {
                    "sessionStart": {
                        "inferenceConfiguration": {
                            "maxTokens": self.config.max_tokens,
                            "topP": self.config.top_p,
                            "temperature": self.config.temperature,
                        }
                    }
                }
            }
            await self._send_event(json.dumps(session_start_payload))

            prompt_start_event_dict = {
                "event": {
                    "promptStart": {
                        "promptName": self.prompt_name,
                        "textOutputConfiguration": {"mediaType": "text/plain"},
                        "audioOutputConfiguration": {
                            "mediaType": "audio/lpcm",
                            "sampleRateHertz": NOVA_OUTPUT_SAMPLE_RATE,
                            "sampleSizeBits": 16,
                            "channelCount": 1,
                            "voiceId": self.config.voice,
                            "encoding": "base64",
                            "audioType": "SPEECH",
                        },
                    }
                }
            }

            if self.tools_formatted:
                prompt_start_event_dict["event"]["promptStart"][
                    "toolUseOutputConfiguration"
                ] = {"mediaType": "application/json"}
                prompt_start_event_dict["event"]["promptStart"]["toolConfiguration"] = {
                    "tools": self.tools_formatted
                }

            await self._send_event(json.dumps(prompt_start_event_dict))

            system_content_start_payload = {
                "event": {
                    "contentStart": {
                        "promptName": self.prompt_name,
                        "contentName": self.system_content_name,
                        "type": "TEXT",
                        "interactive": True,
                        "role": "SYSTEM",
                        "textInputConfiguration": {"mediaType": "text/plain"},
                    }
                }
            }
            await self._send_event(json.dumps(system_content_start_payload))

            system_instructions = (
                self._instructions
                or "You are a helpful voice assistant. Keep your responses short and conversational."
            )
            text_input_payload = {
                "event": {
                    "textInput": {
                        "promptName": self.prompt_name,
                        "contentName": self.system_content_name,
                        "content": system_instructions,
                    }
                }
            }
            await self._send_event(json.dumps(text_input_payload))

            content_end_payload = {
                "event": {
                    "contentEnd": {
                        "promptName": self.prompt_name,
                        "contentName": self.system_content_name,
                    }
                }
            }
            await self._send_event(json.dumps(content_end_payload))

            self.response_task = asyncio.create_task(self._process_responses())

            await self._start_audio_input()

        except Exception as e:
            await self._cleanup()
            raise

    async def _send_event(self, event_json: str):
        """Send an event to the bidirectional stream"""
        if not self.is_active or not self.stream:
            return

        try:
            event = InvokeModelWithBidirectionalStreamInputChunk(
                value=BidirectionalInputPayloadPart(
                    bytes_=event_json.encode("utf-8"))
            )

            await self.stream.input_stream.send(event)

        except Exception as e:
            self.emit("error", f"Error sending event: {e}")

    async def _start_audio_input(self):
        """Start audio input stream"""
        if not self.is_active:
            return

        audio_content_start_payload = {
            "event": {
                "contentStart": {
                    "promptName": self.prompt_name,
                    "contentName": self.audio_content_name,
                    "type": "AUDIO",
                    "interactive": True,
                    "role": "USER",
                    "audioInputConfiguration": {
                        "mediaType": "audio/lpcm",
                        "sampleRateHertz": NOVA_INPUT_SAMPLE_RATE,
                        "sampleSizeBits": 16,
                        "channelCount": 1,
                        "audioType": "SPEECH",
                        "encoding": "base64",
                    },
                }
            }
        }
        await self._send_event(json.dumps(audio_content_start_payload))

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Handle incoming 48kHz audio from VideoSDK"""
        if not self.is_active or self._closing:
            return

        try:
            audio_array = np.frombuffer(audio_data, dtype=np.int16)

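            # Heuristic: an even sample count is treated as interleaved stereo and downmixed to mono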
            if len(audio_array) % 2 == 0:
                audio_array = audio_array.reshape(-1, 2)
                audio_array = np.mean(audio_array, axis=1).astype(np.int16)

            target_length = int(
                len(audio_array) * self.target_sample_rate /
                self.input_sample_rate
            )
            resampled_float = signal.resample(
                audio_array.astype(np.float32), target_length
            )

            resampled_int16 = np.clip(
                resampled_float, -32768, 32767).astype(np.int16)
            resampled_bytes = resampled_int16.tobytes()

            encoded_audio = base64.b64encode(resampled_bytes).decode("utf-8")

            audio_event_payload = {
                "event": {
                    "audioInput": {
                        "promptName": self.prompt_name,
                        "contentName": self.audio_content_name,
                        "content": encoded_audio,
                    }
                }
            }

            await self._send_event(json.dumps(audio_event_payload))

        except Exception as e:
            self.emit("error", f"Resampling error: {e}")

    async def _process_responses(self):
        """Process responses from the bidirectional stream"""
        try:
            while self.is_active and not self._closing:
                try:
                    output = await self.stream.await_output()
                    result = await output[1].receive()

                    if result.value and result.value.bytes_:
                        response_data = result.value.bytes_.decode("utf-8")

                        try:
                            json_data = json.loads(response_data)

                            if "event" in json_data:
                                event_keys = list(json_data["event"].keys())

                                if "completionStart" in json_data["event"]:
                                    completion_start = json_data["event"][
                                        "completionStart"
                                    ]

                                elif "contentStart" in json_data["event"]:
                                    content_start = json_data["event"]["contentStart"]

                                    if "additionalModelFields" in content_start:
                                        try:
                                            additional_fields = json.loads(
                                                content_start["additionalModelFields"]
                                            )
                                        except (json.JSONDecodeError, KeyError) as e:
                                            self.emit(
                                                "error",
                                                f"Error parsing additionalModelFields: {e}",
                                            )
                                elif "textOutput" in json_data["event"]:
                                    text_output = json_data["event"]["textOutput"]
                                    if "content" in text_output:
                                        transcript = text_output["content"]
                                        role = text_output.get(
                                            "role", "UNKNOWN")
                                        if role == "USER":
                                            await realtime_metrics_collector.set_user_speech_start()
                                            await realtime_metrics_collector.set_user_transcript(
                                                transcript
                                            )
                                            await realtime_metrics_collector.set_user_speech_end()
                                            self._safe_emit(
                                                "user_speech_started", {
                                                    "type": "done"}
                                            )
                                            try:
                                                await self.emit(
                                                    "realtime_model_transcription",
                                                    {
                                                        "role": "user",
                                                        "text": transcript,
                                                        "is_final": True,
                                                    },
                                                )
                                            except Exception:
                                                pass
                                        elif role == "ASSISTANT":
                                            skip_emit = False
                                            try:
                                                parsed = json.loads(transcript)
                                                if (
                                                    isinstance(parsed, dict)
                                                    and parsed.get("interrupted")
                                                    is True
                                                ):
                                                    skip_emit = True
                                            except Exception:
                                                pass
                                            if not skip_emit:
                                                await realtime_metrics_collector.set_agent_response(
                                                    transcript
                                                )
                                                try:
                                                    await self.emit(
                                                        "realtime_model_transcription",
                                                        {
                                                            "role": "agent",
                                                            "text": transcript,
                                                            "is_final": True,
                                                        },
                                                    )
                                                except Exception:
                                                    pass

                                elif "audioOutput" in json_data["event"]:
                                    audio_output = json_data["event"]["audioOutput"]
                                    if "content" not in audio_output:
                                        continue

                                    audio_content = audio_output["content"]
                                    if not audio_content:
                                        continue

                                    try:
                                        audio_bytes = base64.b64decode(
                                            audio_content)
                                        if not self._agent_speaking:
                                            await realtime_metrics_collector.set_agent_speech_start()
                                            self._agent_speaking = True

                                        if (
                                            self.audio_track
                                            and self.loop
                                            and not self._closing
                                        ):
                                            asyncio.create_task(
                                                self.audio_track.add_new_bytes(
                                                    audio_bytes
                                                )
                                            )

                                    except Exception as e:
                                        self.emit(
                                            "error", f"AUDIO PROCESSING ERROR: {e}"
                                        )

                                elif "contentEnd" in json_data["event"]:
                                    content_end = json_data["event"]["contentEnd"]
                                    if (
                                        content_end.get(
                                            "stopReason", "") == "END_TURN"
                                        and self._agent_speaking
                                    ):
                                        await realtime_metrics_collector.set_agent_speech_end(
                                            timeout=1.0
                                        )
                                        self._agent_speaking = False

                                elif "usageEvent" in json_data["event"]:
                                    pass

                                elif "toolUse" in json_data["event"]:
                                    tool_use = json_data["event"]["toolUse"]
                                    await realtime_metrics_collector.add_tool_call(
                                        tool_use["toolName"]
                                    )
                                    asyncio.create_task(
                                        self._execute_tool_and_send_result(
                                            tool_use)
                                    )

                                elif "completionEnd" in json_data["event"]:
                                    completion_end = json_data["event"]["completionEnd"]
                                    print(
                                        f"Nova completionEnd received: {json.dumps(completion_end, indent=2)}"
                                    )
                                    await realtime_metrics_collector.set_agent_speech_end(
                                        timeout=1.0
                                    )
                                    self._agent_speaking = False

                                else:
                                    print(
                                        f"Unhandled event type from Nova: {event_keys} - {json.dumps(json_data['event'], indent=2)}"
                                    )
                            else:
                                print(f"Non-event response: {json_data}")

                        except json.JSONDecodeError as e:
                            self.emit(
                                "error", f"Failed to parse response: {e}")
                            self.emit(
                                "error", f"Raw data: {response_data[:200]}...")

                except Exception as e:
                    self.emit("error", f"Error processing response: {e}")
                    if not self.is_active or self._closing:
                        break

        except Exception as e:
            print(f"Unexpected error in response processing: {e}")

    async def send_message(self, message: str) -> None:
        """Send a text message to the model"""
        if not self.is_active or self._closing:
            return

        try:
            text_content_name = f"text_{str(uuid.uuid4())}"

            text_content_start_payload = {
                "event": {
                    "contentStart": {
                        "promptName": self.prompt_name,
                        "contentName": text_content_name,
                        "type": "TEXT",
                        "interactive": True,
                        "role": "USER",
                        "textInputConfiguration": {"mediaType": "text/plain"},
                    }
                }
            }
            await self._send_event(json.dumps(text_content_start_payload))

            text_input_payload = {
                "event": {
                    "textInput": {
                        "promptName": self.prompt_name,
                        "contentName": text_content_name,
                        "content": message,
                    }
                }
            }
            await self._send_event(json.dumps(text_input_payload))

            content_end_payload = {
                "event": {
                    "contentEnd": {
                        "promptName": self.prompt_name,
                        "contentName": text_content_name,
                    }
                }
            }
            await self._send_event(json.dumps(content_end_payload))

        except Exception as e:
            self.emit("error", f"Error sending message: {e}")

    async def emit(self, event_type: NovaSonicEventTypes, data: Dict[str, Any]) -> None:
        """Emit an event to subscribers"""
        try:
            super().emit(event_type, data)
        except Exception as e:
            self.emit("error", f"Error in emit for {event_type}: {e}")

    def _safe_emit(self, event_type: NovaSonicEventTypes, data: Dict[str, Any]) -> None:
        """Safely emit an event without requiring await"""
        try:
            if self.loop and not self.loop.is_closed():
                asyncio.run_coroutine_threadsafe(
                    self.emit(event_type, data), self.loop)
        except Exception as e:
            self.emit(
                "error", f"Error safely emitting event {event_type}: {e}")

    async def interrupt(self) -> None:
        """Interrupt current response"""
        if not self.is_active or self._closing:
            return

        if self.audio_track:
            self.audio_track.interrupt()
        await realtime_metrics_collector.set_interrupted()
        if self._agent_speaking:
            print("Interrupting agent speech, calling set_agent_speech_end")
            await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
            self._agent_speaking = False

        content_end_payload = {
            "event": {
                "contentEnd": {
                    "promptName": self.prompt_name,
                    "contentName": self.audio_content_name,
                }
            }
        }
        await self._send_event(json.dumps(content_end_payload))
        print(f"Sent contentEnd for {self.audio_content_name}")

        self.audio_content_name = f"audio_{str(uuid.uuid4())}"
        await self._start_audio_input()

    async def _cleanup(self) -> None:
        """Clean up resources"""
        if not self.is_active:
            return

        try:
            audio_content_end_payload = {
                "event": {
                    "contentEnd": {
                        "promptName": self.prompt_name,
                        "contentName": self.audio_content_name,
                    }
                }
            }
            await self._send_event(json.dumps(audio_content_end_payload))

            prompt_end_payload = {
                "event": {"promptEnd": {"promptName": self.prompt_name}}
            }
            await self._send_event(json.dumps(prompt_end_payload))

            session_end_payload = {"event": {"sessionEnd": {}}}
            await self._send_event(json.dumps(session_end_payload))

            if self.stream and hasattr(self.stream, "input_stream"):
                await self.stream.input_stream.close()
        except Exception as e:
            self.emit("error", f"Error during cleanup: {e}")
        finally:
            self.is_active = False

            if self.response_task and not self.response_task.done():
                self.response_task.cancel()
                try:
                    await self.response_task
                except asyncio.CancelledError:
                    pass
                print("Cancelled response task")

            self.stream = None

    async def aclose(self) -> None:
        """Clean up all resources"""
        if self._closing:
            return

        self._closing = True

        await self._cleanup()

        if self.audio_track:
            if hasattr(self.audio_track, "cleanup"):
                try:
                    await self.audio_track.cleanup()
                except Exception as e:
                    self.emit("error", f"Error cleaning up audio track: {e}")
            self.audio_track = None

    async def _execute_tool_and_send_result(
        self, tool_use_event: Dict[str, Any]
    ) -> None:
        """Executes a tool and sends the result back to Nova Sonic."""
        tool_name = tool_use_event.get("toolName")
        tool_use_id = tool_use_event.get("toolUseId")
        tool_input_str = tool_use_event.get("content", "{}")

        if not tool_name or not tool_use_id:
            self.emit(
                "error",
                f"Error: Missing toolName or toolUseId in toolUse event: {tool_use_event}",
            )
            return

        try:
            tool_input_args = json.loads(tool_input_str)
        except json.JSONDecodeError as e:
            self.emit(
                "error",
                f"Error decoding tool input JSON: {e}. Input string: {tool_input_str}",
            )
            return

        target_tool: Optional[FunctionTool] = None
        for tool in self._tools:
            if is_function_tool(tool):
                tool_info = get_tool_info(tool)
                if tool_info.name == tool_name:
                    target_tool = tool
                    break

        if not target_tool:
            self.emit(
                "error", f"Error: Tool '{tool_name}' not found in registered tools."
            )
            return

        try:
            result = await target_tool(**tool_input_args)
            result_content_str = json.dumps(result)

            tool_content_name = f"tool_result_{str(uuid.uuid4())}"

            tool_content_start_dict = {
                "event": {
                    "contentStart": {
                        "promptName": self.prompt_name,
                        "contentName": tool_content_name,
                        "interactive": False,
                        "type": "TOOL",
                        "role": "TOOL",
                        "toolResultInputConfiguration": {
                            "toolUseId": tool_use_id,
                            "type": "TEXT",
                            "textInputConfiguration": {"mediaType": "text/plain"},
                        },
                    }
                }
            }
            await self._send_event(json.dumps(tool_content_start_dict))

            tool_result_event_dict = {
                "event": {
                    "toolResult": {
                        "promptName": self.prompt_name,
                        "contentName": tool_content_name,
                        "content": result_content_str,
                    }
                }
            }
            await self._send_event(json.dumps(tool_result_event_dict))

            tool_content_end_payload = {
                "event": {
                    "contentEnd": {
                        "promptName": self.prompt_name,
                        "contentName": tool_content_name,
                    }
                }
            }
            await self._send_event(json.dumps(tool_content_end_payload))

        except Exception as e:
            self.emit(
                "error", f"Error executing tool {tool_name} or sending result: {e}"
            )

Nova Sonic's realtime model implementation

Initialize Nova Sonic realtime model.

Args

aws_access_key_id : str | None, optional
AWS access key ID. Defaults to None.
aws_secret_access_key : str | None, optional
AWS secret access key. Defaults to None.
region : str | None, optional
AWS region for Bedrock. Defaults to None.
model : str
The Nova Sonic model identifier.
config : NovaSonicConfig | None, optional
Optional configuration object for customizing model behavior. Defaults to None.
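
A construction sketch building on NovaSonicConfig (the os.environ credential wiring is illustrative; the standard AWS environment variables work equally well):

import os

model = NovaSonicRealtime(
    model="amazon.nova-sonic-v1:0",
    region="us-east-1",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    config=NovaSonicConfig(voice="tiffany", max_tokens=1024),
)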

Ancestors

  • videosdk.agents.realtime_base_model.RealtimeBaseModel
  • videosdk.agents.event_emitter.EventEmitter
  • typing.Generic
  • abc.ABC

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Clean up all resources"""
    if self._closing:
        return

    self._closing = True

    await self._cleanup()

    if self.audio_track:
        if hasattr(self.audio_track, "cleanup"):
            try:
                await self.audio_track.cleanup()
            except Exception as e:
                self.emit("error", f"Error cleaning up audio track: {e}")
        self.audio_track = None

Clean up all resources

async def connect(self) ‑> None
async def connect(self) -> None:
    """Initialize connection to Nova Sonic"""
    if self.is_active:
        await self._cleanup()

    self._closing = False

    try:
        self.loop = asyncio.get_event_loop()
        self.stream = (
            await self.bedrock_client.invoke_model_with_bidirectional_stream(
                InvokeModelWithBidirectionalStreamOperationInput(
                    model_id=self.config.model_id
                )
            )
        )
        self.is_active = True

        session_start_payload = {
            "event": {
                "sessionStart": {
                    "inferenceConfiguration": {
                        "maxTokens": self.config.max_tokens,
                        "topP": self.config.top_p,
                        "temperature": self.config.temperature,
                    }
                }
            }
        }
        await self._send_event(json.dumps(session_start_payload))

        prompt_start_event_dict = {
            "event": {
                "promptStart": {
                    "promptName": self.prompt_name,
                    "textOutputConfiguration": {"mediaType": "text/plain"},
                    "audioOutputConfiguration": {
                        "mediaType": "audio/lpcm",
                        "sampleRateHertz": NOVA_OUTPUT_SAMPLE_RATE,
                        "sampleSizeBits": 16,
                        "channelCount": 1,
                        "voiceId": self.config.voice,
                        "encoding": "base64",
                        "audioType": "SPEECH",
                    },
                }
            }
        }

        if self.tools_formatted:
            prompt_start_event_dict["event"]["promptStart"][
                "toolUseOutputConfiguration"
            ] = {"mediaType": "application/json"}
            prompt_start_event_dict["event"]["promptStart"]["toolConfiguration"] = {
                "tools": self.tools_formatted
            }

        await self._send_event(json.dumps(prompt_start_event_dict))

        system_content_start_payload = {
            "event": {
                "contentStart": {
                    "promptName": self.prompt_name,
                    "contentName": self.system_content_name,
                    "type": "TEXT",
                    "interactive": True,
                    "role": "SYSTEM",
                    "textInputConfiguration": {"mediaType": "text/plain"},
                }
            }
        }
        await self._send_event(json.dumps(system_content_start_payload))

        system_instructions = (
            self._instructions
            or "You are a helpful voice assistant. Keep your responses short and conversational."
        )
        text_input_payload = {
            "event": {
                "textInput": {
                    "promptName": self.prompt_name,
                    "contentName": self.system_content_name,
                    "content": system_instructions,
                }
            }
        }
        await self._send_event(json.dumps(text_input_payload))

        content_end_payload = {
            "event": {
                "contentEnd": {
                    "promptName": self.prompt_name,
                    "contentName": self.system_content_name,
                }
            }
        }
        await self._send_event(json.dumps(content_end_payload))

        self.response_task = asyncio.create_task(self._process_responses())

        await self._start_audio_input()

    except Exception as e:
        await self._cleanup()
        raise

Initialize connection to Nova Sonic
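
A hedged sketch of the session lifecycle this method drives (sessionStart, promptStart, the system-prompt contentStart/textInput/contentEnd sequence, then streaming audio input), assuming a model constructed as above:

import asyncio

async def run_session(model: NovaSonicRealtime) -> None:
    await model.connect()                  # opens the bidirectional stream
    await model.send_message("Hi there!")  # optional text turn
    await asyncio.sleep(10)                # let responses stream back
    await model.aclose()                   # sends contentEnd/promptEnd/sessionEnd

asyncio.run(run_session(model))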

async def emit(self, event_type: NovaSonicEventTypes, data: Dict[str, Any]) ‑> None
async def emit(self, event_type: NovaSonicEventTypes, data: Dict[str, Any]) -> None:
    """Emit an event to subscribers"""
    try:
        super().emit(event_type, data)
    except Exception as e:
        self.emit("error", f"Error in emit for {event_type}: {e}")

Emit an event to subscribers

async def handle_audio_input(self, audio_data: bytes) ‑> None
async def handle_audio_input(self, audio_data: bytes) -> None:
    """Handle incoming 48kHz audio from VideoSDK"""
    if not self.is_active or self._closing:
        return

    try:
        audio_array = np.frombuffer(audio_data, dtype=np.int16)

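        # Heuristic: an even sample count is treated as interleaved stereo and downmixed to mono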
        if len(audio_array) % 2 == 0:
            audio_array = audio_array.reshape(-1, 2)
            audio_array = np.mean(audio_array, axis=1).astype(np.int16)

        target_length = int(
            len(audio_array) * self.target_sample_rate /
            self.input_sample_rate
        )
        resampled_float = signal.resample(
            audio_array.astype(np.float32), target_length
        )

        resampled_int16 = np.clip(
            resampled_float, -32768, 32767).astype(np.int16)
        resampled_bytes = resampled_int16.tobytes()

        encoded_audio = base64.b64encode(resampled_bytes).decode("utf-8")

        audio_event_payload = {
            "event": {
                "audioInput": {
                    "promptName": self.prompt_name,
                    "contentName": self.audio_content_name,
                    "content": encoded_audio,
                }
            }
        }

        await self._send_event(json.dumps(audio_event_payload))

    except Exception as e:
        self.emit("error", f"Resampling error: {e}")

Handle incoming 48kHz audio from VideoSDK
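
The downmix-and-resample step can be sketched standalone; this mirrors the method's math (48 kHz interleaved stereo int16 in, 16 kHz mono int16 out) on one synthetic 20 ms frame:

import numpy as np
from scipy import signal

frame = np.zeros(960 * 2, dtype=np.int16)            # 20 ms of 48 kHz stereo
mono = frame.reshape(-1, 2).mean(axis=1)             # average channels to mono
target = int(len(mono) * 16000 / 48000)              # 960 -> 320 samples
resampled = signal.resample(mono.astype(np.float32), target)
pcm16 = np.clip(resampled, -32768, 32767).astype(np.int16).tobytes()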

async def interrupt(self) ‑> None
async def interrupt(self) -> None:
    """Interrupt current response"""
    if not self.is_active or self._closing:
        return

    if self.audio_track:
        self.audio_track.interrupt()
    await realtime_metrics_collector.set_interrupted()
    if self._agent_speaking:
        print("Interrupting agent speech, calling set_agent_speech_end")
        await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
        self._agent_speaking = False

    content_end_payload = {
        "event": {
            "contentEnd": {
                "promptName": self.prompt_name,
                "contentName": self.audio_content_name,
            }
        }
    }
    await self._send_event(json.dumps(content_end_payload))
    print(f"Sent contentEnd for {self.audio_content_name}")

    self.audio_content_name = f"audio_{str(uuid.uuid4())}"
    await self._start_audio_input()

Interrupt current response

async def send_message(self, message: str) ‑> None
async def send_message(self, message: str) -> None:
    """Send a text message to the model"""
    if not self.is_active or self._closing:
        return

    try:
        text_content_name = f"text_{str(uuid.uuid4())}"

        text_content_start_payload = {
            "event": {
                "contentStart": {
                    "promptName": self.prompt_name,
                    "contentName": text_content_name,
                    "type": "TEXT",
                    "interactive": True,
                    "role": "USER",
                    "textInputConfiguration": {"mediaType": "text/plain"},
                }
            }
        }
        await self._send_event(json.dumps(text_content_start_payload))

        text_input_payload = {
            "event": {
                "textInput": {
                    "promptName": self.prompt_name,
                    "contentName": text_content_name,
                    "content": message,
                }
            }
        }
        await self._send_event(json.dumps(text_input_payload))

        content_end_payload = {
            "event": {
                "contentEnd": {
                    "promptName": self.prompt_name,
                    "contentName": text_content_name,
                }
            }
        }
        await self._send_event(json.dumps(content_end_payload))

    except Exception as e:
        self.emit("error", f"Error sending message: {e}")

Send a text message to the model

def set_agent(self, agent: Agent) ‑> None
def set_agent(self, agent: Agent) -> None:
    self._instructions = agent.instructions
    self._tools = agent.tools
    self.tools_formatted = [
        build_nova_sonic_schema(tool)
        for tool in self._tools
        if is_function_tool(tool)
    ]
    self.formatted_tools = self.tools_formatted
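
A hedged wiring sketch: the Agent and function_tool names below are assumptions about the videosdk.agents package, shown only to illustrate how set_agent() picks up function tools and formats them with build_nova_sonic_schema:

from videosdk.agents import Agent, function_tool  # assumed imports

@function_tool
async def get_weather(city: str) -> dict:
    """Stubbed weather lookup exposed to Nova Sonic as a tool."""
    return {"city": city, "forecast": "sunny"}

agent = Agent(instructions="Answer weather questions briefly.", tools=[get_weather])
model.set_agent(agent)  # instructions and tool schemas now flow into promptStart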