Module agents.llm

Sub-modules

agents.llm.chat_context

Backward-compatibility shim. The implementation now lives in the videosdk.agents.llm.context package. Import from there for new code.

agents.llm.context
agents.llm.context_window

Backward-compatibility shim. ContextWindow now lives in the videosdk.agents.llm.context package.

agents.llm.fallback_llm
agents.llm.format_converters
agents.llm.llm

Classes

class AgentConfigUpdate (**data: Any)
Expand source code
class AgentConfigUpdate(_ChatItemBase):
    """Records a mid-conversation change to an agent's instructions or tools.

    Structural item — excluded from provider conversion; feeds active-config
    resolution.

    Both ``instructions`` and ``tools`` default to ``None``, so a single update
    may carry either one, both, or neither.
    """

    # Auto-generated identifier: "cfgupd_" followed by the first 12 hex
    # characters of a random UUID4.
    id: str = Field(default_factory=lambda: f"cfgupd_{uuid.uuid4().hex[:12]}")
    # Fixed discriminator value identifying this item kind.
    type: Literal["agent_config_update"] = "agent_config_update"
    # Instructions text carried by this update, or None when it carries none.
    instructions: Optional[str] = None
    # Tool names (as strings) carried by this update, or None when it carries none.
    tools: Optional[List[str]] = None

Records a mid-conversation change to an agent's instructions or tools.

Structural item — excluded from provider conversion; feeds active-config resolution.

Create a new model by parsing and validating input data from keyword arguments.

Raises `pydantic_core.ValidationError` if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

Ancestors

  • agents.llm.context.items._ChatItemBase
  • pydantic.main.BaseModel

Class variables

var id : str
var instructions : str | None
var model_config
var tools : List[str] | None
var type : Literal['agent_config_update']
class AgentHandoff (**data: Any)
Expand source code
class AgentHandoff(_ChatItemBase):
    """Records a transfer of control between agents.

    Structural item — excluded from provider conversion.

    ``to_agent`` is the only field without a default and must be supplied.
    """

    # Auto-generated identifier: "handoff_" followed by the first 12 hex
    # characters of a random UUID4.
    id: str = Field(default_factory=lambda: f"handoff_{uuid.uuid4().hex[:12]}")
    # Fixed discriminator value identifying this item kind.
    type: Literal["agent_handoff"] = "agent_handoff"
    # Agent handing control over; optional.
    from_agent: Optional[str] = None
    # Agent receiving control; required.
    to_agent: str
    # Optional free-text explanation for the handoff.
    reason: Optional[str] = None

Records a transfer of control between agents.

Structural item — excluded from provider conversion.

Create a new model by parsing and validating input data from keyword arguments.

Raises `pydantic_core.ValidationError` if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

Ancestors

  • agents.llm.context.items._ChatItemBase
  • pydantic.main.BaseModel

Class variables

var from_agent : str | None
var id : str
var model_config
var reason : str | None
var to_agent : str
var type : Literal['agent_handoff']
class ChatContext (items: Optional[List[ChatItem]] = None)
Expand source code
class ChatContext:
    """
    Manages a conversation context for LLM interactions.
    """

    def __init__(self, items: Optional[List[ChatItem]] = None):
        """
        Initialize the chat context.

        Args:
            items (Optional[List[ChatItem]]): Initial list of chat items. If None, starts with empty context.
        """
        self._items: List[ChatItem] = items or []

    @classmethod
    def empty(cls) -> ChatContext:
        """
        Create an empty chat context.

        Returns:
            ChatContext: A new empty chat context instance.
        """
        return cls([])

    @property
    def items(self) -> List[ChatItem]:
        """
        Get all items in the context.

        Returns:
            List[ChatItem]: List of all conversation items (messages, function calls, outputs).
        """
        return self._items

    def messages(self) -> List[ChatMessage]:
        """
        Return only ChatMessage items, filtering out function calls and outputs.

        Returns:
            List[ChatMessage]: List of all chat messages in the context.
        """
        return [item for item in self._items if isinstance(item, ChatMessage)]

    def turn_count(self) -> int:
        """
        Count the number of user turns (user-assistant exchange pairs).

        Returns:
            int: Number of user messages in the context.
        """
        return sum(
            1 for item in self._items
            if isinstance(item, ChatMessage) and item.role == ChatRole.USER
        )

    def estimated_tokens(self) -> int:
        """
        Rough token estimate for the current context using a ~4 chars per token heuristic.
        Good enough for budget decisions — not a replacement for provider-reported usage.

        Returns:
            int: Estimated token count.
        """
        total = 0
        for item in self._items:
            total += self._estimate_item_tokens(item)
        return total

    def _estimate_item_tokens(self, item: ChatItem) -> int:
        """Return a rough token estimate for one chat item.

        Every item is charged a flat 4 tokens. Messages add ``len // 4`` per text
        part and 300 per image part; function calls and outputs add
        ``len(name) // 4 + 5`` plus ``len // 4`` of their arguments/output text.
        Any other item kind costs only the flat 4.
        """
        base = 4

        if isinstance(item, ChatMessage):
            content = item.content
            pieces = content if isinstance(content, list) else [content]
            text_cost = sum(len(piece) // 4 for piece in pieces if isinstance(piece, str))
            image_cost = sum(
                300
                for piece in pieces
                if not isinstance(piece, str) and isinstance(piece, ImageContent)
            )
            return base + text_cost + image_cost

        # Calls and outputs are priced the same way; only the payload field differs.
        if isinstance(item, FunctionCall):
            payload = item.arguments
        elif isinstance(item, FunctionCallOutput):
            payload = item.output
        else:
            return base

        cost = base + len(item.name) // 4 + 5
        if payload:
            cost += len(payload) // 4
        return cost

    def add_message(
        self,
        role: ChatRole,
        content: Union[str, List[ChatContent]],
        message_id: Optional[str] = None,
        created_at: Optional[float] = None,
        replace: bool = False,
        agent_id: Optional[str] = None,
    ) -> ChatMessage:
        """Append a new message to the context and return it.

        Args:
            role: Role of the message author.
            content: Message content. A bare string is wrapped in a
                single-element list before the message is built.
            message_id: Explicit message id. When omitted (or empty) an id of the
                form ``msg_<12 hex chars>`` is generated.
            created_at: Explicit creation timestamp. Only ``None`` falls back to
                ``time.time()``, so an explicit ``0`` / ``0.0`` is honoured.
            replace: When true and ``role`` is SYSTEM, every existing SYSTEM
                message is dropped before the new one is appended.
            agent_id: Attribution stored on the created message.

        Returns:
            ChatMessage: The message that was appended.
        """
        if replace and role == ChatRole.SYSTEM:
            self._items = [
                item for item in self._items
                if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM)
            ]

        if isinstance(content, str):
            content = [content]

        # `created_at or time.time()` would silently replace a legitimate falsy
        # timestamp (0 / 0.0); only a missing value gets the current time.
        timestamp = time.time() if created_at is None else created_at

        message = ChatMessage(
            role=role,
            content=content,
            id=message_id or f"msg_{uuid.uuid4().hex[:12]}",
            created_at=timestamp,
            agent_id=agent_id,
        )
        self._items.append(message)
        return message

    def add_function_call(
        self,
        name: str,
        arguments: str,
        call_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> FunctionCall:
        """Append a function call item to the context and return it.

        ``metadata`` holds provider-specific per-call data — notably the Gemini
        ``thought_signature`` — that has to stay attached to this exact call when
        the context is later converted for the provider.

        When ``call_id`` is omitted (or empty) an id of the form
        ``call_<12 hex chars>`` is generated.
        """
        resolved_call_id = call_id if call_id else f"call_{uuid.uuid4().hex[:12]}"
        entry = FunctionCall(
            name=name,
            arguments=arguments,
            call_id=resolved_call_id,
            agent_id=agent_id,
            metadata=metadata,
        )
        self._items.append(entry)
        return entry

    def add_function_output(
        self,
        name: str,
        output: str,
        call_id: str,
        is_error: bool = False,
        agent_id: Optional[str] = None,
    ) -> FunctionCallOutput:
        """Append a function output item to the context and return it.

        Args:
            name: Name stored on the output item.
            output: Output text stored on the item.
            call_id: Id of the call this output belongs to.
            is_error: Error flag stored on the item.
            agent_id: Attribution stored on the item.
        """
        fields = {
            "name": name,
            "output": output,
            "call_id": call_id,
            "is_error": is_error,
            "agent_id": agent_id,
        }
        entry = FunctionCallOutput(**fields)
        self._items.append(entry)
        return entry

    def add_handoff(
        self,
        to_agent: str,
        from_agent: Optional[str] = None,
        reason: Optional[str] = None,
    ) -> AgentHandoff:
        """Append an AgentHandoff item describing a transfer of control and return it."""
        entry = AgentHandoff(
            from_agent=from_agent,
            to_agent=to_agent,
            reason=reason,
        )
        self._items.append(entry)
        return entry

    def add_config_update(
        self,
        instructions: Optional[str] = None,
        tools: Optional[List[str]] = None,
        agent_id: Optional[str] = None,
    ) -> AgentConfigUpdate:
        """Append an AgentConfigUpdate item (changed instructions and/or tools) and return it."""
        entry = AgentConfigUpdate(
            instructions=instructions,
            tools=tools,
            agent_id=agent_id,
        )
        self._items.append(entry)
        return entry

    def get_by_id(self, item_id: str) -> Optional[ChatItem]:
        """
        Find an item by its ID.

        Args:
            item_id (str): The ID of the item to find.

        Returns:
            Optional[ChatItem]: The found item or None if not found.
        """
        return next(
            (item for item in self._items if item.id == item_id),
            None
        )

    def insert(self, item: ChatItem) -> ChatItem:
        """Insert an item at the position determined by its ``created_at``."""
        pos = len(self._items)
        for i, existing in enumerate(self._items):
            if existing.created_at > item.created_at:
                pos = i
                break
        self._items.insert(pos, item)
        return item

    def insert_many(self, items: List[ChatItem]) -> None:
        """Batch-insert items, each placed in timestamp order."""
        for item in sorted(items, key=lambda it: it.created_at):
            self.insert(item)

    def active_config_at(
        self, target: Union[str, int, None] = None
    ) -> tuple[Optional[str], Optional[List[str]]]:
        """Resolve the effective (instructions, valid_tools) at a point in the context.

        Walks SYSTEM/DEVELOPER messages and AgentConfigUpdate items from the
        start up to and including ``target``.

        Args:
            target: An item id, a list index, or None for the end of the context.

        Returns:
            tuple: (instructions, valid_tools). Either element may be None.
        """
        if target is None:
            end = len(self._items)
        elif isinstance(target, int):
            end = target + 1
        else:
            end = next(
                (i + 1 for i, item in enumerate(self._items) if item.id == target),
                len(self._items),
            )

        instructions: Optional[str] = None
        tools: Optional[List[str]] = None
        for item in self._items[:end]:
            if isinstance(item, ChatMessage) and item.role in (
                ChatRole.SYSTEM,
                ChatRole.DEVELOPER,
            ):
                if isinstance(item.content, str):
                    instructions = item.content
                else:
                    instructions = " ".join(
                        p for p in item.content if isinstance(p, str)
                    )
            elif isinstance(item, AgentConfigUpdate):
                if item.instructions is not None:
                    instructions = item.instructions
                if item.tools is not None:
                    tools = item.tools
        return instructions, tools

    def copy(
        self,
        *,
        exclude_system_messages: bool = False,
        exclude_instructions: bool = False,
        exclude_empty_messages: bool = False,
        exclude_handoffs: bool = False,
        exclude_config_updates: bool = False,
        tools: Optional[List[FunctionTool]] = None,
        filter_agent_id: Optional[str] = None,
    ) -> ChatContext:
        """Create a filtered copy of the chat context.

        Args:
            exclude_system_messages: Drop SYSTEM-role messages.
            exclude_instructions: Drop SYSTEM- and DEVELOPER-role messages.
            exclude_empty_messages: Drop messages with no meaningful content.
            exclude_handoffs: Drop AgentHandoff items.
            exclude_config_updates: Drop AgentConfigUpdate items.
            tools: Tool-scoping for function calls/outputs. ``None`` (the
                default) keeps every function call/output; an empty list drops
                them all; a non-empty list keeps only calls/outputs whose tool
                is in the list.
            filter_agent_id: When given, keep only items with this agent_id.

        Returns:
            ChatContext: A new ChatContext with the filtered items.
        """
        items: List[ChatItem] = []
        valid_tool_names = {
            get_tool_info(tool).name
            for tool in (tools or [])
            if is_function_tool(tool)
        }

        for item in self._items:
            if isinstance(item, ChatMessage):
                if exclude_system_messages and item.role == ChatRole.SYSTEM:
                    continue
                if exclude_instructions and item.role in (
                    ChatRole.SYSTEM,
                    ChatRole.DEVELOPER,
                ):
                    continue
                if exclude_empty_messages and not self._has_content(item):
                    continue

            if exclude_handoffs and isinstance(item, AgentHandoff):
                continue
            if exclude_config_updates and isinstance(item, AgentConfigUpdate):
                continue

            if tools is not None and isinstance(
                item, (FunctionCall, FunctionCallOutput)
            ):
                if item.name not in valid_tool_names:
                    continue

            if filter_agent_id is not None:
                _structural = (
                    isinstance(item, (AgentHandoff, AgentConfigUpdate))
                    or (
                        isinstance(item, ChatMessage)
                        and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)
                    )
                )
                if not _structural and item.agent_id != filter_agent_id:
                    continue

            items.append(item)

        return ChatContext(items)

    @staticmethod
    def _has_content(msg: ChatMessage) -> bool:
        """Return True if the message has any non-empty content."""
        if isinstance(msg.content, str):
            return bool(msg.content.strip())
        if isinstance(msg.content, list):
            for part in msg.content:
                if isinstance(part, str) and part.strip():
                    return True
                if part is not None and not isinstance(part, str):
                    return True
        return False

    def fork(self) -> ChatContext:
        """Fork a complete, independent deep copy for a sub-agent.

        Returns:
            ChatContext: A new context; later mutations never touch this one.
        """
        return ChatContext([item.model_copy(deep=True) for item in self._items])

    def fork_filtered(
        self,
        recent_turns: int = 3,
        tools: Optional[List[FunctionTool]] = None,
    ) -> ChatContext:
        """Fork a context scoped to instructions + the most recent turns.

        Args:
            recent_turns: Number of recent user turns to keep (must be >= 1).
            tools: When given, function calls/outputs are limited to these tools.

        Returns:
            ChatContext: A new, independent context.
        """
        if recent_turns < 1:
            raise ValueError("recent_turns must be >= 1")

        scoped = self.copy(tools=tools)
        instruction_items = [
            item for item in scoped.items
            if isinstance(item, ChatMessage)
            and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)
        ]
        instruction_ids = {item.id for item in instruction_items}

        user_indices = [
            i for i, item in enumerate(scoped.items)
            if isinstance(item, ChatMessage) and item.role == ChatRole.USER
        ]
        if len(user_indices) > recent_turns:
            split_idx = user_indices[-recent_turns]
            tail = scoped.items[split_idx:]
        else:
            tail = scoped.items

        new_items = [item.model_copy(deep=True) for item in instruction_items]
        new_items += [
            item.model_copy(deep=True)
            for item in tail
            if item.id not in instruction_ids
        ]
        return ChatContext(new_items)

    def fork_brief(
        self,
        instructions: str,
        task_brief: Optional[str] = None,
        agent_id: Optional[str] = None,
    ) -> ChatContext:
        """Fork a fresh context: sub-agent instructions + an optional task brief.

        Args:
            instructions: System instructions for the sub-agent (required,
                non-empty).
            task_brief: Optional task-description message.
            agent_id: Attribution stamped on the created items.

        Returns:
            ChatContext: A new context with no conversation history.
        """
        if not instructions:
            raise ValueError("fork_brief() requires non-empty instructions")

        items: List[ChatItem] = [
            ChatMessage(
                role=ChatRole.SYSTEM, content=[instructions], agent_id=agent_id
            )
        ]
        if task_brief:
            items.append(
                ChatMessage(
                    role=ChatRole.USER, content=[task_brief], agent_id=agent_id
                )
            )
        return ChatContext(items)

    async def merge(self, other: ChatContext) -> ChatContext:
        """Merge a sub-agent's full transcript into this context, in-place.

        Every item from ``other`` is merged, timestamp-ordered and
        de-duplicated by id. This is the most complete merge-back; the
        ``merge_result`` and ``merge_with_summary`` variants merge less.

        Args:
            other: The sub-agent's context.

        Returns:
            ChatContext: This context instance (modified in-place).
        """
        existing_ids = {item.id for item in self._items}
        incoming = [
            item.model_copy(deep=True)
            for item in other.items
            if item.id not in existing_ids
        ]
        combined = self._items + incoming
        combined.sort(key=lambda item: item.created_at)
        self._items = combined
        return self

    async def merge_result(
        self, other: ChatContext, *, agent_id: Optional[str] = None
    ) -> ChatContext:
        """Merge only a sub-agent's final assistant message, in-place.

        Args:
            other: The sub-agent's context.
            agent_id: Attribution stamped on the merged-in message.

        Returns:
            ChatContext: This context instance (modified in-place).
        """
        final = next(
            (
                item for item in reversed(other.items)
                if isinstance(item, ChatMessage)
                and item.role == ChatRole.ASSISTANT
            ),
            None,
        )
        if final is not None:
            merged = final.model_copy(deep=True)
            if agent_id is not None:
                merged.agent_id = agent_id
            self._items.append(merged)
        return self

    async def merge_with_summary(
        self,
        other: ChatContext,
        *,
        llm: "LLM",
        agent_id: Optional[str] = None,
    ) -> ChatContext:
        """Append an LLM-written summary of a sub-agent's work, in-place.

        The sub-agent's items are rendered to text; the LLM is only invoked when
        that text is non-blank. A non-empty summary is appended as an ASSISTANT
        message flagged with ``extra={"summary": True}``; otherwise nothing is
        added.

        Args:
            other: The sub-agent's context.
            llm: LLM used to generate the summary (required keyword argument).
            agent_id: Attribution stamped on the summary message.

        Returns:
            ChatContext: This context instance (modified in-place).
        """
        from .window import render_items, generate_summary

        rendered = render_items(other.items)
        if rendered.strip():
            summary_text = await generate_summary(llm, rendered)
        else:
            summary_text = ""

        if not summary_text:
            return self

        summary_message = ChatMessage(
            role=ChatRole.ASSISTANT,
            content=[f"[Sub-agent Summary]\n{summary_text}"],
            agent_id=agent_id,
            extra={"summary": True},
        )
        self._items.append(summary_message)
        return self

    def truncate(
        self,
        max_items: int | None = None,
        max_tokens: int | None = None,
    ) -> ChatContext:
        """
        Truncate the context while preserving system message and summary messages.

        Removes oldest non-system items until both constraints are satisfied.
        Keeps function call/output pairs together to avoid orphaned tool calls.

        Protected (never removed by the trimming loop) are: the first
        SYSTEM/DEVELOPER message, the first message flagged ``extra["summary"]``,
        the last USER message, and every AgentHandoff / AgentConfigUpdate item.
        If only protected items remain, trimming stops even when the limits are
        still exceeded.

        Args:
            max_items: Maximum number of items to keep. None means no item limit.
            max_tokens: Maximum estimated token budget. None means no token limit.

        Returns:
            ChatContext: The current context instance after truncation.
        """
        # No limit given: nothing to do.
        if max_items is None and max_tokens is None:
            return self

        logger.debug(f"Truncating context: {len(self._items)} items, {self.estimated_tokens()} tokens")

        # Identify protected items that must never be removed:
        # - System message (agent instructions)
        # - Summary message (compressed history)
        # - Last user message (LLMs require conversation to end with user turn)
        # Only the FIRST system/developer message and the FIRST summary message
        # are picked; later ones are treated like ordinary items.
        system_msg = next(
            (item for item in self._items
             if isinstance(item, ChatMessage)
             and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)),
            None
        )
        summary_msg = next(
            (item for item in self._items
             if isinstance(item, ChatMessage) and item.extra.get("summary")),
            None
        )
        last_user_msg = next(
            (item for item in reversed(self._items)
             if isinstance(item, ChatMessage) and item.role == ChatRole.USER),
            None
        )
        structural_items = [
            item for item in self._items
            if isinstance(item, (AgentHandoff, AgentConfigUpdate))
        ]
        # Protection is tracked by object identity (id()), not by equality.
        protected = {
            id(m)
            for m in (system_msg, summary_msg, last_user_msg, *structural_items)
            if m is not None
        }

        # Start with all items; remove oldest non-protected until constraints met
        new_items = list(self._items)

        def _needs_trim() -> bool:
            # True while either limit is still exceeded; the token estimate is
            # recomputed over the remaining items on every call.
            if max_items is not None and len(new_items) > max_items:
                return True
            if max_tokens is not None:
                token_est = sum(self._estimate_item_tokens(it) for it in new_items)
                if token_est > max_tokens:
                    return True
            return False

        # Each pass removes the first removable item (plus its paired output when
        # that item is a function call), then the limits are re-checked.
        while _needs_trim():
            removed = False
            for i, item in enumerate(new_items):
                # Skip protected items
                if id(item) in protected:
                    continue
                # Don't orphan function call pairs — remove them together
                if isinstance(item, FunctionCall):
                    output_idx = next(
                        (j for j in range(i + 1, len(new_items))
                         if isinstance(new_items[j], FunctionCallOutput) and new_items[j].call_id == item.call_id),
                        None
                    )
                    if output_idx is not None:
                        # Pop the later index first so index i stays valid.
                        new_items.pop(output_idx)
                        new_items.pop(i)
                    else:
                        new_items.pop(i)
                    removed = True
                    break
                elif isinstance(item, FunctionCallOutput):
                    new_items.pop(i)
                    removed = True
                    break
                else:
                    new_items.pop(i)
                    removed = True
                    break
            if not removed:
                break  # Only protected items remain — stop even if over budget

        # Clean up ALL orphaned function items (call without output, or output without call)
        call_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCall)}
        output_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCallOutput)}
        new_items = [
            item for item in new_items
            if not (
                (isinstance(item, FunctionCall) and item.call_id not in output_ids_in_list)
                or
                (isinstance(item, FunctionCallOutput) and item.call_id not in call_ids_in_list)
            )
        ]

        # Re-insert protected items if they were accidentally removed by orphan cleanup
        # (membership here uses ``in``, i.e. equality, unlike the id()-based set above)
        if system_msg and system_msg not in new_items:
            new_items.insert(0, system_msg)
        if summary_msg and summary_msg not in new_items:
            insert_pos = 1 if system_msg in new_items else 0
            new_items.insert(insert_pos, summary_msg)
        if last_user_msg and last_user_msg not in new_items:
            new_items.append(last_user_msg)

        self._items = new_items
        logger.debug(f"Truncation complete: {len(self._items)} items, {self.estimated_tokens()} tokens")
        return self

    async def summarize(
        self, llm: "LLM", *, keep_recent_turns: int = 3
    ) -> ChatContext:
        """Compress old conversation turns into a single summary message, in-place.

        Splits the context into head (older) and tail (recent). The head is
        rendered and summarized by ``llm``; structural items (system/developer
        messages, prior summaries, handoffs) are preserved.

        Args:
            llm: LLM used to generate the summary.
            keep_recent_turns: Number of recent user turns kept verbatim.

        Returns:
            ChatContext: This context instance (modified in-place).
        """
        from .window import render_items, generate_summary

        user_indices = [
            i for i, item in enumerate(self._items)
            if isinstance(item, ChatMessage) and item.role == ChatRole.USER
        ]
        if len(user_indices) <= keep_recent_turns:
            return self

        split_idx = user_indices[-keep_recent_turns]
        head = self._items[:split_idx]
        recent_items = list(self._items[split_idx:])

        def _is_structural(item: ChatItem) -> bool:
            if isinstance(item, (AgentHandoff, AgentConfigUpdate)):
                return True
            if isinstance(item, ChatMessage):
                return (
                    item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)
                    or bool(item.extra.get("summary"))
                )
            return False

        structural = [item for item in head if _is_structural(item)]
        summarizable = [item for item in head if not _is_structural(item)]
        if not summarizable:
            return self

        conversation_text = render_items(summarizable)
        if not conversation_text.strip():
            return self

        summary_text = await generate_summary(llm, conversation_text)
        if not summary_text:
            logger.warning("Compression produced empty summary")
            return self

        summary_msg = ChatMessage(
            role=ChatRole.ASSISTANT,
            content=[f"[Conversation Summary]\n{summary_text}"],
            extra={"summary": True},
        )
        self._items = structural + [summary_msg] + recent_items
        logger.info(
            f"Compressed {len(summarizable)} items into summary. "
            f"Context: {len(self._items)} items"
        )
        return self

    # ── Provider format conversions ────────────────────────────────────
    # Actual logic lives in llm/format_converters.py. These methods
    # delegate to keep the public API on ChatContext unchanged.

    def to_openai_messages(self, *, reasoning_model: bool = False) -> list[dict]:
        """Render this context as OpenAI chat-completion messages.

        Thin delegate: the conversion is performed by
        ``format_converters.to_openai_messages``, which receives this context and
        the ``reasoning_model`` flag unchanged.
        """
        from ..format_converters import to_openai_messages as _convert
        return _convert(self, reasoning_model=reasoning_model)

    def to_anthropic_messages(self, *, caching: bool = False) -> tuple[list[dict], Optional[str]]:
        """Render this context in Anthropic messages format (role alternation enforced).

        Thin delegate: the conversion is performed by
        ``format_converters.to_anthropic_messages``, which receives this context
        and the ``caching`` flag unchanged.
        """
        from ..format_converters import to_anthropic_messages as _convert
        return _convert(self, caching=caching)

    async def to_google_contents(self, *, thought_signatures: dict | None = None) -> tuple[list, Optional[str]]:
        """Render this context in Google Gemini contents format.

        Thin delegate: awaits ``format_converters.to_google_contents``, passing
        this context and ``thought_signatures`` through unchanged.
        """
        from ..format_converters import to_google_contents as _convert
        converted = await _convert(self, thought_signatures=thought_signatures)
        return converted

    # ── Serialization ────────────────────────────────────────────────

    def to_dict(self) -> dict:
        """Convert the context to a dictionary representation."""
        items = []
        for item in self._items:
            base = {
                "type": item.type,
                "id": item.id,
                "created_at": item.created_at,
                "agent_id": item.agent_id,
            }
            if isinstance(item, ChatMessage):
                base.update({
                    "role": item.role.value,
                    "content": item.content,
                    "interrupted": item.interrupted,
                    "extra": item.extra,
                    "confidence": item.confidence,
                    "metrics": item.metrics,
                    "audio_instructions": item.audio_instructions,
                    "text_instructions": item.text_instructions,
                })
            elif isinstance(item, FunctionCall):
                base.update({
                    "name": item.name,
                    "arguments": item.arguments,
                    "call_id": item.call_id,
                    "metadata": item.metadata,
                })
            elif isinstance(item, FunctionCallOutput):
                base.update({
                    "name": item.name,
                    "output": item.output,
                    "call_id": item.call_id,
                    "is_error": item.is_error,
                })
            elif isinstance(item, AgentHandoff):
                base.update({
                    "from_agent": item.from_agent,
                    "to_agent": item.to_agent,
                    "reason": item.reason,
                })
            elif isinstance(item, AgentConfigUpdate):
                base.update({
                    "instructions": item.instructions,
                    "tools": item.tools,
                })
            items.append(base)
        return {"items": items}

    @classmethod
    def from_dict(cls, data: dict) -> ChatContext:
        """Reconstruct a ChatContext from a dictionary representation."""
        items: List[ChatItem] = []
        for d in data["items"]:
            common = {"id": d["id"]}
            if d.get("created_at") is not None:
                common["created_at"] = d["created_at"]
            if "agent_id" in d:
                common["agent_id"] = d.get("agent_id")

            item_type = d["type"]
            if item_type == "message":
                items.append(ChatMessage(
                    role=ChatRole(d["role"]),
                    content=d["content"],
                    interrupted=d.get("interrupted", False),
                    extra=d.get("extra", {}) or {},
                    confidence=d.get("confidence"),
                    metrics=d.get("metrics"),
                    audio_instructions=d.get("audio_instructions"),
                    text_instructions=d.get("text_instructions"),
                    **common,
                ))
            elif item_type == "function_call":
                items.append(FunctionCall(
                    name=d["name"],
                    arguments=d["arguments"],
                    call_id=d["call_id"],
                    metadata=d.get("metadata"),
                    **common,
                ))
            elif item_type == "function_call_output":
                items.append(FunctionCallOutput(
                    name=d["name"],
                    output=d["output"],
                    call_id=d["call_id"],
                    is_error=d.get("is_error", False),
                    **common,
                ))
            elif item_type == "agent_handoff":
                items.append(AgentHandoff(
                    from_agent=d.get("from_agent"),
                    to_agent=d["to_agent"],
                    reason=d.get("reason"),
                    **common,
                ))
            elif item_type == "agent_config_update":
                items.append(AgentConfigUpdate(
                    instructions=d.get("instructions"),
                    tools=d.get("tools"),
                    **common,
                ))
        return cls(items)

    def cleanup(self) -> None:
        """
        Release the context's contents to free memory.

        Nulls out the payload of every item (image data and content of messages,
        arguments of function calls, output of function outputs), empties the
        item list in place, and then triggers a garbage-collection pass. Errors
        raised by the collection step are logged, not propagated.
        """
        logger.info(f"Cleaning up ChatContext with {len(self._items)} items")

        for entry in self._items:
            if isinstance(entry, ChatMessage):
                parts = entry.content
                if isinstance(parts, list):
                    # Drop image payloads before dropping the content itself.
                    for part in parts:
                        if isinstance(part, ImageContent):
                            part.image = None
                entry.content = None
            elif isinstance(entry, FunctionCall):
                entry.arguments = None
            elif isinstance(entry, FunctionCallOutput):
                entry.output = None

        self._items.clear()

        try:
            import gc
            gc.collect()
            logger.info("ChatContext garbage collection completed")
        except Exception as e:
            logger.error(f"Error during ChatContext garbage collection: {e}")

        logger.info("ChatContext cleanup completed")

Manages a conversation context for LLM interactions.

Initialize the chat context.

Args

items : Optional[List[ChatItem]]
Initial list of chat items. If None, starts with empty context.

Subclasses

Static methods

def empty() ‑> ChatContext

Create an empty chat context.

Returns

ChatContext
A new empty chat context instance.
def from_dict(data: dict) ‑> ChatContext

Reconstruct a ChatContext from a dictionary representation.

Instance variables

prop items : List[ChatItem]
Expand source code
@property
def items(self) -> List[ChatItem]:
    """
    Expose the items stored in this context.

    Returns:
        List[ChatItem]: The context's own item list (the same list object,
        not a copy), holding every conversation item.
    """
    return self._items

Get all items in the context.

Returns

List[ChatItem]
List of all conversation items (messages, function calls, outputs).

Methods

def active_config_at(self, target: Union[str, int, None] = None) ‑> tuple[str | None, List[str] | None]
Expand source code
def active_config_at(
    self, target: Union[str, int, None] = None
) -> tuple[Optional[str], Optional[List[str]]]:
    """Resolve the effective (instructions, valid_tools) at a point in the context.

    Walks SYSTEM/DEVELOPER messages and AgentConfigUpdate items from the
    start up to and including ``target``. Later items override earlier ones.

    Args:
        target: An item id, a list index, or None for the end of the context.
            Negative indexes count from the end (``-1`` is the last item).
            An id that matches no item resolves against the whole context.

    Returns:
        tuple: (instructions, valid_tools). Either element may be None.
    """
    total = len(self._items)
    if target is None:
        end = total
    elif isinstance(target, int):
        # ``-1`` names the last item, but ``-1 + 1 == 0`` would slice nothing.
        # Other negative indexes already work through slice semantics.
        end = total if target == -1 else target + 1
    else:
        end = next(
            (i + 1 for i, item in enumerate(self._items) if item.id == target),
            total,
        )

    instructions: Optional[str] = None
    tools: Optional[List[str]] = None
    for item in self._items[:end]:
        if isinstance(item, ChatMessage) and item.role in (
            ChatRole.SYSTEM,
            ChatRole.DEVELOPER,
        ):
            if isinstance(item.content, str):
                instructions = item.content
            else:
                # List content: only the plain-string parts are joined
                instructions = " ".join(
                    p for p in item.content if isinstance(p, str)
                )
        elif isinstance(item, AgentConfigUpdate):
            # A None field means "unchanged", so it does not reset the value
            if item.instructions is not None:
                instructions = item.instructions
            if item.tools is not None:
                tools = item.tools
    return instructions, tools

Resolve the effective (instructions, valid_tools) at a point in the context.

Walks SYSTEM/DEVELOPER messages and AgentConfigUpdate items from the start up to and including target.

Args

target
An item id, a list index, or None for the end of the context.

Returns

tuple
(instructions, valid_tools). Either element may be None.
def add_config_update(self,
instructions: Optional[str] = None,
tools: Optional[List[str]] = None,
agent_id: Optional[str] = None) ‑> AgentConfigUpdate
Expand source code
def add_config_update(
    self,
    instructions: Optional[str] = None,
    tools: Optional[List[str]] = None,
    agent_id: Optional[str] = None,
) -> AgentConfigUpdate:
    """Append an AgentConfigUpdate describing new instructions and/or tools.

    Returns:
        AgentConfigUpdate: The item that was appended to the context.
    """
    record = AgentConfigUpdate(
        agent_id=agent_id,
        instructions=instructions,
        tools=tools,
    )
    self._items.append(record)
    return record

Record a mid-conversation change to instructions or tools.

def add_function_call(self,
name: str,
arguments: str,
call_id: Optional[str] = None,
agent_id: Optional[str] = None,
metadata: Optional[dict] = None) ‑> FunctionCall
Expand source code
def add_function_call(
    self,
    name: str,
    arguments: str,
    call_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> FunctionCall:
    """Append a function call to the context and return it.

    A ``call_id`` is generated when none (or an empty one) is supplied.

    ``metadata`` holds provider-specific per-call data, for example the
    Gemini ``thought_signature``, that has to stay attached to this exact
    call when the context is later converted for the provider.
    """
    resolved_call_id = call_id if call_id else f"call_{uuid.uuid4().hex[:12]}"
    entry = FunctionCall(
        name=name,
        arguments=arguments,
        call_id=resolved_call_id,
        agent_id=agent_id,
        metadata=metadata,
    )
    self._items.append(entry)
    return entry

Add a function call to the context.

metadata carries provider-specific per-call data — notably the Gemini thought_signature — which must travel with this exact call when the context is later converted for the provider.

def add_function_output(self,
name: str,
output: str,
call_id: str,
is_error: bool = False,
agent_id: Optional[str] = None) ‑> FunctionCallOutput
Expand source code
def add_function_output(
    self,
    name: str,
    output: str,
    call_id: str,
    is_error: bool = False,
    agent_id: Optional[str] = None,
) -> FunctionCallOutput:
    """Append a function output to the context and return it."""
    fields = {
        "name": name,
        "output": output,
        "call_id": call_id,
        "is_error": is_error,
        "agent_id": agent_id,
    }
    result = FunctionCallOutput(**fields)
    self._items.append(result)
    return result

Add a function output to the context.

def add_handoff(self,
to_agent: str,
from_agent: Optional[str] = None,
reason: Optional[str] = None) ‑> AgentHandoff
Expand source code
def add_handoff(
    self,
    to_agent: str,
    from_agent: Optional[str] = None,
    reason: Optional[str] = None,
) -> AgentHandoff:
    """Append an AgentHandoff marking a transfer of control, and return it."""
    transfer = AgentHandoff(
        to_agent=to_agent,
        from_agent=from_agent,
        reason=reason,
    )
    self._items.append(transfer)
    return transfer

Record a transfer of control between agents.

def add_message(self,
role: ChatRole,
content: Union[str, List[ChatContent]],
message_id: Optional[str] = None,
created_at: Optional[float] = None,
replace: bool = False,
agent_id: Optional[str] = None) ‑> ChatMessage
Expand source code
def add_message(
    self,
    role: ChatRole,
    content: Union[str, List[ChatContent]],
    message_id: Optional[str] = None,
    created_at: Optional[float] = None,
    replace: bool = False,
    agent_id: Optional[str] = None,
) -> ChatMessage:
    """Append a new message to the context and return it.

    Args:
        role: Role of the message author.
        content: Message content; a plain string is wrapped in a one-element list.
        message_id: Explicit id; a ``msg_``-prefixed id is generated when omitted.
        created_at: Explicit timestamp; the current time is used when None.
        replace: When True and ``role`` is SYSTEM, existing SYSTEM messages
            are removed before the new one is appended.
        agent_id: Attribution stored on the message.

    Returns:
        ChatMessage: The message that was appended.
    """
    if replace and role == ChatRole.SYSTEM:
        self._items = [
            item for item in self._items
            if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM)
        ]

    if isinstance(content, str):
        content = [content]

    message = ChatMessage(
        role=role,
        content=content,
        id=message_id or f"msg_{uuid.uuid4().hex[:12]}",
        # Compare against None so an explicit timestamp of 0 is honoured
        # instead of being replaced by the current time.
        created_at=created_at if created_at is not None else time.time(),
        agent_id=agent_id,
    )
    self._items.append(message)
    return message

Add a new message to the context.

def cleanup(self) ‑> None
Expand source code
def cleanup(self) -> None:
    """Drop every item held by this context.

    Payload fields (message content, image data, function-call arguments
    and function outputs) are set to None first, then the item list is
    emptied and a garbage-collection pass is requested.
    """
    logger.info(f"Cleaning up ChatContext with {len(self._items)} items")
    for entry in self._items:
        if isinstance(entry, ChatMessage):
            parts = entry.content
            if isinstance(parts, list):
                # Image payloads are released before the message content
                for part in parts:
                    if isinstance(part, ImageContent):
                        part.image = None
            entry.content = None
        elif isinstance(entry, FunctionCall):
            entry.arguments = None
        elif isinstance(entry, FunctionCallOutput):
            entry.output = None
    self._items.clear()
    try:
        import gc
        gc.collect()
        logger.info("ChatContext garbage collection completed")
    except Exception as e:
        # Best effort: a failed collection is logged, never raised
        logger.error(f"Error during ChatContext garbage collection: {e}")

    logger.info("ChatContext cleanup completed")

Clear all chat context items and references to free memory.

def copy(self,
*,
exclude_system_messages: bool = False,
exclude_instructions: bool = False,
exclude_empty_messages: bool = False,
exclude_handoffs: bool = False,
exclude_config_updates: bool = False,
tools: Optional[List[FunctionTool]] = None,
filter_agent_id: Optional[str] = None) ‑> ChatContext
Expand source code
def copy(
    self,
    *,
    exclude_system_messages: bool = False,
    exclude_instructions: bool = False,
    exclude_empty_messages: bool = False,
    exclude_handoffs: bool = False,
    exclude_config_updates: bool = False,
    tools: Optional[List[FunctionTool]] = None,
    filter_agent_id: Optional[str] = None,
) -> ChatContext:
    """Build a new ChatContext holding the items that pass every filter.

    The surviving items are the same objects as in this context; they are
    not cloned.

    Args:
        exclude_system_messages: Drop SYSTEM-role messages.
        exclude_instructions: Drop SYSTEM- and DEVELOPER-role messages.
        exclude_empty_messages: Drop messages with no meaningful content.
        exclude_handoffs: Drop AgentHandoff items.
        exclude_config_updates: Drop AgentConfigUpdate items.
        tools: Tool-scoping for function calls/outputs. ``None`` (the
            default) keeps every function call/output; an empty list drops
            them all; a non-empty list keeps only calls/outputs whose tool
            is in the list.
        filter_agent_id: When given, keep only items with this agent_id.
            Handoffs, config updates and SYSTEM/DEVELOPER messages are
            exempt from this filter.

    Returns:
        ChatContext: A new ChatContext with the filtered items.
    """
    allowed_tool_names = set()
    for tool in tools or []:
        if is_function_tool(tool):
            allowed_tool_names.add(get_tool_info(tool).name)

    instruction_roles = (ChatRole.SYSTEM, ChatRole.DEVELOPER)

    def _keep(entry):
        # Message-level exclusions
        if isinstance(entry, ChatMessage):
            if exclude_system_messages and entry.role == ChatRole.SYSTEM:
                return False
            if exclude_instructions and entry.role in instruction_roles:
                return False
            if exclude_empty_messages and not self._has_content(entry):
                return False

        if exclude_handoffs and isinstance(entry, AgentHandoff):
            return False
        if exclude_config_updates and isinstance(entry, AgentConfigUpdate):
            return False

        # Tool scoping applies only when a tool list was passed at all
        if tools is not None and isinstance(
            entry, (FunctionCall, FunctionCallOutput)
        ):
            if entry.name not in allowed_tool_names:
                return False

        if filter_agent_id is not None:
            is_structural = isinstance(
                entry, (AgentHandoff, AgentConfigUpdate)
            ) or (
                isinstance(entry, ChatMessage)
                and entry.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)
            )
            if not is_structural and entry.agent_id != filter_agent_id:
                return False

        return True

    return ChatContext([entry for entry in self._items if _keep(entry)])

Create a filtered copy of the chat context.

Args

exclude_system_messages
Drop SYSTEM-role messages.
exclude_instructions
Drop SYSTEM- and DEVELOPER-role messages.
exclude_empty_messages
Drop messages with no meaningful content.
exclude_handoffs
Drop AgentHandoff items.
exclude_config_updates
Drop AgentConfigUpdate items.
tools
Tool-scoping for function calls/outputs. None (the default) keeps every function call/output; an empty list drops them all; a non-empty list keeps only calls/outputs whose tool is in the list.
filter_agent_id
When given, keep only items with this agent_id; handoffs, config updates, and SYSTEM/DEVELOPER messages are always kept.

Returns

ChatContext
A new ChatContext with the filtered items.
def estimated_tokens(self) ‑> int
Expand source code
def estimated_tokens(self) -> int:
    """
    Sum the per-item token estimates for everything in the context.

    The figure is a rough heuristic (~4 characters per token) meant for
    budget decisions; it does not replace provider-reported usage.

    Returns:
        int: Estimated token count.
    """
    return sum(self._estimate_item_tokens(entry) for entry in self._items)

Rough token estimate for the current context using a ~4 chars per token heuristic. Good enough for budget decisions — not a replacement for provider-reported usage.

Returns

int
Estimated token count.
def fork(self) ‑> ChatContext
Expand source code
def fork(self) -> ChatContext:
    """Produce an independent deep copy of the whole context for a sub-agent.

    Returns:
        ChatContext: A new context whose items are deep copies, so changes
        made to it do not affect this one.
    """
    clones = []
    for original in self._items:
        clones.append(original.model_copy(deep=True))
    return ChatContext(clones)

Fork a complete, independent deep copy for a sub-agent.

Returns

ChatContext
A new context; later mutations never touch this one.
def fork_brief(self,
instructions: str,
task_brief: Optional[str] = None,
agent_id: Optional[str] = None) ‑> ChatContext
Expand source code
def fork_brief(
    self,
    instructions: str,
    task_brief: Optional[str] = None,
    agent_id: Optional[str] = None,
) -> ChatContext:
    """Start a fresh context from sub-agent instructions and an optional brief.

    Nothing from this context's history is carried over.

    Args:
        instructions: System instructions for the sub-agent (required,
            non-empty).
        task_brief: Optional task description, added as a USER message.
        agent_id: Attribution stamped on the created items.

    Returns:
        ChatContext: A new context with no conversation history.

    Raises:
        ValueError: If ``instructions`` is empty.
    """
    if not instructions:
        raise ValueError("fork_brief() requires non-empty instructions")

    system_message = ChatMessage(
        role=ChatRole.SYSTEM, content=[instructions], agent_id=agent_id
    )
    seed: List[ChatItem] = [system_message]
    if task_brief:
        brief_message = ChatMessage(
            role=ChatRole.USER, content=[task_brief], agent_id=agent_id
        )
        seed.append(brief_message)
    return ChatContext(seed)

Fork a fresh context: sub-agent instructions + an optional task brief.

Args

instructions
System instructions for the sub-agent (required, non-empty).
task_brief
Optional task-description message.
agent_id
Attribution stamped on the created items.

Returns

ChatContext
A new context with no conversation history.
def fork_filtered(self, recent_turns: int = 3, tools: Optional[List[FunctionTool]] = None) ‑> ChatContext
Expand source code
def fork_filtered(
    self,
    recent_turns: int = 3,
    tools: Optional[List[FunctionTool]] = None,
) -> ChatContext:
    """Fork a context holding the instructions plus the latest user turns.

    All SYSTEM/DEVELOPER messages are kept and placed first, followed by
    the items from the ``recent_turns``-th most recent USER message onward.
    Every item in the result is a deep copy.

    Args:
        recent_turns: Number of recent user turns to keep (must be >= 1).
        tools: When given, function calls/outputs are limited to these tools.

    Returns:
        ChatContext: A new, independent context.

    Raises:
        ValueError: If ``recent_turns`` is less than 1.
    """
    if recent_turns < 1:
        raise ValueError("recent_turns must be >= 1")

    scoped_items = self.copy(tools=tools).items

    instruction_messages = []
    user_positions = []
    for position, entry in enumerate(scoped_items):
        if not isinstance(entry, ChatMessage):
            continue
        if entry.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER):
            instruction_messages.append(entry)
        if entry.role == ChatRole.USER:
            user_positions.append(position)
    seen_instruction_ids = {entry.id for entry in instruction_messages}

    # Keep the whole transcript unless there are more user turns than asked for
    if len(user_positions) > recent_turns:
        recent = scoped_items[user_positions[-recent_turns]:]
    else:
        recent = scoped_items

    forked = [entry.model_copy(deep=True) for entry in instruction_messages]
    for entry in recent:
        # Instruction messages were already emitted at the front
        if entry.id in seen_instruction_ids:
            continue
        forked.append(entry.model_copy(deep=True))
    return ChatContext(forked)

Fork a context scoped to instructions + the most recent turns.

Args

recent_turns
Number of recent user turns to keep (must be >= 1).
tools
When given, function calls/outputs are limited to these tools.

Returns

ChatContext
A new, independent context.
def get_by_id(self, item_id: str) ‑> ChatMessage | FunctionCall | FunctionCallOutput | AgentHandoff | AgentConfigUpdate | None
Expand source code
def get_by_id(self, item_id: str) -> Optional[ChatItem]:
    """
    Look up the first item whose ``id`` equals ``item_id``.

    Args:
        item_id (str): The ID of the item to find.

    Returns:
        Optional[ChatItem]: The matching item, or None when no item has that ID.
    """
    for candidate in self._items:
        if candidate.id == item_id:
            return candidate
    return None

Find an item by its ID.

Args

item_id : str
The ID of the item to find.

Returns

Optional[ChatItem]
The found item or None if not found.
def insert(self, item: ChatItem) ‑> ChatMessage | FunctionCall | FunctionCallOutput | AgentHandoff | AgentConfigUpdate
Expand source code
def insert(self, item: ChatItem) -> ChatItem:
    """Place ``item`` just before the first existing item with a later ``created_at``.

    When no existing item is later, the item is appended. The inserted item
    is returned.
    """
    slot = next(
        (
            index
            for index, current in enumerate(self._items)
            if current.created_at > item.created_at
        ),
        len(self._items),
    )
    self._items.insert(slot, item)
    return item

Insert an item at the position determined by its created_at.

def insert_many(self, items: List[ChatItem]) ‑> None
Expand source code
def insert_many(self, items: List[ChatItem]) -> None:
    """Insert several items, oldest first, each through ``insert``."""

    def _timestamp(entry):
        return entry.created_at

    ordered = sorted(items, key=_timestamp)
    for entry in ordered:
        self.insert(entry)

Batch-insert items, each placed in timestamp order.

async def merge(self,
other: ChatContext) ‑> ChatContext
Expand source code
async def merge(self, other: ChatContext) -> ChatContext:
    """Fold a sub-agent's full transcript into this context, in-place.

    Items from ``other`` whose id is not already present here are
    deep-copied in, and the combined list is then ordered by
    ``created_at``. This is the most complete merge-back; the
    ``merge_result`` and ``merge_with_summary`` variants merge less.

    Args:
        other: The sub-agent's context.

    Returns:
        ChatContext: This context instance (modified in-place).
    """
    known_ids = {entry.id for entry in self._items}
    additions = []
    for entry in other.items:
        if entry.id in known_ids:
            continue
        additions.append(entry.model_copy(deep=True))
    # sorted() is stable, so items with equal timestamps keep their order
    self._items = sorted(
        [*self._items, *additions],
        key=lambda entry: entry.created_at,
    )
    return self

Merge a sub-agent's full transcript into this context, in-place.

Every item from other is merged, timestamp-ordered and de-duplicated by id. This is the most complete merge-back; the merge_result and merge_with_summary variants merge less.

Args

other
The sub-agent's context.

Returns

ChatContext
This context instance (modified in-place).
async def merge_result(self,
other: ChatContext,
*,
agent_id: Optional[str] = None) ‑> ChatContext
Expand source code
async def merge_result(
    self, other: ChatContext, *, agent_id: Optional[str] = None
) -> ChatContext:
    """Append a deep copy of a sub-agent's last ASSISTANT message, in-place.

    Nothing is appended when ``other`` holds no ASSISTANT message.

    Args:
        other: The sub-agent's context.
        agent_id: Attribution stamped on the merged-in message; when None
            the copied message keeps its own ``agent_id``.

    Returns:
        ChatContext: This context instance (modified in-place).
    """
    for candidate in reversed(other.items):
        is_assistant_message = (
            isinstance(candidate, ChatMessage)
            and candidate.role == ChatRole.ASSISTANT
        )
        if not is_assistant_message:
            continue
        clone = candidate.model_copy(deep=True)
        if agent_id is not None:
            clone.agent_id = agent_id
        self._items.append(clone)
        break
    return self

Merge only a sub-agent's final assistant message, in-place.

Args

other
The sub-agent's context.
agent_id
Attribution stamped on the merged-in message.

Returns

ChatContext
This context instance (modified in-place).
async def merge_with_summary(self,
other: ChatContext,
*,
llm: "'LLM'",
agent_id: Optional[str] = None) ‑> ChatContext
Expand source code
async def merge_with_summary(
    self,
    other: ChatContext,
    *,
    llm: "LLM",
    agent_id: Optional[str] = None,
) -> ChatContext:
    """Append an LLM-written summary of a sub-agent's context, in-place.

    Nothing is appended when the rendered transcript is blank or the
    generated summary is empty.

    Args:
        other: The sub-agent's context.
        llm: LLM used to generate the summary (required keyword argument).
        agent_id: Attribution stamped on the summary message.

    Returns:
        ChatContext: This context instance (modified in-place).
    """
    from .window import render_items, generate_summary

    transcript = render_items(other.items)
    if not transcript.strip():
        return self

    summary_text = await generate_summary(llm, transcript)
    if not summary_text:
        return self

    summary_message = ChatMessage(
        role=ChatRole.ASSISTANT,
        content=[f"[Sub-agent Summary]\n{summary_text}"],
        agent_id=agent_id,
        # Flag read elsewhere to recognise summary messages
        extra={"summary": True},
    )
    self._items.append(summary_message)
    return self

Merge an LLM-generated summary of a sub-agent's work, in-place.

Args

other
The sub-agent's context.
llm
LLM used to generate the summary (required keyword argument).
agent_id
Attribution stamped on the summary message.

Returns

ChatContext
This context instance (modified in-place).
def messages(self) ‑> List[ChatMessage]
Expand source code
def messages(self) -> List[ChatMessage]:
    """
    Collect the ChatMessage items, skipping every other item type.

    Returns:
        List[ChatMessage]: A new list with the chat messages, in context order.
    """
    found = []
    for entry in self._items:
        if isinstance(entry, ChatMessage):
            found.append(entry)
    return found

Return only ChatMessage items, filtering out function calls and outputs.

Returns

List[ChatMessage]
List of all chat messages in the context.
async def summarize(self,
llm: "'LLM'",
*,
keep_recent_turns: int = 3) ‑> ChatContext
Expand source code
async def summarize(
    self, llm: "LLM", *, keep_recent_turns: int = 3
) -> ChatContext:
    """Compress old conversation turns into a single summary message, in-place.

    Splits the context into head (older) and tail (recent). The head is
    rendered and summarized by ``llm``; structural items (system/developer
    messages, prior summaries, handoffs, config updates) are preserved.
    The context is left untouched when there is nothing to summarize or
    the generated summary is empty.

    Args:
        llm: LLM used to generate the summary.
        keep_recent_turns: Number of recent user turns kept verbatim. ``0``
            keeps none, so the whole conversation is summarized.

    Returns:
        ChatContext: This context instance (modified in-place).

    Raises:
        ValueError: If ``keep_recent_turns`` is negative.
    """
    # A negative count would index user_indices from the front and pick an
    # arbitrary split point (or raise IndexError), so reject it up front.
    if keep_recent_turns < 0:
        raise ValueError("keep_recent_turns must be >= 0")

    from .window import render_items, generate_summary

    user_indices = [
        i for i, item in enumerate(self._items)
        if isinstance(item, ChatMessage) and item.role == ChatRole.USER
    ]
    if len(user_indices) <= keep_recent_turns:
        return self

    if keep_recent_turns == 0:
        # user_indices[-0] is the FIRST user turn, which would keep every
        # turn verbatim; keeping none means the head is the whole context.
        split_idx = len(self._items)
    else:
        split_idx = user_indices[-keep_recent_turns]
    head = self._items[:split_idx]
    recent_items = list(self._items[split_idx:])

    def _is_structural(item: ChatItem) -> bool:
        if isinstance(item, (AgentHandoff, AgentConfigUpdate)):
            return True
        if isinstance(item, ChatMessage):
            return (
                item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)
                or bool(item.extra.get("summary"))
            )
        return False

    structural = [item for item in head if _is_structural(item)]
    summarizable = [item for item in head if not _is_structural(item)]
    if not summarizable:
        return self

    conversation_text = render_items(summarizable)
    if not conversation_text.strip():
        return self

    summary_text = await generate_summary(llm, conversation_text)
    if not summary_text:
        logger.warning("Compression produced empty summary")
        return self

    summary_msg = ChatMessage(
        role=ChatRole.ASSISTANT,
        content=[f"[Conversation Summary]\n{summary_text}"],
        extra={"summary": True},
    )
    # Structural head items first, then the summary, then the verbatim tail
    self._items = structural + [summary_msg] + recent_items
    logger.info(
        f"Compressed {len(summarizable)} items into summary. "
        f"Context: {len(self._items)} items"
    )
    return self

Compress old conversation turns into a single summary message, in-place.

Splits the context into head (older) and tail (recent). The head is rendered and summarized by llm; structural items (system/developer messages, prior summaries, handoffs, config updates) are preserved.

Args

llm
LLM used to generate the summary.
keep_recent_turns
Number of recent user turns kept verbatim.

Returns

ChatContext
This context instance (modified in-place).
def to_anthropic_messages(self, *, caching: bool = False) ‑> tuple[list[dict], str | None]
Expand source code
def to_anthropic_messages(self, *, caching: bool = False) -> tuple[list[dict], Optional[str]]:
    """Render this context in Anthropic messages format (role alternation enforced).

    Delegates to ``format_converters.to_anthropic_messages`` and returns its
    result unchanged.
    """
    from ..format_converters import to_anthropic_messages

    converted = to_anthropic_messages(self, caching=caching)
    return converted

Convert context to Anthropic messages format with role alternation enforced.

def to_dict(self) ‑> dict
Expand source code
def to_dict(self) -> dict:
    """Serialize the context into ``{"items": [...]}``.

    Every entry carries the shared fields (type, id, created_at, agent_id)
    followed by the fields specific to its item type.
    """
    serialized = []
    for entry in self._items:
        record = {
            "type": entry.type,
            "id": entry.id,
            "created_at": entry.created_at,
            "agent_id": entry.agent_id,
        }
        if isinstance(entry, ChatMessage):
            record["role"] = entry.role.value
            record["content"] = entry.content
            record["interrupted"] = entry.interrupted
            record["extra"] = entry.extra
            record["confidence"] = entry.confidence
            record["metrics"] = entry.metrics
            record["audio_instructions"] = entry.audio_instructions
            record["text_instructions"] = entry.text_instructions
        elif isinstance(entry, FunctionCall):
            record["name"] = entry.name
            record["arguments"] = entry.arguments
            record["call_id"] = entry.call_id
            record["metadata"] = entry.metadata
        elif isinstance(entry, FunctionCallOutput):
            record["name"] = entry.name
            record["output"] = entry.output
            record["call_id"] = entry.call_id
            record["is_error"] = entry.is_error
        elif isinstance(entry, AgentHandoff):
            record["from_agent"] = entry.from_agent
            record["to_agent"] = entry.to_agent
            record["reason"] = entry.reason
        elif isinstance(entry, AgentConfigUpdate):
            record["instructions"] = entry.instructions
            record["tools"] = entry.tools
        serialized.append(record)
    return {"items": serialized}

Convert the context to a dictionary representation.

async def to_google_contents(self, *, thought_signatures: dict | None = None) ‑> tuple[list, str | None]
Expand source code
async def to_google_contents(self, *, thought_signatures: dict | None = None) -> tuple[list, Optional[str]]:
    """Render this context in Google Gemini contents format.

    Awaits ``format_converters.to_google_contents`` and returns its result
    unchanged.
    """
    from ..format_converters import to_google_contents

    converted = await to_google_contents(
        self, thought_signatures=thought_signatures
    )
    return converted

Convert context to Google Gemini contents format.

def to_openai_messages(self, *, reasoning_model: bool = False) ‑> list[dict]
Expand source code
def to_openai_messages(self, *, reasoning_model: bool = False) -> list[dict]:
    """Render this context in OpenAI chat completion messages format.

    Delegates to ``format_converters.to_openai_messages`` and returns its
    result unchanged.
    """
    from ..format_converters import to_openai_messages

    converted = to_openai_messages(self, reasoning_model=reasoning_model)
    return converted

Convert context to OpenAI chat completion messages format.

def truncate(self, max_items: int | None = None, max_tokens: int | None = None) ‑> ChatContext
Expand source code
def truncate(
    self,
    max_items: int | None = None,
    max_tokens: int | None = None,
) -> ChatContext:
    """
    Truncate the context in-place while preserving a set of protected items.

    Protected items are never removed by the trimming loop: the first
    SYSTEM/DEVELOPER message, the first message flagged as a summary, the
    last USER message, and every AgentHandoff / AgentConfigUpdate item.
    Other items are removed from the front of the list until both limits
    are met or only protected items remain (in which case the context may
    still exceed the limits). A function call is removed together with its
    matching output.

    Whenever at least one limit is given, function calls without an output
    and outputs without a call are dropped afterwards, even if no trimming
    was needed.

    Args:
        max_items: Maximum number of items to keep. None means no item limit.
        max_tokens: Maximum estimated token budget. None means no token limit.

    Returns:
        ChatContext: The current context instance after truncation.
    """
    # No limit given: the context is returned untouched (no orphan cleanup)
    if max_items is None and max_tokens is None:
        return self

    logger.debug(f"Truncating context: {len(self._items)} items, {self.estimated_tokens()} tokens")

    # Identify protected items that must never be removed:
    # - System message (agent instructions) — first SYSTEM/DEVELOPER message only
    # - Summary message (compressed history) — first summary-flagged message only
    # - Last user message (LLMs require conversation to end with user turn)
    system_msg = next(
        (item for item in self._items
         if isinstance(item, ChatMessage)
         and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)),
        None
    )
    summary_msg = next(
        (item for item in self._items
         if isinstance(item, ChatMessage) and item.extra.get("summary")),
        None
    )
    last_user_msg = next(
        (item for item in reversed(self._items)
         if isinstance(item, ChatMessage) and item.role == ChatRole.USER),
        None
    )
    # Handoffs and config updates are all protected, not just the first
    structural_items = [
        item for item in self._items
        if isinstance(item, (AgentHandoff, AgentConfigUpdate))
    ]
    # Protection is tracked by object identity (id()), not by item id or equality
    protected = {
        id(m)
        for m in (system_msg, summary_msg, last_user_msg, *structural_items)
        if m is not None
    }

    # Start with all items; remove oldest non-protected until constraints met
    new_items = list(self._items)

    def _needs_trim() -> bool:
        # True while either limit is still exceeded; the token estimate is
        # recomputed over the current working list on every call
        if max_items is not None and len(new_items) > max_items:
            return True
        if max_tokens is not None:
            token_est = sum(self._estimate_item_tokens(it) for it in new_items)
            if token_est > max_tokens:
                return True
        return False

    while _needs_trim():
        removed = False
        # Each pass removes the first unprotected item (plus its paired
        # output, if any) and then restarts the scan from the front
        for i, item in enumerate(new_items):
            # Skip protected items
            if id(item) in protected:
                continue
            # Don't orphan function call pairs — remove them together
            if isinstance(item, FunctionCall):
                output_idx = next(
                    (j for j in range(i + 1, len(new_items))
                     if isinstance(new_items[j], FunctionCallOutput) and new_items[j].call_id == item.call_id),
                    None
                )
                if output_idx is not None:
                    # Pop the later index first so index i stays valid
                    new_items.pop(output_idx)
                    new_items.pop(i)
                else:
                    new_items.pop(i)
                removed = True
                break
            elif isinstance(item, FunctionCallOutput):
                new_items.pop(i)
                removed = True
                break
            else:
                new_items.pop(i)
                removed = True
                break
        if not removed:
            break  # Only protected items remain — stop even if over budget

    # Clean up ALL orphaned function items (call without output, or output without call)
    call_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCall)}
    output_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCallOutput)}
    new_items = [
        item for item in new_items
        if not (
            (isinstance(item, FunctionCall) and item.call_id not in output_ids_in_list)
            or
            (isinstance(item, FunctionCallOutput) and item.call_id not in call_ids_in_list)
        )
    ]

    # Re-insert protected items if they were accidentally removed by orphan cleanup
    # (membership here uses ``in``, i.e. equality, unlike the id()-based set above)
    if system_msg and system_msg not in new_items:
        new_items.insert(0, system_msg)
    if summary_msg and summary_msg not in new_items:
        insert_pos = 1 if system_msg in new_items else 0
        new_items.insert(insert_pos, summary_msg)
    if last_user_msg and last_user_msg not in new_items:
        new_items.append(last_user_msg)

    self._items = new_items
    logger.debug(f"Truncation complete: {len(self._items)} items, {self.estimated_tokens()} tokens")
    return self

Truncate the context while preserving system message and summary messages.

Removes oldest non-system items until both constraints are satisfied. Keeps function call/output pairs together to avoid orphaned tool calls.

Args

max_items
Maximum number of items to keep. None means no item limit.
max_tokens
Maximum estimated token budget. None means no token limit.

Returns

ChatContext
The current context instance after truncation.
def turn_count(self) ‑> int
Expand source code
def turn_count(self) -> int:
    """
    Return how many user turns the context holds.

    One turn is counted for every ``ChatMessage`` whose role is
    ``ChatRole.USER``. Assistant replies are not inspected, so a user
    message that has no reply yet still counts.

    Returns:
        int: Number of user messages in the context.
    """
    user_turns = 0
    for entry in self._items:
        if isinstance(entry, ChatMessage) and entry.role == ChatRole.USER:
            user_turns += 1
    return user_turns

Count the number of user turns, i.e. the number of user messages in the context (assistant replies are not checked).

Returns

int
Number of user messages in the context.
class ChatMessage (**data: Any)
Expand source code
class ChatMessage(_ChatItemBase):
    """A user, assistant, system, or developer utterance."""

    # Who produced the message, and what was said (plain text or a list of
    # content parts).
    role: ChatRole
    content: Union[str, List[ChatContent]]
    # Auto-generated id: "msg_" followed by 12 hex digits of a random UUID.
    id: str = Field(default_factory=lambda: f"msg_{uuid.uuid4().hex[:12]}")
    # Fixed tag for this item kind.
    type: Literal["message"] = "message"
    interrupted: bool = False
    extra: dict[str, Any] = Field(default_factory=dict)
    confidence: Optional[float] = None
    metrics: Optional[dict] = None
    # Optional per-modality replacements for ``content``; see
    # ``instructions_for_modality``.
    audio_instructions: Optional[str] = None
    text_instructions: Optional[str] = None

    def instructions_for_modality(
        self, modality: Literal["audio", "text"]
    ) -> Union[str, List[ChatContent]]:
        """Pick the instruction text that applies to ``modality``.

        ``"audio"`` selects ``audio_instructions`` and ``"text"`` selects
        ``text_instructions``. When the selected variant is unset (``None``),
        or the modality is neither of those, ``content`` is returned.
        """
        override = None
        if modality == "audio":
            override = self.audio_instructions
        elif modality == "text":
            override = self.text_instructions
        return self.content if override is None else override

A user, assistant, system, or developer utterance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • agents.llm.context.items._ChatItemBase
  • pydantic.main.BaseModel

Class variables

var audio_instructions : str | None
var confidence : float | None
var content : str | List[str | ImageContent]
var extra : dict[str, typing.Any]
var id : str
var interrupted : bool
var metrics : dict | None
var model_config
var role : ChatRole
var text_instructions : str | None
var type : Literal['message']

Methods

def instructions_for_modality(self, modality: "Literal['audio', 'text']") ‑> str | List[str | ImageContent]
Expand source code
def instructions_for_modality(
    self, modality: Literal["audio", "text"]
) -> Union[str, List[ChatContent]]:
    """Pick the instruction text that applies to ``modality``.

    ``"audio"`` selects ``audio_instructions`` and ``"text"`` selects
    ``text_instructions``. When the selected variant is unset (``None``),
    or the modality is neither of those, ``content`` is returned.
    """
    override = None
    if modality == "audio":
        override = self.audio_instructions
    elif modality == "text":
        override = self.text_instructions
    return self.content if override is None else override

Return the instruction variant for the given input modality.

Falls back to content when no modality-specific variant is set.

class ChatRole (*args, **kwds)
Expand source code
class ChatRole(str, Enum):
    """Roles used in chat conversations.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value (for example ``ChatRole.USER == "user"``).
    """

    SYSTEM = "system"
    DEVELOPER = "developer"
    USER = "user"
    ASSISTANT = "assistant"

Roles used in chat conversations.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var ASSISTANT
var DEVELOPER
var SYSTEM
var USER
class FallbackLLM (providers: List[LLM],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3,
latency_threshold_ms: float | None = None,
consecutive_latency_hits: int = 3)
Expand source code
class FallbackLLM(LLM, FallbackBase):
    """LLM wrapper that fails over to backup providers.

    Requests go to ``self.active_provider``. When that provider raises during
    ``chat`` or emits an ``"error"`` event, the wrapper asks ``FallbackBase``
    to switch providers and re-emits the error on itself. When
    ``latency_threshold_ms`` is set, ``ttft`` samples from
    ``TURN_METRICS_ADDED`` events are forwarded to ``_record_latency``.
    ``check_recovery`` runs at the start of every ``chat`` call.

    NOTE(review): provider selection, disabling and recovery policy are
    implemented in ``FallbackBase``, which is not visible from this class.
    """

    def __init__(
        self,
        providers: List[LLM],
        temporary_disable_sec: float = 60.0,
        permanent_disable_after_attempts: int = 3,
        latency_threshold_ms: Optional[float] = None,
        consecutive_latency_hits: int = 3,
    ):
        """Set up both base classes and attach the event listeners.

        Args:
            providers: Providers handed to ``FallbackBase``.
            temporary_disable_sec: Forwarded unchanged to ``FallbackBase``.
            permanent_disable_after_attempts: Forwarded unchanged to
                ``FallbackBase``.
            latency_threshold_ms: Forwarded unchanged to ``FallbackBase``.
                ``None`` disables the turn-metrics listener.
            consecutive_latency_hits: Forwarded unchanged to ``FallbackBase``.
        """
        LLM.__init__(self)
        # The event loop only keeps weak references to tasks, so tasks started
        # from synchronous event callbacks are held here until they finish.
        # Created before any listener is attached.
        self._fallback_tasks: set = set()
        FallbackBase.__init__(
            self,
            providers,
            "LLM",
            temporary_disable_sec=temporary_disable_sec,
            permanent_disable_after_attempts=permanent_disable_after_attempts,
            latency_threshold_ms=latency_threshold_ms,
            consecutive_latency_hits=consecutive_latency_hits,
        )
        self._setup_event_listeners()
        self._setup_latency_listener()

    def _run_in_background(self, coro) -> None:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._fallback_tasks.add(task)
        task.add_done_callback(self._fallback_tasks.discard)

    def _setup_event_listeners(self):
        """Subscribe to ``"error"`` events of the currently active provider."""
        self.active_provider.on("error", self._on_provider_error)

    def _setup_latency_listener(self):
        """Subscribe to global turn metrics when a latency threshold is set."""
        if self.latency_threshold_ms is None:
            return
        global_event_emitter.on("TURN_METRICS_ADDED", self._on_turn_metrics)

    def _on_turn_metrics(self, event: dict):
        """Forward a turn's ``ttft`` value to ``_record_latency``.

        Turns that report any function-tool or MCP-tool calls are skipped, as
        are turns without a ``ttft`` value.
        """
        metrics = event.get("metrics") or {}
        # NOTE(review): tool-calling turns are presumably skipped because tool
        # execution would distort the latency sample — confirm.
        if metrics.get("functionToolsCalled") or metrics.get("mcpToolMetrics"):
            return

        ttft = metrics.get("ttft")
        if ttft is None:
            return
        self._run_in_background(self._record_latency(float(ttft)))

    def _on_provider_error(self, error_msg):
        """Handle an ``"error"`` event emitted by the active provider.

        The provider is captured synchronously so the asynchronous handler
        blames the provider that was active when the event fired.
        """
        failed_p = self.active_provider
        self._run_in_background(self._handle_async_error(str(error_msg), failed_p))

    async def _handle_async_error(self, error_msg: str, failed_provider: Any):
        """Attempt a provider switch, then re-emit the error on this wrapper."""
        await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider)
        self.emit("error", error_msg)

    async def _switch_provider(self, reason: str, failed_provider: Any = None):
        """Delegate the switch to ``FallbackBase`` and move the error listener.

        Args:
            reason: Description of the failure, passed to the base class.
            failed_provider: Provider that failed; defaults to the active one.

        Returns:
            bool: ``True`` when the base class reports a switch, else ``False``.
        """
        provider_to_cleanup = failed_provider if failed_provider is not None else self.active_provider
        try:
            provider_to_cleanup.off("error", self._on_provider_error)
        except Exception:
            # Best effort: the listener may not be registered on this provider.
            pass

        active_before = self.active_provider
        switched = await super()._switch_provider(reason, failed_provider)
        active_after = self.active_provider

        if not switched:
            return False
        # Only subscribe when the active provider actually changed, so the
        # same provider never gets the listener attached twice.
        if active_before is not active_after:
            active_after.on("error", self._on_provider_error)
        return True

    async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]:
        """
        Attempts to chat with current provider.
        Loops until one succeeds or all fail.
        Checks for recovery of primary providers before starting.

        Args:
            messages: Conversation context passed to the provider.
            **kwargs: Passed through unchanged to the provider's ``chat``.

        Yields:
            LLMResponse: Chunks produced by the provider that is active at
            the time.

        Raises:
            Exception: The provider's own exception, once no switch to
                another provider is possible.
        """
        self.check_recovery()

        while True:
            current_provider = self.active_provider
            try:
                async for chunk in current_provider.chat(messages, **kwargs):
                    yield chunk
                return
            except Exception as e:
                # NOTE(review): chunks already yielded before the failure are
                # not retracted; the next provider starts the response over.
                switched = await self._switch_provider(str(e), failed_provider=current_provider)
                self.emit("error", str(e))
                if not switched:
                    # Bare raise keeps the provider's original traceback.
                    raise

    async def cancel_current_generation(self) -> None:
        """Cancel the generation running on the active provider."""
        await self.active_provider.cancel_current_generation()

    async def aclose(self) -> None:
        """Detach the metrics listener, close every provider, then run base cleanup.

        Every provider is closed even if an earlier one fails. The first
        provider error is re-raised after the remaining cleanup has run.
        """
        if self.latency_threshold_ms is not None:
            try:
                global_event_emitter.off("TURN_METRICS_ADDED", self._on_turn_metrics)
            except Exception:
                # Best effort: the listener may already be gone.
                pass
        first_error = None
        for p in self.providers:
            try:
                await p.aclose()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        await super().aclose()
        if first_error is not None:
            raise first_error

LLM wrapper that automatically fails over to backup providers on errors or latency degradation, and attempts recovery of higher-priority ones.

Initialize the LLM base class.

Ancestors

Methods

async def chat(self,
messages: ChatContext,
**kwargs) ‑> AsyncIterator[LLMResponse]
Expand source code
async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]:
    """
    Attempts to chat with current provider.
    Loops until one succeeds or all fail.
    Checks for recovery of primary providers before starting.

    Args:
        messages: Conversation context passed to the provider.
        **kwargs: Passed through unchanged to the provider's ``chat``.

    Yields:
        LLMResponse: Chunks produced by the provider that is active at the
        time.

    Raises:
        Exception: The provider's own exception, once ``_switch_provider``
            reports that no switch was made.
    """
    self.check_recovery()

    while True:
        current_provider = self.active_provider
        try:
            async for chunk in current_provider.chat(messages, **kwargs):
                yield chunk
            return
        except Exception as e:
            # NOTE(review): chunks already yielded before the failure are not
            # retracted; the next provider starts the response over.
            switched = await self._switch_provider(str(e), failed_provider=current_provider)
            self.emit("error", str(e))
            if not switched:
                # Bare raise keeps the provider's original traceback.
                raise

Attempts to chat with current provider. Loops until one succeeds or all fail. Checks for recovery of primary providers before starting.

Inherited members

class FunctionCall (**data: Any)
Expand source code
class FunctionCall(_ChatItemBase):
    """A tool invocation initiated by the language model."""

    # Auto-generated item id: "call_" followed by 12 hex digits of a random UUID.
    id: str = Field(default_factory=lambda: f"call_{uuid.uuid4().hex[:12]}")
    # Fixed tag for this item kind.
    type: Literal["function_call"] = "function_call"
    name: str
    # NOTE(review): presumably the serialized (JSON) arguments — confirm with callers.
    arguments: str
    # Matched against FunctionCallOutput.call_id when orphaned call/output
    # items are cleaned up during truncation.
    call_id: str
    metadata: Optional[dict] = None

A tool invocation initiated by the language model.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • agents.llm.context.items._ChatItemBase
  • pydantic.main.BaseModel

Class variables

var arguments : str
var call_id : str
var id : str
var metadata : dict | None
var model_config
var name : str
var type : Literal['function_call']
class FunctionCallOutput (**data: Any)
Expand source code
class FunctionCallOutput(_ChatItemBase):
    """The result of a tool execution."""

    # Auto-generated item id: "output_" followed by 12 hex digits of a random UUID.
    id: str = Field(default_factory=lambda: f"output_{uuid.uuid4().hex[:12]}")
    # Fixed tag for this item kind.
    type: Literal["function_call_output"] = "function_call_output"
    name: str
    # Matched against FunctionCall.call_id when orphaned call/output items
    # are cleaned up during truncation.
    call_id: str
    output: str
    # Defaults to False. NOTE(review): presumably set when the tool failed — confirm.
    is_error: bool = False

The result of a tool execution.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • agents.llm.context.items._ChatItemBase
  • pydantic.main.BaseModel

Class variables

var call_id : str
var id : str
var is_error : bool
var model_config
var name : str
var output : str
var type : Literal['function_call_output']
class ImageContent (**data: Any)
Expand source code
class ImageContent(BaseModel):
    """Image content in a chat message."""

    # av.VideoFrame is not a pydantic-native type, so arbitrary types are allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Auto-generated id: "img_" followed by 12 hex digits of a random UUID.
    id: str = Field(default_factory=lambda: f"img_{uuid.uuid4().hex[:12]}")
    type: Literal["image"] = "image"
    # Either a decoded video frame or a string that is passed through as-is.
    image: Union[av.VideoFrame, str]
    inference_detail: Literal["auto", "high", "low"] = "auto"
    # Default encoding: JPEG at quality 90, resized to 320x240.
    encode_options: EncodeOptions = Field(
        default_factory=lambda: EncodeOptions(
            format="JPEG",
            quality=90,
            resize_options=ResizeOptions(width=320, height=240),
        )
    )

    def to_data_url(self) -> str:
        """Return the image as a string suitable for sending to a provider.

        A string ``image`` is returned unchanged. A frame is encoded with
        ``encode_options`` and wrapped in a base64 ``data:image/...`` URL whose
        subtype is the lower-cased encode format.
        """
        source = self.image
        if isinstance(source, str):
            return source
        raw_bytes = images.encode(source, self.encode_options)
        payload = base64.b64encode(raw_bytes).decode("utf-8")
        subtype = self.encode_options.format.lower()
        return f"data:image/{subtype};base64,{payload}"

Image content in a chat message.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var encode_options : EncodeOptions
var id : str
var image : av.video.frame.VideoFrame | str
var inference_detail : Literal['auto', 'high', 'low']
var model_config
var type : Literal['image']

Methods

def to_data_url(self) ‑> str
Expand source code
def to_data_url(self) -> str:
    """Return the image as a string suitable for sending to a provider.

    A string ``image`` is returned unchanged. A frame is encoded with
    ``encode_options`` and wrapped in a base64 ``data:image/...`` URL whose
    subtype is the lower-cased encode format.
    """
    source = self.image
    if isinstance(source, str):
        return source
    raw_bytes = images.encode(source, self.encode_options)
    payload = base64.b64encode(raw_bytes).decode("utf-8")
    subtype = self.encode_options.format.lower()
    return f"data:image/{subtype};base64,{payload}"

Convert the image to a data URL string.

class LLM
Expand source code
class LLM(EventEmitter[Literal["error"]]):
    """
    Base class for LLM implementations.

    Subclasses provide ``chat`` and ``cancel_current_generation``. The base
    class contributes a provider label, a default ``aclose`` cleanup routine
    and async context-manager support. ``"error"`` is the only event name
    declared for the emitter.
    """

    def __init__(self) -> None:
        """
        Initialize the LLM base class.

        Stores the provider label, built from the concrete class's module and
        class name.
        """
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        """
        Get the LLM provider label.

        Returns:
            str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
        """
        return self._label

    # NOTE(review): @abstractmethod only prevents instantiation when the class
    # uses ABCMeta — confirm that EventEmitter provides it.
    @abstractmethod
    async def chat(
        self,
        messages: ChatContext,
        tools: list[FunctionTool] | None = None,
        conversational_graph:Optional[Any] = None,
        **kwargs: Any
    ) -> AsyncIterator[LLMResponse]:
        """
        Main method to interact with the LLM.

        Args:
            messages (ChatContext): The conversation context containing message history.
            tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use.
            conversational_graph (Any | None, optional): GraphAdapter object for using graph methods.
            **kwargs (Any): Additional arguments specific to the LLM provider implementation.

        Returns:
            AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated.

        Raises:
            NotImplementedError: Always, in this base implementation; subclasses must override it.
        """
        raise NotImplementedError

    @abstractmethod
    async def cancel_current_generation(self) -> None:
        """
        Cancel the current LLM generation if active.

        The base implementation does nothing and returns ``None``; it does not
        raise. Subclasses are expected to override it. ``aclose`` awaits this
        method during cleanup.
        """
        # override in subclasses
        pass

    async def aclose(self) -> None:
        """
        Cleanup resources.

        Awaits ``cancel_current_generation``, then forces a garbage-collection
        pass, logging each step. An exception raised while cancelling
        propagates to the caller; an exception during the collection step is
        logged and suppressed.
        """
        logger.info(f"Cleaning up LLM: {self.label}")
        
        await self.cancel_current_generation()
        
        try:
            import gc
            gc.collect()
            logger.info(f"LLM garbage collection completed: {self.label}")
        except Exception as e:
            logger.error(f"Error during LLM garbage collection: {e}")
        
        logger.info(f"LLM cleanup completed: {self.label}")

    async def __aenter__(self) -> LLM:
        """
        Async context manager entry point.

        Returns:
            LLM: This instance, unchanged.
        """
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Async context manager exit point.

        Awaits ``aclose``. Returns ``None``, so an exception raised inside the
        ``async with`` block is not suppressed.
        """
        await self.aclose()

Base class for LLM implementations.

Initialize the LLM base class.

Ancestors

Subclasses

Instance variables

prop label : str
Expand source code
@property
def label(self) -> str:
    """
    Get the LLM provider label.

    The value is read from ``self._label``.

    Returns:
        str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
    """
    return self._label

Get the LLM provider label.

Returns

str
A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """
    Cleanup resources.

    Awaits ``cancel_current_generation``, then forces a garbage-collection
    pass, logging each step. An exception raised while cancelling propagates
    to the caller; an exception during the collection step is logged and
    suppressed.
    """
    logger.info(f"Cleaning up LLM: {self.label}")
    
    # Not wrapped in try/except: a failing cancel aborts the rest of cleanup.
    await self.cancel_current_generation()
    
    try:
        import gc
        gc.collect()
        logger.info(f"LLM garbage collection completed: {self.label}")
    except Exception as e:
        logger.error(f"Error during LLM garbage collection: {e}")
    
    logger.info(f"LLM cleanup completed: {self.label}")

Cleanup resources.

async def cancel_current_generation(self) ‑> None
Expand source code
@abstractmethod
async def cancel_current_generation(self) -> None:
    """
    Cancel the current LLM generation if active.

    The base implementation does nothing and returns ``None``; it does not
    raise. Subclasses are expected to override it.
    """
    # override in subclasses
    pass

Cancel the current LLM generation if active.

Raises

NotImplementedError
This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Optional[Any] = None,
**kwargs: Any) ‑> AsyncIterator[LLMResponse]
Expand source code
@abstractmethod
async def chat(
    self,
    messages: ChatContext,
    tools: list[FunctionTool] | None = None,
    conversational_graph:Optional[Any] = None,
    **kwargs: Any
) -> AsyncIterator[LLMResponse]:
    """
    Main method to interact with the LLM.

    Args:
        messages (ChatContext): The conversation context containing message history.
        tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use.
        conversational_graph (Any | None, optional): GraphAdapter object for using graph methods.
        **kwargs (Any): Additional arguments specific to the LLM provider implementation.

    Returns:
        AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated.

    Raises:
        NotImplementedError: Always, in this base implementation; subclasses must override it.
    """
    raise NotImplementedError

Main method to interact with the LLM.

Args

messages : ChatContext
The conversation context containing message history.
tools : list[FunctionTool] | None, optional
List of available function tools for the LLM to use.
conversational_graph : Any | None, optional
GraphAdapter object for using graph methods.
**kwargs : Any
Additional arguments specific to the LLM provider implementation.

Returns

AsyncIterator[LLMResponse]
An async iterator yielding LLMResponse objects as they're generated.

Raises

NotImplementedError
This method must be implemented by subclasses.

Inherited members

class LLMResponse (**data: Any)
Expand source code
class LLMResponse(BaseModel):
    """
    Data model to hold LLM response data.

    Attributes:
        content (str): The text content generated by the LLM.
        role (ChatRole): The role of the response (typically ASSISTANT).
        metadata (Optional[dict[str, Any]]): Additional response metadata from the LLM provider.
    """
    # Required fields: neither declares a default.
    content: str
    role: ChatRole
    # Optional; left as None when the provider supplies no metadata.
    metadata: Optional[dict[str, Any]] = None

Data model to hold LLM response data.

Attributes

content : str
The text content generated by the LLM.
role : ChatRole
The role of the response (typically ASSISTANT).
metadata : Optional[dict[str, Any]]
Additional response metadata from the LLM provider.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var content : str
var metadata : dict[str, typing.Any] | None
var model_config
var role : ChatRole
class ReadOnlyChatContext (items: List[ChatItem])
Expand source code
class ReadOnlyChatContext(ChatContext):
    """Non-mutating view of the items held by a ChatContext.

    Read paths (items, messages, copy, fork, get_by_id, active_config_at,
    serialization) are inherited untouched. Each mutating entry point is
    rebound to a stub that raises ``RuntimeError``. Intended for handing a
    shared context to an agent that must not modify its parent.
    """

    def __init__(self, items: List[ChatItem]):
        # No copy is made and ChatContext.__init__ is not called: the view
        # shares the caller's list, so later changes to it remain visible.
        self._items = items

    # Stub bound to every synchronous mutator below; arguments are ignored.
    def _readonly(self, *_args, **_kwargs):
        raise RuntimeError(_MUTATION_ERROR)

    # Coroutine counterpart: the error surfaces when the call is awaited.
    async def _readonly_async(self, *_args, **_kwargs):
        raise RuntimeError(_MUTATION_ERROR)

    # Sync mutators
    add_message = add_function_call = add_function_output = _readonly
    add_handoff = add_config_update = _readonly
    insert = insert_many = _readonly
    truncate = cleanup = _readonly

    # Async mutators
    summarize = merge = _readonly_async
    merge_result = merge_with_summary = _readonly_async

A read-only view over a ChatContext's items.

All read operations (items, messages, copy, fork, get_by_id, active_config_at, serialization) work normally. Every mutating operation raises RuntimeError. Used when a shared context is handed to an agent that must not mutate the parent.

Initialize the chat context.

Args

items : Optional[List[ChatItem]]
Initial list of chat items. If None, starts with empty context.

Ancestors

Inherited members

class ResponseChunk (content: str, metadata: dict[str, Any] | None = None, role: str | None = None)
Expand source code
class ResponseChunk(str):
    """A string subclass representing a single chunk of an LLM response, carrying optional metadata and role."""
    def __new__(cls, content: str, metadata: dict[str, Any] | None = None, role: str | None = None):
        obj = super().__new__(cls, content or "")
        obj.metadata = metadata
        obj.role = role
        return obj

    @property
    def content(self) -> str:
        return str(self)

A string subclass representing a single chunk of an LLM response, carrying optional metadata and role.

Ancestors

  • builtins.str

Instance variables

prop content : str
Expand source code
@property
def content(self) -> str:
    """Return the chunk text, converted with ``str(self)``."""
    return str(self)