Module agents.warm_transfer

Classes

class SIPDestination (routing_rule_id: str,
sip_call_to: str,
sip_call_from: str,
participant_display_name: str = 'Supervisor',
extra_options: dict[str, Any] = <factory>)
Expand source code
@dataclass
class SIPDestination:
    """The human supervisor, reached by dialing out over SIP to ``sip_call_to``.

    The outbound call is placed via ``POST /v2/sip/call`` and terminates at the
    consultation room so the agent and the human arrive together.

    Required:
      * ``routing_rule_id`` — the SIP routing rule to dial through (``routingRuleId``).
      * ``sip_call_to``     — the supervisor's number (``sipCallTo``).
      * ``sip_call_from``   — the caller-ID to present (``sipCallFrom``); must be
        a number the routing rule / trunk is authorised to send, otherwise the
        carrier may reject or drop the call.

    ``extra_options`` is forwarded verbatim to the request body
    (``recordAudio``, ``ringingTimeout``, ``headers``, ``metadata``, …);
    ``routingRuleId``, ``sipCallTo``, ``sipCallFrom`` and ``destinationRoomId``
    are set by the runner — don't put them in ``extra_options``.
    """

    routing_rule_id: str
    sip_call_to: str
    sip_call_from: str
    participant_display_name: str = "Supervisor"
    extra_options: dict[str, Any] = field(default_factory=dict)

The human supervisor, reached by dialing out over SIP to sip_call_to.

The outbound call is placed via POST /v2/sip/call and terminates at the consultation room so the agent and the human arrive together.

Required

  • routing_rule_id — the SIP routing rule to dial through (routingRuleId).
  • sip_call_to — the supervisor's number (sipCallTo).
  • sip_call_from — the caller-ID to present (sipCallFrom); must be a number the routing rule / trunk is authorised to send, otherwise the carrier may reject or drop the call.

extra_options is forwarded verbatim to the request body (recordAudio, ringingTimeout, headers, metadata, …); routingRuleId, sipCallTo, sipCallFrom and destinationRoomId are set by the runner — don't put them in extra_options.

Instance variables

var extra_options : dict[str, typing.Any]
var participant_display_name : str
var routing_rule_id : str
var sip_call_from : str
var sip_call_to : str
class WarmTransferAgent (summary: str, instructions: Optional[str] = None)
Expand source code
class WarmTransferAgent(Agent):
    """The built-in consultation agent that briefs the human supervisor.

    Used internally by :class:`WarmTransferRunner`. The runner drives three
    :class:`asyncio.Event` attributes on it:

      * ``supervisor_ready_event`` — set once the supervisor's audio is live;
        ``on_enter`` awaits it so the narration isn't delivered into a ringing
        phone.
      * ``briefing_done_event`` — set by the ``done_briefing`` tool when the
        supervisor confirms they're ready.
      * ``briefing_failed_event`` — set by the ``decline_transfer`` /
        ``voicemail_detected`` tools, the framework's VoiceMailDetector
        classifier, or the supervisor disconnecting mid-call;
        ``_briefing_failure_reason`` carries the reason (``declined`` /
        ``voicemail`` / ``supervisor_disconnected``).
    """

    def __init__(self, summary: str, instructions: Optional[str] = None):
        super().__init__(instructions=instructions or _AGENT_INSTRUCTIONS)
        self._summary = summary
        self.supervisor_ready_event: asyncio.Event = asyncio.Event()
        self.briefing_done_event: asyncio.Event = asyncio.Event()
        self.briefing_failed_event: asyncio.Event = asyncio.Event()
        self._briefing_failure_reason: Optional[str] = None

    async def on_enter(self) -> None:
        self.chat_context.add_message(
            role=ChatRole.SYSTEM, content=f"CALL SUMMARY TO BRIEF SUPERVISOR:\n{self._summary}"
        )
        logger.info("WarmTransferAgent: waiting for supervisor to join the consultation room…")
        try:
            await self.supervisor_ready_event.wait()
        except asyncio.CancelledError:
            return
        logger.info("WarmTransferAgent: supervisor joined — starting briefing narration")
        try:
            await self.session.say("Hi, I'm the AI assistant. Let me bring you up to speed on this call.")
            await self.session.reply(
                instructions=(
                    "Narrate the call summary to the supervisor clearly and concisely (no preamble), "
                    "then ask whether they're ready to take the call. Do NOT call any tool in this "
                    "message — wait for the supervisor to reply first, then apply the decision rules "
                    "from your system prompt."
                ),
                wait_for_playback=False,
            )
        except Exception as exc:
            logger.error(f"WarmTransferAgent: error while briefing supervisor: {exc}")

    async def on_exit(self) -> None:
        for evt in (self.supervisor_ready_event, self.briefing_done_event, self.briefing_failed_event):
            if not evt.is_set():
                evt.set()

    def _fail_briefing(self, reason: str) -> None:
        self._briefing_failure_reason = reason
        self.briefing_failed_event.set()

    @function_tool
    async def done_briefing(self) -> str:
        """Signal that the supervisor confirmed they're ready to take the call."""
        self._briefing_failure_reason = None
        self.briefing_done_event.set()
        return "Connecting the caller now. Stand by."

    @function_tool
    async def decline_transfer(self, reason: str) -> str:
        """Record that the supervisor declined to take the call.
        Args:
            reason: A short explanation of why the supervisor declined.
        """
        logger.info(f"WarmTransferAgent: supervisor declined transfer — reason={reason!r}")
        self._fail_briefing("declined")
        return "Understood. The caller will be kept with the primary agent."

    @function_tool
    async def voicemail_detected(self) -> str:
        """Call this when the consultation line reached voicemail / an answering machine."""
        logger.info("WarmTransferAgent: voicemail detected on supervisor leg")
        self._fail_briefing("voicemail")
        return "Voicemail detected; aborting transfer."

The built-in consultation agent that briefs the human supervisor.

Used internally by :class:WarmTransferRunner. The runner drives three :class:asyncio.Event attributes on it:

  • supervisor_ready_event — set once the supervisor's audio is live; on_enter awaits it so the narration isn't delivered into a ringing phone.
  • briefing_done_event — set by the done_briefing tool when the supervisor confirms they're ready.
  • briefing_failed_event — set by the decline_transfer / voicemail_detected tools, the framework's VoiceMailDetector classifier, or the supervisor disconnecting mid-call; _briefing_failure_reason carries the reason (declined / voicemail / supervisor_disconnected).

Ancestors

Methods

async def decline_transfer(self, reason: str) ‑> str
Expand source code
@function_tool
async def decline_transfer(self, reason: str) -> str:
    """Record that the supervisor declined to take the call.
    Args:
        reason: A short explanation of why the supervisor declined.
    """
    logger.info(f"WarmTransferAgent: supervisor declined transfer — reason={reason!r}")
    self._fail_briefing("declined")
    return "Understood. The caller will be kept with the primary agent."

Record that the supervisor declined to take the call.

Args

reason
A short explanation of why the supervisor declined.
async def done_briefing(self) ‑> str
Expand source code
@function_tool
async def done_briefing(self) -> str:
    """Signal that the supervisor confirmed they're ready to take the call."""
    self._briefing_failure_reason = None
    self.briefing_done_event.set()
    return "Connecting the caller now. Stand by."

Signal that the supervisor confirmed they're ready to take the call.

async def voicemail_detected(self) ‑> str
Expand source code
@function_tool
async def voicemail_detected(self) -> str:
    """Call this when the consultation line reached voicemail / an answering machine."""
    logger.info("WarmTransferAgent: voicemail detected on supervisor leg")
    self._fail_briefing("voicemail")
    return "Voicemail detected; aborting transfer."

Call this when the consultation line reached voicemail / an answering machine.

Inherited members

class WarmTransferConfig (destination: SIPDestination,
summary_llm: "Optional['LLM']" = None,
summary_prompt: Optional[str] = None,
briefing_pipeline_factory: "Optional[Callable[[], 'Pipeline']]" = None,
supervisor_join_timeout: float = 120.0,
briefing_timeout: float = 180.0)
Expand source code
@dataclass
class WarmTransferConfig:
    """Configuration for a SIP-to-SIP warm transfer.

    Only ``destination`` is required. ``summary_llm`` / ``summary_prompt``
    customize the call summary; the timeouts default to sane values.

    The consultation room always runs the built-in :class:`WarmTransferAgent`.
    By default its pipeline is auto-built by re-instantiating the primary
    session's STT/LLM/TTS/VAD/turn-detector classes with no arguments — this
    works when those providers read their credentials from the environment
    (the common case). It deliberately does **not** reuse the primary session's
    live component instances: the briefing session runs concurrently with the
    primary one, so a shared VAD/STT/TTS would corrupt both sessions and
    tearing the briefing session down would kill the primary's components.
    If your providers need constructor arguments (API keys, model/voice ids,
    …), pass ``briefing_pipeline_factory`` to build the consultation pipeline
    explicitly, e.g. ``lambda: Pipeline(stt=DeepgramSTT(model="nova-3"), …)``.
    """

    destination: SIPDestination
    summary_llm: Optional["LLM"] = None
    summary_prompt: Optional[str] = None
    briefing_pipeline_factory: Optional[Callable[[], "Pipeline"]] = None
    supervisor_join_timeout: float = 120.0
    briefing_timeout: float = 180.0

Configuration for a SIP-to-SIP warm transfer.

Only destination is required. summary_llm / summary_prompt customize the call summary; the timeouts default to sane values.

The consultation room always runs the built-in :class:WarmTransferAgent. By default its pipeline is auto-built by re-instantiating the primary session's STT/LLM/TTS/VAD/turn-detector classes with no arguments — this works when those providers read their credentials from the environment (the common case). It deliberately does not reuse the primary session's live component instances: the briefing session runs concurrently with the primary one, so a shared VAD/STT/TTS would corrupt both sessions and tearing the briefing session down would kill the primary's components. If your providers need constructor arguments (API keys, model/voice ids, …), pass briefing_pipeline_factory to build the consultation pipeline explicitly, e.g. lambda: Pipeline(stt=DeepgramSTT(model="nova-3"), …).

Instance variables

var briefing_pipeline_factory : Optional[Callable[[], 'Pipeline']]
var briefing_timeout : float
var destinationSIPDestination
var summary_llm : Optional['LLM']
var summary_prompt : Optional[str]
var supervisor_join_timeout : float
class WarmTransferError (*args, **kwargs)
Expand source code
class WarmTransferError(Exception):
    """Raised when a warm transfer cannot be performed or fails mid-flight."""

Raised when a warm transfer cannot be performed or fails mid-flight.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class WarmTransferPhase (*args, **kwds)
Expand source code
class WarmTransferPhase(str, Enum):
    """Lifecycle phase of a warm transfer, emitted on the ``warm_transfer`` event."""

    STARTED = "started"
    SUMMARY_GENERATING = "summary_generating"
    SUMMARY_READY = "summary_ready"
    CALLER_ON_HOLD = "caller_on_hold"
    CONSULTATION_ROOM_CREATED = "consultation_room_created"
    SUPERVISOR_DIALED = "supervisor_dialed"
    SUPERVISOR_JOINED = "supervisor_joined"
    BRIEFING_STARTED = "briefing_started"
    BRIEFING_COMPLETE = "briefing_complete"
    CALL_SWITCHED = "call_switched"
    TRANSFER_COMPLETE = "transfer_complete"
    TRANSFER_CANCELLED = "transfer_cancelled"
    TRANSFER_FAILED = "transfer_failed"

Lifecycle phase of a warm transfer, emitted on the warm_transfer event.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var BRIEFING_COMPLETE
var BRIEFING_STARTED
var CALLER_ON_HOLD
var CALL_SWITCHED
var CONSULTATION_ROOM_CREATED
var STARTED
var SUMMARY_GENERATING
var SUMMARY_READY
var SUPERVISOR_DIALED
var SUPERVISOR_JOINED
var TRANSFER_CANCELLED
var TRANSFER_COMPLETE
var TRANSFER_FAILED
class WarmTransferResult (success: bool,
phase: WarmTransferPhase,
consultation_room_id: Optional[str],
supervisor_call_id: Optional[str],
switched_call_id: Optional[str],
summary: str,
error: Optional[str] = None)
Expand source code
@dataclass
class WarmTransferResult:
    """Result returned by ``AgentSession.warm_transfer``."""

    success: bool
    phase: WarmTransferPhase
    consultation_room_id: Optional[str]
    supervisor_call_id: Optional[str]
    switched_call_id: Optional[str]
    summary: str
    error: Optional[str] = None

Result returned by AgentSession.warm_transfer.

Instance variables

var consultation_room_id : str | None
var error : str | None
var phase : agents.warm_transfer._runner.WarmTransferPhase
var success : bool
var summary : str
var supervisor_call_id : str | None
var switched_call_id : str | None
class WarmTransferRunner (session: "'AgentSession'",
config: WarmTransferConfig)
Expand source code
class WarmTransferRunner:
    """Runs the SIP-to-SIP warm-transfer state machine for one :class:`AgentSession`.

    Emits ``warm_transfer`` on the primary session (and ``WARM_TRANSFER`` on the
    global event bus) at every transition.
    """

    def __init__(self, session: "AgentSession", config: WarmTransferConfig) -> None:
        self._session = session
        self._config = config
        self._consultation_room_id: Optional[str] = None
        self._supervisor_call_id: Optional[str] = None
        self._caller_call_id: Optional[str] = None
        self._summary: str = ""
        self._briefing_ctx: Optional[JobContext] = None
        self._briefing_session: Optional["AgentSession"] = None
        self._briefing_start_task: Optional[asyncio.Task] = None
        self._primary_input_was_enabled: Optional[bool] = None
        self._participant_left_handler: Optional[Any] = None

    # ── Public entry ───────────────────────────────────────────────────

    async def run(self) -> WarmTransferResult:
        await self._emit(WarmTransferPhase.STARTED, {"start_time": time.time()})
        try:
            # Stop processing the caller's speech for the rest of the transfer,
            # before anything that can await: the caller shouldn't be able to
            # interrupt the agent's "I'm transferring you / please hold" lines or
            # spin up new turns while we're handing the call off.
            self._mute_primary_input()
            self._validate_caller_and_cache_callid()
            self._summary = await self._generate_summary()
            await self._place_caller_on_hold()

            self._consultation_room_id = await self._create_consultation_room()
            await self._emit(
                WarmTransferPhase.CONSULTATION_ROOM_CREATED,
                {"consultation_room_id": self._consultation_room_id},
            )

            await self._spawn_briefing_session()
            await self._dial_supervisor()

            if not await self._wait_for_supervisor():
                return await self._abort(
                    "supervisor_join_timeout",
                    apology="Sorry, I couldn't reach a supervisor; let me continue helping you.",
                    phase=WarmTransferPhase.TRANSFER_CANCELLED,
                )

            ev = getattr(self._briefing_session.agent, "supervisor_ready_event", None)
            if ev is not None:
                ev.set()
            await self._emit(WarmTransferPhase.SUPERVISOR_JOINED, {})

            outcome = await self._wait_for_briefing_complete()
            if outcome["status"] != "ready":
                return await self._abort(
                    outcome["status"], apology=outcome["apology"], phase=WarmTransferPhase.TRANSFER_CANCELLED
                )

            if not await self._switch_caller_to_consultation_room():
                return await self._abort(
                    "switch_failed",
                    apology="I couldn't complete the transfer; please try again in a moment.",
                    phase=WarmTransferPhase.TRANSFER_FAILED,
                )

            await self._close_briefing()
            with suppress(Exception):
                await self._session.leave()

            await self._emit(WarmTransferPhase.TRANSFER_COMPLETE, {})
            return WarmTransferResult(
                success=True,
                phase=WarmTransferPhase.TRANSFER_COMPLETE,
                consultation_room_id=self._consultation_room_id,
                supervisor_call_id=self._supervisor_call_id,
                switched_call_id=self._caller_call_id,
                summary=self._summary,
            )
        except asyncio.CancelledError:
            # The transfer task itself was cancelled (e.g. the session is
            # shutting down). Don't leave the caller permanently muted, then
            # propagate.
            self._restore_primary_input()
            raise
        except WarmTransferError as exc:
            await self._abort(str(exc), apology=_FAIL_APOLOGY, phase=WarmTransferPhase.TRANSFER_FAILED)
            raise
        except Exception as exc:
            logger.exception("Warm transfer failed with unexpected error")
            await self._abort(str(exc), apology=_FAIL_APOLOGY, phase=WarmTransferPhase.TRANSFER_FAILED)
            raise WarmTransferError(str(exc)) from exc

    def _validate_caller_and_cache_callid(self) -> None:
        room = getattr(getattr(self._session, "_job_context", None), "room", None)
        if room is None:
            raise WarmTransferError("Primary session has no active room; cannot perform warm transfer.")
        sip_manager = getattr(room, "sip_manager", None)
        session_id = getattr(room, "_session_id", None)
        if sip_manager is None or not session_id:
            raise WarmTransferError(
                "Primary room has no SIP manager / session id — warm transfer only works for SIP calls."
            )
        caller_call_id: Optional[str] = None
        try:
            info = sip_manager.fetch_call_info(session_id)
            if info:
                caller_call_id = info.get("callId")
        except Exception as exc:  
            logger.warning(f"WarmTransfer: fetch_call_info failed: {exc}")
        if caller_call_id is None:
            raise WarmTransferError(
                "Caller is not a SIP participant — warm transfer requires an active SIP (telephony) call."
            )
        self._caller_call_id = caller_call_id

    async def _generate_summary(self) -> str:
        await self._emit(WarmTransferPhase.SUMMARY_GENERATING, {})
        llm = self._config.summary_llm or getattr(self._session.pipeline, "llm", None)
        if llm is None:
            logger.warning("WarmTransfer: no LLM available for summarization; using fallback text")
            await self._emit(WarmTransferPhase.SUMMARY_READY, {"summary": _FALLBACK_SUMMARY})
            return _FALLBACK_SUMMARY
        try:
            history = self._session.get_context_history(include_function_calls=False, include_system_messages=False)
        except Exception as exc:  
            logger.warning(f"WarmTransfer: get_context_history failed: {exc}")
            history = []
        summary = await _summarize_history(llm, history, self._config.summary_prompt)
        await self._emit(WarmTransferPhase.SUMMARY_READY, {"summary": summary})
        return summary

    def _speak_on_primary(self, message: str) -> None:
        """Speak ``message`` to the caller without disturbing the transfer.

        Deliberately *not* ``AgentSession.say``: that flips ``_accept_user_input``
        back on (re-arming interruptions on the caller's leg) and blocks until the
        agent has fully stopped speaking. Here we push the message straight into
        the pipeline as a non-interruptible utterance, fire-and-forget — the
        caller stays muted and the transfer keeps moving.
        """
        with suppress(Exception):
            self._session.agent.chat_context.add_message(role=ChatRole.ASSISTANT, content=message)
        pipeline = getattr(self._session, "pipeline", None)
        if pipeline is None or not hasattr(pipeline, "send_message"):
            return
        handle = UtteranceHandle(utterance_id=f"warm_transfer_{uuid.uuid4().hex[:8]}", interruptible=False)
        with suppress(Exception):
            asyncio.create_task(pipeline.send_message(message, handle=handle))

    async def _place_caller_on_hold(self) -> None:
        # Queues after whatever the agent was already saying when it called the
        # escalation tool, so the caller hears that line in full and then the
        # hold message; doesn't block the transfer on playback.
        self._speak_on_primary("Please hold while I connect you with a supervisor.")
        await self._emit(WarmTransferPhase.CALLER_ON_HOLD, {})

    async def _create_consultation_room(self) -> str:
        ctx = self._session._job_context
        auth_token = getattr(ctx, "videosdk_auth", None)
        if not auth_token:
            raise WarmTransferError("No VideoSDK auth token available to create consultation room.")
        signaling_base_url = getattr(ctx.room_options, "signaling_base_url", "api.videosdk.live")
        logger.info("WarmTransfer: creating consultation room via POST /v2/rooms …")
        room_id = await asyncio.to_thread(JobContext.create_room_static, auth_token, signaling_base_url)
        logger.info(f"WarmTransfer: consultation room created: {room_id}")
        return room_id

    def _build_briefing_pipeline(self) -> "Pipeline":
        """Build the consultation-room pipeline with its **own** component instances.

        The briefing session runs concurrently with the primary one, so it must
        not share live STT/LLM/TTS/VAD/turn-detector instances with it: shared
        streaming state would corrupt both sessions, the briefing TTS would write
        to the primary room's audio track, and closing the briefing session would
        tear down the primary's components (the source of the runaway
        ``'NoneType' object has no attribute 'frame_size'`` VAD errors after a
        transfer). Use the caller-supplied factory if any, otherwise re-build each
        primary component from its class with no args (works for env-configured
        providers).
        """
        from ..pipeline import Pipeline

        factory = self._config.briefing_pipeline_factory
        if factory is not None:
            pipeline = factory()
            if not isinstance(pipeline, Pipeline):
                raise WarmTransferError(
                    "WarmTransferConfig.briefing_pipeline_factory must return a "
                    "videosdk.agents.Pipeline instance."
                )
            logger.info("WarmTransfer: using briefing_pipeline_factory for the consultation pipeline")
            return pipeline

        logger.info("WarmTransfer: auto-building the consultation pipeline from the primary component classes")
        primary = self._session.pipeline

        def _fresh(component: Any) -> Any:
            if component is None:
                return None
            try:
                return type(component)()
            except Exception as exc:
                raise WarmTransferError(
                    f"Could not auto-build a fresh {type(component).__name__} for the "
                    f"briefing pipeline ({exc}). Pass "
                    f"WarmTransferConfig.briefing_pipeline_factory to construct the "
                    f"consultation pipeline explicitly."
                ) from exc

        return Pipeline(
            stt=_fresh(getattr(primary, "stt", None)),
            llm=_fresh(getattr(primary, "llm", None)),
            tts=_fresh(getattr(primary, "tts", None)),
            vad=_fresh(getattr(primary, "vad", None)),
            turn_detector=_fresh(getattr(primary, "turn_detector", None)),
        )

    async def _spawn_briefing_session(self) -> None:
        """Spin up a second JobContext + AgentSession (WarmTransferAgent) for the consultation room."""
        from ..agent_session import AgentSession

        ctx = self._session._job_context
        briefing_ctx = JobContext(
            room_options=RoomOptions(
                room_id=self._consultation_room_id,
                auth_token=getattr(ctx, "videosdk_auth", None),
                name="Warm Transfer Agent",
                agent_participant_id=f"warm_transfer_{int(time.time())}",
                playground=False,
                background_audio=False,
                auto_end_session=False,
                signaling_base_url=getattr(ctx.room_options, "signaling_base_url", "api.videosdk.live"),
            )
        )
        self._briefing_ctx = briefing_ctx
        token = _set_current_job_context(briefing_ctx)
        try:
            briefing_pipeline = self._build_briefing_pipeline()
            vmd_llm = self._config.summary_llm or getattr(briefing_pipeline, "llm", None)
            vmd = VoiceMailDetector(llm=vmd_llm, callback=self._on_voicemail_detected) if vmd_llm else None
            briefing_session = AgentSession(
                agent=WarmTransferAgent(summary=self._summary),
                pipeline=briefing_pipeline,
                voice_mail_detector=vmd,
            )
        finally:
            _reset_current_job_context(token)

        self._briefing_session = briefing_session
        await briefing_ctx.connect()

        token = _set_current_job_context(briefing_ctx)
        try:
            self._briefing_start_task = asyncio.create_task(briefing_session.start(wait_for_participant=False))
        finally:
            _reset_current_job_context(token)

        self._install_supervisor_disconnect_watcher()

    async def _on_voicemail_detected(self) -> None:
        """Callback for the briefing session's VoiceMailDetector classifier (idempotent)."""
        self._flag_briefing_failed("voicemail", "VoiceMailDetector classifier flagged the supervisor leg as voicemail")

    def _flag_briefing_failed(self, reason: str, log_msg: Optional[str] = None) -> None:
        agent = getattr(self._briefing_session, "agent", None)
        ev = getattr(agent, "briefing_failed_event", None)
        if ev is None or ev.is_set():
            return
        if log_msg:
            logger.warning(f"WarmTransfer: {log_msg}")
        setattr(agent, "_briefing_failure_reason", reason)
        ev.set()
        if reason in ("supervisor_disconnected", "voicemail"):
            with suppress(Exception):
                asyncio.create_task(self._log_supervisor_call_status())

    async def _log_supervisor_call_status(self) -> None:
        """Best-effort: log the supervisor outbound call's final SIP state.

        When the supervisor leg drops, this fetches ``GET /v2/sip/call`` for the
        consultation room and logs each call's ``status`` / ``timelog`` so the
        hangup reason (``no-answer`` / ``busy`` / ``completed`` + reason) shows up
        in the agent logs — useful since the warm-transfer code never ends that
        call itself; a ~timeout-ish drop is always on the SIP/gateway/carrier side.
        """
        room_id = self._consultation_room_id
        auth_token = getattr(getattr(self._session, "_job_context", None), "videosdk_auth", None)
        if not room_id or not auth_token:
            return
        try:
            import requests

            from ..room._sip_manager import FETCH_CALL_INFO_URL

            resp = await asyncio.to_thread(
                requests.get,
                FETCH_CALL_INFO_URL,
                headers={"Authorization": auth_token},
                params={"roomId": room_id},
            )
            calls = (resp.json() or {}).get("data", []) if getattr(resp, "ok", False) else []
            if not calls:
                logger.info(f"WarmTransfer: no SIP call records found for consultation room {room_id}")
                return
            for call in calls:
                logger.info(
                    "WarmTransfer: supervisor SIP call %s — status=%s end=%s timelog=%s",
                    call.get("callId") or call.get("id"),
                    call.get("status"),
                    call.get("end"),
                    call.get("timelog"),
                )
        except Exception as exc:
            logger.debug(f"WarmTransfer: could not fetch supervisor call status: {exc}")

    def _install_supervisor_disconnect_watcher(self) -> None:
        room = self._briefing_ctx.room if self._briefing_ctx else None
        if room is None or getattr(self._briefing_session, "agent", None) is None:
            return

        def _on_participant_left(data):
            p = data.get("participant") if isinstance(data, dict) else None
            if not _participant_in_room(p, room) or _is_agent_name(p):
                return
            self._flag_briefing_failed(
                "supervisor_disconnected", f"supervisor left the briefing room mid-call (participant={p.id})"
            )

        global_event_emitter.on("PARTICIPANT_LEFT", _on_participant_left)
        self._participant_left_handler = _on_participant_left

    async def _dial_supervisor(self) -> None:
        dest = self._config.destination
        if not isinstance(dest, SIPDestination):
            raise WarmTransferError("WarmTransferConfig.destination must be a SIPDestination")
        sip_manager = getattr(self._session._job_context.room, "sip_manager", None)
        if sip_manager is None:
            raise WarmTransferError("Primary room has no SIP manager; cannot place an outbound call.")
        kwargs: dict[str, Any] = {
            "routing_rule_id": dest.routing_rule_id,
            "sip_call_to": dest.sip_call_to,
            "sip_call_from": dest.sip_call_from,
            "destination_room_id": self._consultation_room_id,
        }
        if dest.participant_display_name and dest.participant_display_name != "Supervisor":
            kwargs["participant"] = {"name": dest.participant_display_name}
        kwargs.update(dest.extra_options)
        response = await sip_manager.async_make_outbound_call(**kwargs)
        data = response.get("data", {}) if isinstance(response, dict) else {}
        self._supervisor_call_id = data.get("callId") or data.get("id")
        logger.info(
            f"WarmTransfer: dialed supervisor {dest.sip_call_to} (callId={self._supervisor_call_id}) "
            f"into room {self._consultation_room_id}"
        )
        await self._emit(
            WarmTransferPhase.SUPERVISOR_DIALED,
            {
                "supervisor_call_id": self._supervisor_call_id,
                "consultation_room_id": self._consultation_room_id,
                "response": response,
            },
        )

    async def _wait_for_supervisor(self) -> bool:
        """Wait until the supervisor's audio is live (not just the SIP leg ringing).

        Returns ``False`` if the wait times out, or if the supervisor leg drops /
        hits voicemail before audio ever becomes live (no point waiting out the
        full ``supervisor_join_timeout`` once we know the supervisor is gone).
        """
        room = self._briefing_ctx.room if self._briefing_ctx else None
        if room is None:
            raise WarmTransferError("Briefing room not connected; cannot wait for supervisor.")
        audio_ready = asyncio.Event()

        def _on_audio_enabled(data):
            stream = data.get("stream") if isinstance(data, dict) else None
            p = data.get("participant") if isinstance(data, dict) else None
            if not (stream and getattr(stream, "kind", None) == "audio"):
                return
            if not _participant_in_room(p, room) or _is_agent_name(p):
                return
            logger.info(f"WarmTransfer: supervisor audio stream enabled (participant={p.id})")
            audio_ready.set()

        agent = getattr(self._briefing_session, "agent", None)
        failed_ev = getattr(agent, "briefing_failed_event", None)

        global_event_emitter.on("AUDIO_STREAM_ENABLED", _on_audio_enabled)
        audio_task = asyncio.create_task(audio_ready.wait())
        failed_task = asyncio.create_task(failed_ev.wait() if failed_ev is not None else asyncio.Event().wait())
        try:
            done, _ = await asyncio.wait(
                {audio_task, failed_task},
                timeout=self._config.supervisor_join_timeout,
                return_when=asyncio.FIRST_COMPLETED,
            )
            if audio_task in done:
                return True
            if failed_task in done:
                logger.warning(
                    "WarmTransfer: supervisor leg unavailable before audio became live "
                    f"(reason={getattr(agent, '_briefing_failure_reason', 'unknown')})"
                )
                return False
            logger.warning(
                f"WarmTransfer: supervisor audio did not become live within "
                f"{self._config.supervisor_join_timeout}s"
            )
            return False
        finally:
            global_event_emitter.off("AUDIO_STREAM_ENABLED", _on_audio_enabled)
            for t in (audio_task, failed_task):
                if not t.done():
                    t.cancel()
                    with suppress(asyncio.CancelledError):
                        await t

    async def _wait_for_briefing_complete(self) -> dict[str, str]:
        """Race done / failed / timeout. Returns ``{"status": ..., "apology": ...}``."""
        await self._emit(WarmTransferPhase.BRIEFING_STARTED, {})
        agent = self._briefing_session.agent
        done_ev = getattr(agent, "briefing_done_event", None)
        failed_ev = getattr(agent, "briefing_failed_event", None)

        async def _wait(ev):
            if ev is None:
                await asyncio.Event().wait()
            else:
                await ev.wait()

        done_task = asyncio.create_task(_wait(done_ev))
        failed_task = asyncio.create_task(_wait(failed_ev))
        try:
            done, _ = await asyncio.wait(
                {done_task, failed_task}, timeout=self._config.briefing_timeout, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            for t in (done_task, failed_task):
                if not t.done():
                    t.cancel()
                    with suppress(asyncio.CancelledError):
                        await t

        if failed_task in done and done_task not in done:
            reason = getattr(agent, "_briefing_failure_reason", "briefing_failed")
            await self._emit(WarmTransferPhase.BRIEFING_COMPLETE, {"resolution": reason})
            return {"status": reason, "apology": _BRIEFING_APOLOGIES.get(reason, _DEFAULT_BRIEFING_APOLOGY)}

        if not done:
            logger.warning(
                f"WarmTransfer: briefing timeout ({self._config.briefing_timeout}s) reached; proceeding."
            )
            await self._emit(WarmTransferPhase.BRIEFING_COMPLETE, {"resolution": "timeout"})
        else:
            await self._emit(WarmTransferPhase.BRIEFING_COMPLETE, {"resolution": "ready"})
        return {"status": "ready", "apology": ""}

    async def _switch_caller_to_consultation_room(self) -> bool:
        sip_manager = getattr(self._session._job_context.room, "sip_manager", None)
        if self._caller_call_id is None or sip_manager is None:
            logger.error("WarmTransfer: no caller SIP callId / SIP manager; cannot switch")
            return False
        pid, tkn = _mint_participant_token("caller")
        try:
            response = await sip_manager.async_switch_call_room(
                call_id=self._caller_call_id,
                room_id=self._consultation_room_id,
                token=tkn,
                participant_id=pid,
            )
        except Exception as exc:  
            logger.error(f"WarmTransfer: switch_call_room failed: {exc}")
            return False
        await self._emit(
            WarmTransferPhase.CALL_SWITCHED,
            {
                "caller_call_id": self._caller_call_id,
                "consultation_room_id": self._consultation_room_id,
                "caller_participant_id": pid,
                "response": response,
            },
        )
        return True

    def _mute_primary_input(self) -> None:
        """Stop the primary session from processing the caller's stray speech while on hold.

        Idempotent: the original ``_accept_user_input`` value is captured only on
        the first call so re-muting after a ``say`` (which re-enables input)
        doesn't clobber it.
        """
        if self._primary_input_was_enabled is None:
            self._primary_input_was_enabled = bool(getattr(self._session, "_accept_user_input", True))
        with suppress(Exception):
            self._session._accept_user_input = False

    def _restore_primary_input(self) -> None:
        if self._primary_input_was_enabled is None:
            return
        with suppress(Exception):
            self._session._accept_user_input = self._primary_input_was_enabled
        self._primary_input_was_enabled = None

    async def _close_briefing(self) -> None:
        if self._participant_left_handler is not None:
            with suppress(Exception):
                global_event_emitter.off("PARTICIPANT_LEFT", self._participant_left_handler)
            self._participant_left_handler = None
        if self._briefing_session is not None:
            with suppress(Exception):
                await self._briefing_session.close()
        if self._briefing_start_task is not None and not self._briefing_start_task.done():
            self._briefing_start_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._briefing_start_task
        if self._briefing_ctx is not None:
            with suppress(Exception):
                await self._briefing_ctx.shutdown()

    async def _abort(self, reason: str, *, apology: str, phase: WarmTransferPhase) -> WarmTransferResult:
        """Restore the caller, apologize, tear down the briefing session, emit ``phase``."""
        self._restore_primary_input()
        with suppress(Exception):
            asyncio.create_task(self._session.say(apology, interruptible=True))
        await self._close_briefing()
        await self._emit(phase, {"reason": reason})
        return WarmTransferResult(
            success=False,
            phase=phase,
            consultation_room_id=self._consultation_room_id,
            supervisor_call_id=self._supervisor_call_id,
            switched_call_id=self._caller_call_id,
            summary=self._summary,
            error=reason,
        )
        
    async def _emit(self, phase: WarmTransferPhase, data: dict[str, Any]) -> None:
        payload = {
            "phase": phase,
            "data": data,
            "timestamp": time.time(),
            "consultation_room_id": self._consultation_room_id,
        }
        with suppress(Exception):
            self._session.emit("warm_transfer", payload)
        with suppress(Exception):
            global_event_emitter.emit("WARM_TRANSFER", {**payload, "phase": phase.value})

Runs the SIP-to-SIP warm-transfer state machine for one :class:AgentSession.

Emits warm_transfer on the primary session (and WARM_TRANSFER on the global event bus) at every transition.

Methods

async def run(self) ‑> agents.warm_transfer._runner.WarmTransferResult
Expand source code
async def run(self) -> WarmTransferResult:
    await self._emit(WarmTransferPhase.STARTED, {"start_time": time.time()})
    try:
        # Stop processing the caller's speech for the rest of the transfer,
        # before anything that can await: the caller shouldn't be able to
        # interrupt the agent's "I'm transferring you / please hold" lines or
        # spin up new turns while we're handing the call off.
        self._mute_primary_input()
        self._validate_caller_and_cache_callid()
        self._summary = await self._generate_summary()
        await self._place_caller_on_hold()

        self._consultation_room_id = await self._create_consultation_room()
        await self._emit(
            WarmTransferPhase.CONSULTATION_ROOM_CREATED,
            {"consultation_room_id": self._consultation_room_id},
        )

        await self._spawn_briefing_session()
        await self._dial_supervisor()

        if not await self._wait_for_supervisor():
            return await self._abort(
                "supervisor_join_timeout",
                apology="Sorry, I couldn't reach a supervisor; let me continue helping you.",
                phase=WarmTransferPhase.TRANSFER_CANCELLED,
            )

        ev = getattr(self._briefing_session.agent, "supervisor_ready_event", None)
        if ev is not None:
            ev.set()
        await self._emit(WarmTransferPhase.SUPERVISOR_JOINED, {})

        outcome = await self._wait_for_briefing_complete()
        if outcome["status"] != "ready":
            return await self._abort(
                outcome["status"], apology=outcome["apology"], phase=WarmTransferPhase.TRANSFER_CANCELLED
            )

        if not await self._switch_caller_to_consultation_room():
            return await self._abort(
                "switch_failed",
                apology="I couldn't complete the transfer; please try again in a moment.",
                phase=WarmTransferPhase.TRANSFER_FAILED,
            )

        await self._close_briefing()
        with suppress(Exception):
            await self._session.leave()

        await self._emit(WarmTransferPhase.TRANSFER_COMPLETE, {})
        return WarmTransferResult(
            success=True,
            phase=WarmTransferPhase.TRANSFER_COMPLETE,
            consultation_room_id=self._consultation_room_id,
            supervisor_call_id=self._supervisor_call_id,
            switched_call_id=self._caller_call_id,
            summary=self._summary,
        )
    except asyncio.CancelledError:
        # The transfer task itself was cancelled (e.g. the session is
        # shutting down). Don't leave the caller permanently muted, then
        # propagate.
        self._restore_primary_input()
        raise
    except WarmTransferError as exc:
        await self._abort(str(exc), apology=_FAIL_APOLOGY, phase=WarmTransferPhase.TRANSFER_FAILED)
        raise
    except Exception as exc:
        logger.exception("Warm transfer failed with unexpected error")
        await self._abort(str(exc), apology=_FAIL_APOLOGY, phase=WarmTransferPhase.TRANSFER_FAILED)
        raise WarmTransferError(str(exc)) from exc