Module agents.llm
Sub-modules
agents.llm.chat_context-
Backward-compatibility shim. The implementation now lives in the
videosdk.agents.llm.contextpackage. Import from there for new code. agents.llm.contextagents.llm.context_window-
Backward-compatibility shim. ContextWindow now lives in the
videosdk.agents.llm.contextpackage. agents.llm.fallback_llmagents.llm.format_convertersagents.llm.llm
Classes
class AgentConfigUpdate (**data: Any)-
Expand source code
class AgentConfigUpdate(_ChatItemBase): """Records a mid-conversation change to an agent's instructions or tools. Structural item — excluded from provider conversion; feeds active-config resolution. """ id: str = Field(default_factory=lambda: f"cfgupd_{uuid.uuid4().hex[:12]}") type: Literal["agent_config_update"] = "agent_config_update" instructions: Optional[str] = None tools: Optional[List[str]] = NoneRecords a mid-conversation change to an agent's instructions or tools.
Structural item — excluded from provider conversion; feeds active-config resolution.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- agents.llm.context.items._ChatItemBase
- pydantic.main.BaseModel
Class variables
var id : strvar instructions : str | Nonevar model_configvar tools : List[str] | Nonevar type : Literal['agent_config_update']
class AgentHandoff (**data: Any)-
Expand source code
class AgentHandoff(_ChatItemBase): """Records a transfer of control between agents. Structural item — excluded from provider conversion. """ id: str = Field(default_factory=lambda: f"handoff_{uuid.uuid4().hex[:12]}") type: Literal["agent_handoff"] = "agent_handoff" from_agent: Optional[str] = None to_agent: str reason: Optional[str] = NoneRecords a transfer of control between agents.
Structural item — excluded from provider conversion.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- agents.llm.context.items._ChatItemBase
- pydantic.main.BaseModel
Class variables
var from_agent : str | Nonevar id : strvar model_configvar reason : str | Nonevar to_agent : strvar type : Literal['agent_handoff']
class ChatContext (items: Optional[List[ChatItem]] = None)-
Expand source code
class ChatContext: """ Manages a conversation context for LLM interactions. """ def __init__(self, items: Optional[List[ChatItem]] = None): """ Initialize the chat context. Args: items (Optional[List[ChatItem]]): Initial list of chat items. If None, starts with empty context. """ self._items: List[ChatItem] = items or [] @classmethod def empty(cls) -> ChatContext: """ Create an empty chat context. Returns: ChatContext: A new empty chat context instance. """ return cls([]) @property def items(self) -> List[ChatItem]: """ Get all items in the context. Returns: List[ChatItem]: List of all conversation items (messages, function calls, outputs). """ return self._items def messages(self) -> List[ChatMessage]: """ Return only ChatMessage items, filtering out function calls and outputs. Returns: List[ChatMessage]: List of all chat messages in the context. """ return [item for item in self._items if isinstance(item, ChatMessage)] def turn_count(self) -> int: """ Count the number of user turns (user-assistant exchange pairs). Returns: int: Number of user messages in the context. """ return sum( 1 for item in self._items if isinstance(item, ChatMessage) and item.role == ChatRole.USER ) def estimated_tokens(self) -> int: """ Rough token estimate for the current context using a ~4 chars per token heuristic. Good enough for budget decisions — not a replacement for provider-reported usage. Returns: int: Estimated token count. """ total = 0 for item in self._items: total += self._estimate_item_tokens(item) return total def _estimate_item_tokens(self, item: ChatItem) -> int: """Estimate tokens for a single chat item.""" tokens = 4 if isinstance(item, ChatMessage): parts = item.content if isinstance(item.content, list) else [item.content] for part in parts: if part is None: continue if isinstance(part, str): tokens += len(part) // 4 elif isinstance(part, ImageContent): tokens += 300 elif isinstance(item, FunctionCall): tokens += len(item.name) // 4 + 5 if item.arguments: tokens += len(item.arguments) // 4 elif isinstance(item, FunctionCallOutput): tokens += len(item.name) // 4 + 5 if item.output: tokens += len(item.output) // 4 return tokens def add_message( self, role: ChatRole, content: Union[str, List[ChatContent]], message_id: Optional[str] = None, created_at: Optional[float] = None, replace: bool = False, agent_id: Optional[str] = None, ) -> ChatMessage: """Add a new message to the context.""" if replace and role == ChatRole.SYSTEM: self._items = [ item for item in self._items if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM) ] if isinstance(content, str): content = [content] message = ChatMessage( role=role, content=content, id=message_id or f"msg_{uuid.uuid4().hex[:12]}", created_at=created_at or time.time(), agent_id=agent_id, ) self._items.append(message) return message def add_function_call( self, name: str, arguments: str, call_id: Optional[str] = None, agent_id: Optional[str] = None, metadata: Optional[dict] = None, ) -> FunctionCall: """Add a function call to the context. ``metadata`` carries provider-specific per-call data — notably the Gemini ``thought_signature`` — which must travel with this exact call when the context is later converted for the provider. """ call = FunctionCall( name=name, arguments=arguments, call_id=call_id or f"call_{uuid.uuid4().hex[:12]}", agent_id=agent_id, metadata=metadata, ) self._items.append(call) return call def add_function_output( self, name: str, output: str, call_id: str, is_error: bool = False, agent_id: Optional[str] = None, ) -> FunctionCallOutput: """Add a function output to the context.""" function_output = FunctionCallOutput( name=name, output=output, call_id=call_id, is_error=is_error, agent_id=agent_id, ) self._items.append(function_output) return function_output def add_handoff( self, to_agent: str, from_agent: Optional[str] = None, reason: Optional[str] = None, ) -> AgentHandoff: """Record a transfer of control between agents.""" handoff = AgentHandoff(from_agent=from_agent, to_agent=to_agent, reason=reason) self._items.append(handoff) return handoff def add_config_update( self, instructions: Optional[str] = None, tools: Optional[List[str]] = None, agent_id: Optional[str] = None, ) -> AgentConfigUpdate: """Record a mid-conversation change to instructions or tools.""" update = AgentConfigUpdate( instructions=instructions, tools=tools, agent_id=agent_id ) self._items.append(update) return update def get_by_id(self, item_id: str) -> Optional[ChatItem]: """ Find an item by its ID. Args: item_id (str): The ID of the item to find. Returns: Optional[ChatItem]: The found item or None if not found. """ return next( (item for item in self._items if item.id == item_id), None ) def insert(self, item: ChatItem) -> ChatItem: """Insert an item at the position determined by its ``created_at``.""" pos = len(self._items) for i, existing in enumerate(self._items): if existing.created_at > item.created_at: pos = i break self._items.insert(pos, item) return item def insert_many(self, items: List[ChatItem]) -> None: """Batch-insert items, each placed in timestamp order.""" for item in sorted(items, key=lambda it: it.created_at): self.insert(item) def active_config_at( self, target: Union[str, int, None] = None ) -> tuple[Optional[str], Optional[List[str]]]: """Resolve the effective (instructions, valid_tools) at a point in the context. Walks SYSTEM/DEVELOPER messages and AgentConfigUpdate items from the start up to and including ``target``. Args: target: An item id, a list index, or None for the end of the context. Returns: tuple: (instructions, valid_tools). Either element may be None. """ if target is None: end = len(self._items) elif isinstance(target, int): end = target + 1 else: end = next( (i + 1 for i, item in enumerate(self._items) if item.id == target), len(self._items), ) instructions: Optional[str] = None tools: Optional[List[str]] = None for item in self._items[:end]: if isinstance(item, ChatMessage) and item.role in ( ChatRole.SYSTEM, ChatRole.DEVELOPER, ): if isinstance(item.content, str): instructions = item.content else: instructions = " ".join( p for p in item.content if isinstance(p, str) ) elif isinstance(item, AgentConfigUpdate): if item.instructions is not None: instructions = item.instructions if item.tools is not None: tools = item.tools return instructions, tools def copy( self, *, exclude_system_messages: bool = False, exclude_instructions: bool = False, exclude_empty_messages: bool = False, exclude_handoffs: bool = False, exclude_config_updates: bool = False, tools: Optional[List[FunctionTool]] = None, filter_agent_id: Optional[str] = None, ) -> ChatContext: """Create a filtered copy of the chat context. Args: exclude_system_messages: Drop SYSTEM-role messages. exclude_instructions: Drop SYSTEM- and DEVELOPER-role messages. exclude_empty_messages: Drop messages with no meaningful content. exclude_handoffs: Drop AgentHandoff items. exclude_config_updates: Drop AgentConfigUpdate items. tools: Tool-scoping for function calls/outputs. ``None`` (the default) keeps every function call/output; an empty list drops them all; a non-empty list keeps only calls/outputs whose tool is in the list. filter_agent_id: When given, keep only items with this agent_id. Returns: ChatContext: A new ChatContext with the filtered items. """ items: List[ChatItem] = [] valid_tool_names = { get_tool_info(tool).name for tool in (tools or []) if is_function_tool(tool) } for item in self._items: if isinstance(item, ChatMessage): if exclude_system_messages and item.role == ChatRole.SYSTEM: continue if exclude_instructions and item.role in ( ChatRole.SYSTEM, ChatRole.DEVELOPER, ): continue if exclude_empty_messages and not self._has_content(item): continue if exclude_handoffs and isinstance(item, AgentHandoff): continue if exclude_config_updates and isinstance(item, AgentConfigUpdate): continue if tools is not None and isinstance( item, (FunctionCall, FunctionCallOutput) ): if item.name not in valid_tool_names: continue if filter_agent_id is not None: _structural = ( isinstance(item, (AgentHandoff, AgentConfigUpdate)) or ( isinstance(item, ChatMessage) and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER) ) ) if not _structural and item.agent_id != filter_agent_id: continue items.append(item) return ChatContext(items) @staticmethod def _has_content(msg: ChatMessage) -> bool: """Return True if the message has any non-empty content.""" if isinstance(msg.content, str): return bool(msg.content.strip()) if isinstance(msg.content, list): for part in msg.content: if isinstance(part, str) and part.strip(): return True if part is not None and not isinstance(part, str): return True return False def fork(self) -> ChatContext: """Fork a complete, independent deep copy for a sub-agent. Returns: ChatContext: A new context; later mutations never touch this one. """ return ChatContext([item.model_copy(deep=True) for item in self._items]) def fork_filtered( self, recent_turns: int = 3, tools: Optional[List[FunctionTool]] = None, ) -> ChatContext: """Fork a context scoped to instructions + the most recent turns. Args: recent_turns: Number of recent user turns to keep (must be >= 1). tools: When given, function calls/outputs are limited to these tools. Returns: ChatContext: A new, independent context. """ if recent_turns < 1: raise ValueError("recent_turns must be >= 1") scoped = self.copy(tools=tools) instruction_items = [ item for item in scoped.items if isinstance(item, ChatMessage) and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER) ] instruction_ids = {item.id for item in instruction_items} user_indices = [ i for i, item in enumerate(scoped.items) if isinstance(item, ChatMessage) and item.role == ChatRole.USER ] if len(user_indices) > recent_turns: split_idx = user_indices[-recent_turns] tail = scoped.items[split_idx:] else: tail = scoped.items new_items = [item.model_copy(deep=True) for item in instruction_items] new_items += [ item.model_copy(deep=True) for item in tail if item.id not in instruction_ids ] return ChatContext(new_items) def fork_brief( self, instructions: str, task_brief: Optional[str] = None, agent_id: Optional[str] = None, ) -> ChatContext: """Fork a fresh context: sub-agent instructions + an optional task brief. Args: instructions: System instructions for the sub-agent (required, non-empty). task_brief: Optional task-description message. agent_id: Attribution stamped on the created items. Returns: ChatContext: A new context with no conversation history. """ if not instructions: raise ValueError("fork_brief() requires non-empty instructions") items: List[ChatItem] = [ ChatMessage( role=ChatRole.SYSTEM, content=[instructions], agent_id=agent_id ) ] if task_brief: items.append( ChatMessage( role=ChatRole.USER, content=[task_brief], agent_id=agent_id ) ) return ChatContext(items) async def merge(self, other: ChatContext) -> ChatContext: """Merge a sub-agent's full transcript into this context, in-place. Every item from ``other`` is merged, timestamp-ordered and de-duplicated by id. This is the most complete merge-back; the ``merge_result`` and ``merge_with_summary`` variants merge less. Args: other: The sub-agent's context. Returns: ChatContext: This context instance (modified in-place). """ existing_ids = {item.id for item in self._items} incoming = [ item.model_copy(deep=True) for item in other.items if item.id not in existing_ids ] combined = self._items + incoming combined.sort(key=lambda item: item.created_at) self._items = combined return self async def merge_result( self, other: ChatContext, *, agent_id: Optional[str] = None ) -> ChatContext: """Merge only a sub-agent's final assistant message, in-place. Args: other: The sub-agent's context. agent_id: Attribution stamped on the merged-in message. Returns: ChatContext: This context instance (modified in-place). """ final = next( ( item for item in reversed(other.items) if isinstance(item, ChatMessage) and item.role == ChatRole.ASSISTANT ), None, ) if final is not None: merged = final.model_copy(deep=True) if agent_id is not None: merged.agent_id = agent_id self._items.append(merged) return self async def merge_with_summary( self, other: ChatContext, *, llm: "LLM", agent_id: Optional[str] = None, ) -> ChatContext: """Merge an LLM-generated summary of a sub-agent's work, in-place. Args: other: The sub-agent's context. llm: LLM used to generate the summary (required keyword argument). agent_id: Attribution stamped on the summary message. Returns: ChatContext: This context instance (modified in-place). """ from .window import render_items, generate_summary text = render_items(other.items) summary_text = await generate_summary(llm, text) if text.strip() else "" if summary_text: self._items.append( ChatMessage( role=ChatRole.ASSISTANT, content=[f"[Sub-agent Summary]\n{summary_text}"], agent_id=agent_id, extra={"summary": True}, ) ) return self def truncate( self, max_items: int | None = None, max_tokens: int | None = None, ) -> ChatContext: """ Truncate the context while preserving system message and summary messages. Removes oldest non-system items until both constraints are satisfied. Keeps function call/output pairs together to avoid orphaned tool calls. Args: max_items: Maximum number of items to keep. None means no item limit. max_tokens: Maximum estimated token budget. None means no token limit. Returns: ChatContext: The current context instance after truncation. """ if max_items is None and max_tokens is None: return self logger.debug(f"Truncating context: {len(self._items)} items, {self.estimated_tokens()} tokens") # Identify protected items that must never be removed: # - System message (agent instructions) # - Summary message (compressed history) # - Last user message (LLMs require conversation to end with user turn) system_msg = next( (item for item in self._items if isinstance(item, ChatMessage) and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)), None ) summary_msg = next( (item for item in self._items if isinstance(item, ChatMessage) and item.extra.get("summary")), None ) last_user_msg = next( (item for item in reversed(self._items) if isinstance(item, ChatMessage) and item.role == ChatRole.USER), None ) structural_items = [ item for item in self._items if isinstance(item, (AgentHandoff, AgentConfigUpdate)) ] protected = { id(m) for m in (system_msg, summary_msg, last_user_msg, *structural_items) if m is not None } # Start with all items; remove oldest non-protected until constraints met new_items = list(self._items) def _needs_trim() -> bool: if max_items is not None and len(new_items) > max_items: return True if max_tokens is not None: token_est = sum(self._estimate_item_tokens(it) for it in new_items) if token_est > max_tokens: return True return False while _needs_trim(): removed = False for i, item in enumerate(new_items): # Skip protected items if id(item) in protected: continue # Don't orphan function call pairs — remove them together if isinstance(item, FunctionCall): output_idx = next( (j for j in range(i + 1, len(new_items)) if isinstance(new_items[j], FunctionCallOutput) and new_items[j].call_id == item.call_id), None ) if output_idx is not None: new_items.pop(output_idx) new_items.pop(i) else: new_items.pop(i) removed = True break elif isinstance(item, FunctionCallOutput): new_items.pop(i) removed = True break else: new_items.pop(i) removed = True break if not removed: break # Only protected items remain — stop even if over budget # Clean up ALL orphaned function items (call without output, or output without call) call_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCall)} output_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCallOutput)} new_items = [ item for item in new_items if not ( (isinstance(item, FunctionCall) and item.call_id not in output_ids_in_list) or (isinstance(item, FunctionCallOutput) and item.call_id not in call_ids_in_list) ) ] # Re-insert protected items if they were accidentally removed by orphan cleanup if system_msg and system_msg not in new_items: new_items.insert(0, system_msg) if summary_msg and summary_msg not in new_items: insert_pos = 1 if system_msg in new_items else 0 new_items.insert(insert_pos, summary_msg) if last_user_msg and last_user_msg not in new_items: new_items.append(last_user_msg) self._items = new_items logger.debug(f"Truncation complete: {len(self._items)} items, {self.estimated_tokens()} tokens") return self async def summarize( self, llm: "LLM", *, keep_recent_turns: int = 3 ) -> ChatContext: """Compress old conversation turns into a single summary message, in-place. Splits the context into head (older) and tail (recent). The head is rendered and summarized by ``llm``; structural items (system/developer messages, prior summaries, handoffs) are preserved. Args: llm: LLM used to generate the summary. keep_recent_turns: Number of recent user turns kept verbatim. Returns: ChatContext: This context instance (modified in-place). """ from .window import render_items, generate_summary user_indices = [ i for i, item in enumerate(self._items) if isinstance(item, ChatMessage) and item.role == ChatRole.USER ] if len(user_indices) <= keep_recent_turns: return self split_idx = user_indices[-keep_recent_turns] head = self._items[:split_idx] recent_items = list(self._items[split_idx:]) def _is_structural(item: ChatItem) -> bool: if isinstance(item, (AgentHandoff, AgentConfigUpdate)): return True if isinstance(item, ChatMessage): return ( item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER) or bool(item.extra.get("summary")) ) return False structural = [item for item in head if _is_structural(item)] summarizable = [item for item in head if not _is_structural(item)] if not summarizable: return self conversation_text = render_items(summarizable) if not conversation_text.strip(): return self summary_text = await generate_summary(llm, conversation_text) if not summary_text: logger.warning("Compression produced empty summary") return self summary_msg = ChatMessage( role=ChatRole.ASSISTANT, content=[f"[Conversation Summary]\n{summary_text}"], extra={"summary": True}, ) self._items = structural + [summary_msg] + recent_items logger.info( f"Compressed {len(summarizable)} items into summary. " f"Context: {len(self._items)} items" ) return self # ── Provider format conversions ──────────────────────────────────── # Actual logic lives in llm/format_converters.py. These methods # delegate to keep the public API on ChatContext unchanged. def to_openai_messages(self, *, reasoning_model: bool = False) -> list[dict]: """Convert context to OpenAI chat completion messages format.""" from ..format_converters import to_openai_messages return to_openai_messages(self, reasoning_model=reasoning_model) def to_anthropic_messages(self, *, caching: bool = False) -> tuple[list[dict], Optional[str]]: """Convert context to Anthropic messages format with role alternation enforced.""" from ..format_converters import to_anthropic_messages return to_anthropic_messages(self, caching=caching) async def to_google_contents(self, *, thought_signatures: dict | None = None) -> tuple[list, Optional[str]]: """Convert context to Google Gemini contents format.""" from ..format_converters import to_google_contents return await to_google_contents(self, thought_signatures=thought_signatures) # ── Serialization ──────────────────────────────────────────────── def to_dict(self) -> dict: """Convert the context to a dictionary representation.""" items = [] for item in self._items: base = { "type": item.type, "id": item.id, "created_at": item.created_at, "agent_id": item.agent_id, } if isinstance(item, ChatMessage): base.update({ "role": item.role.value, "content": item.content, "interrupted": item.interrupted, "extra": item.extra, "confidence": item.confidence, "metrics": item.metrics, "audio_instructions": item.audio_instructions, "text_instructions": item.text_instructions, }) elif isinstance(item, FunctionCall): base.update({ "name": item.name, "arguments": item.arguments, "call_id": item.call_id, "metadata": item.metadata, }) elif isinstance(item, FunctionCallOutput): base.update({ "name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error, }) elif isinstance(item, AgentHandoff): base.update({ "from_agent": item.from_agent, "to_agent": item.to_agent, "reason": item.reason, }) elif isinstance(item, AgentConfigUpdate): base.update({ "instructions": item.instructions, "tools": item.tools, }) items.append(base) return {"items": items} @classmethod def from_dict(cls, data: dict) -> ChatContext: """Reconstruct a ChatContext from a dictionary representation.""" items: List[ChatItem] = [] for d in data["items"]: common = {"id": d["id"]} if d.get("created_at") is not None: common["created_at"] = d["created_at"] if "agent_id" in d: common["agent_id"] = d.get("agent_id") item_type = d["type"] if item_type == "message": items.append(ChatMessage( role=ChatRole(d["role"]), content=d["content"], interrupted=d.get("interrupted", False), extra=d.get("extra", {}) or {}, confidence=d.get("confidence"), metrics=d.get("metrics"), audio_instructions=d.get("audio_instructions"), text_instructions=d.get("text_instructions"), **common, )) elif item_type == "function_call": items.append(FunctionCall( name=d["name"], arguments=d["arguments"], call_id=d["call_id"], metadata=d.get("metadata"), **common, )) elif item_type == "function_call_output": items.append(FunctionCallOutput( name=d["name"], output=d["output"], call_id=d["call_id"], is_error=d.get("is_error", False), **common, )) elif item_type == "agent_handoff": items.append(AgentHandoff( from_agent=d.get("from_agent"), to_agent=d["to_agent"], reason=d.get("reason"), **common, )) elif item_type == "agent_config_update": items.append(AgentConfigUpdate( instructions=d.get("instructions"), tools=d.get("tools"), **common, )) return cls(items) def cleanup(self) -> None: """ Clear all chat context items and references to free memory. """ logger.info(f"Cleaning up ChatContext with {len(self._items)} items") for item in self._items: if isinstance(item, ChatMessage): if isinstance(item.content, list): for content_item in item.content: if isinstance(content_item, ImageContent): content_item.image = None item.content = None elif isinstance(item, FunctionCall): item.arguments = None elif isinstance(item, FunctionCallOutput): item.output = None self._items.clear() try: import gc gc.collect() logger.info("ChatContext garbage collection completed") except Exception as e: logger.error(f"Error during ChatContext garbage collection: {e}") logger.info("ChatContext cleanup completed")Manages a conversation context for LLM interactions.
Initialize the chat context.
Args
items:Optional[List[ChatItem]]- Initial list of chat items. If None, starts with empty context.
Subclasses
Static methods
def empty() ‑> ChatContextdef from_dict(data: dict) ‑> ChatContext-
Reconstruct a ChatContext from a dictionary representation.
Instance variables
prop items : List[ChatItem]-
Expand source code
@property def items(self) -> List[ChatItem]: """ Get all items in the context. Returns: List[ChatItem]: List of all conversation items (messages, function calls, outputs). """ return self._itemsGet all items in the context.
Returns
List[ChatItem]- List of all conversation items (messages, function calls, outputs).
Methods
def active_config_at(self, target: Union[str, int, None] = None) ‑> tuple[str | None, List[str] | None]-
Expand source code
def active_config_at( self, target: Union[str, int, None] = None ) -> tuple[Optional[str], Optional[List[str]]]: """Resolve the effective (instructions, valid_tools) at a point in the context. Walks SYSTEM/DEVELOPER messages and AgentConfigUpdate items from the start up to and including ``target``. Args: target: An item id, a list index, or None for the end of the context. Returns: tuple: (instructions, valid_tools). Either element may be None. """ if target is None: end = len(self._items) elif isinstance(target, int): end = target + 1 else: end = next( (i + 1 for i, item in enumerate(self._items) if item.id == target), len(self._items), ) instructions: Optional[str] = None tools: Optional[List[str]] = None for item in self._items[:end]: if isinstance(item, ChatMessage) and item.role in ( ChatRole.SYSTEM, ChatRole.DEVELOPER, ): if isinstance(item.content, str): instructions = item.content else: instructions = " ".join( p for p in item.content if isinstance(p, str) ) elif isinstance(item, AgentConfigUpdate): if item.instructions is not None: instructions = item.instructions if item.tools is not None: tools = item.tools return instructions, toolsResolve the effective (instructions, valid_tools) at a point in the context.
Walks SYSTEM/DEVELOPER messages and AgentConfigUpdate items from the start up to and including
target.Args
target- An item id, a list index, or None for the end of the context.
Returns
tuple- (instructions, valid_tools). Either element may be None.
def add_config_update(self,
instructions: Optional[str] = None,
tools: Optional[List[str]] = None,
agent_id: Optional[str] = None) ‑> AgentConfigUpdate-
Expand source code
def add_config_update( self, instructions: Optional[str] = None, tools: Optional[List[str]] = None, agent_id: Optional[str] = None, ) -> AgentConfigUpdate: """Record a mid-conversation change to instructions or tools.""" update = AgentConfigUpdate( instructions=instructions, tools=tools, agent_id=agent_id ) self._items.append(update) return updateRecord a mid-conversation change to instructions or tools.
def add_function_call(self,
name: str,
arguments: str,
call_id: Optional[str] = None,
agent_id: Optional[str] = None,
metadata: Optional[dict] = None) ‑> FunctionCall-
Expand source code
def add_function_call( self, name: str, arguments: str, call_id: Optional[str] = None, agent_id: Optional[str] = None, metadata: Optional[dict] = None, ) -> FunctionCall: """Add a function call to the context. ``metadata`` carries provider-specific per-call data — notably the Gemini ``thought_signature`` — which must travel with this exact call when the context is later converted for the provider. """ call = FunctionCall( name=name, arguments=arguments, call_id=call_id or f"call_{uuid.uuid4().hex[:12]}", agent_id=agent_id, metadata=metadata, ) self._items.append(call) return callAdd a function call to the context.
metadatacarries provider-specific per-call data — notably the Geminithought_signature— which must travel with this exact call when the context is later converted for the provider. def add_function_output(self,
name: str,
output: str,
call_id: str,
is_error: bool = False,
agent_id: Optional[str] = None) ‑> FunctionCallOutput-
Expand source code
def add_function_output( self, name: str, output: str, call_id: str, is_error: bool = False, agent_id: Optional[str] = None, ) -> FunctionCallOutput: """Add a function output to the context.""" function_output = FunctionCallOutput( name=name, output=output, call_id=call_id, is_error=is_error, agent_id=agent_id, ) self._items.append(function_output) return function_outputAdd a function output to the context.
def add_handoff(self,
to_agent: str,
from_agent: Optional[str] = None,
reason: Optional[str] = None) ‑> AgentHandoff-
Expand source code
def add_handoff( self, to_agent: str, from_agent: Optional[str] = None, reason: Optional[str] = None, ) -> AgentHandoff: """Record a transfer of control between agents.""" handoff = AgentHandoff(from_agent=from_agent, to_agent=to_agent, reason=reason) self._items.append(handoff) return handoffRecord a transfer of control between agents.
def add_message(self,
role: ChatRole,
content: Union[str, List[ChatContent]],
message_id: Optional[str] = None,
created_at: Optional[float] = None,
replace: bool = False,
agent_id: Optional[str] = None) ‑> ChatMessage-
Expand source code
def add_message( self, role: ChatRole, content: Union[str, List[ChatContent]], message_id: Optional[str] = None, created_at: Optional[float] = None, replace: bool = False, agent_id: Optional[str] = None, ) -> ChatMessage: """Add a new message to the context.""" if replace and role == ChatRole.SYSTEM: self._items = [ item for item in self._items if not (isinstance(item, ChatMessage) and item.role == ChatRole.SYSTEM) ] if isinstance(content, str): content = [content] message = ChatMessage( role=role, content=content, id=message_id or f"msg_{uuid.uuid4().hex[:12]}", created_at=created_at or time.time(), agent_id=agent_id, ) self._items.append(message) return messageAdd a new message to the context.
def cleanup(self) ‑> None-
Expand source code
def cleanup(self) -> None: """ Clear all chat context items and references to free memory. """ logger.info(f"Cleaning up ChatContext with {len(self._items)} items") for item in self._items: if isinstance(item, ChatMessage): if isinstance(item.content, list): for content_item in item.content: if isinstance(content_item, ImageContent): content_item.image = None item.content = None elif isinstance(item, FunctionCall): item.arguments = None elif isinstance(item, FunctionCallOutput): item.output = None self._items.clear() try: import gc gc.collect() logger.info("ChatContext garbage collection completed") except Exception as e: logger.error(f"Error during ChatContext garbage collection: {e}") logger.info("ChatContext cleanup completed")Clear all chat context items and references to free memory.
def copy(self,
*,
exclude_system_messages: bool = False,
exclude_instructions: bool = False,
exclude_empty_messages: bool = False,
exclude_handoffs: bool = False,
exclude_config_updates: bool = False,
tools: Optional[List[FunctionTool]] = None,
filter_agent_id: Optional[str] = None) ‑> ChatContext-
Expand source code
def copy( self, *, exclude_system_messages: bool = False, exclude_instructions: bool = False, exclude_empty_messages: bool = False, exclude_handoffs: bool = False, exclude_config_updates: bool = False, tools: Optional[List[FunctionTool]] = None, filter_agent_id: Optional[str] = None, ) -> ChatContext: """Create a filtered copy of the chat context. Args: exclude_system_messages: Drop SYSTEM-role messages. exclude_instructions: Drop SYSTEM- and DEVELOPER-role messages. exclude_empty_messages: Drop messages with no meaningful content. exclude_handoffs: Drop AgentHandoff items. exclude_config_updates: Drop AgentConfigUpdate items. tools: Tool-scoping for function calls/outputs. ``None`` (the default) keeps every function call/output; an empty list drops them all; a non-empty list keeps only calls/outputs whose tool is in the list. filter_agent_id: When given, keep only items with this agent_id. Returns: ChatContext: A new ChatContext with the filtered items. """ items: List[ChatItem] = [] valid_tool_names = { get_tool_info(tool).name for tool in (tools or []) if is_function_tool(tool) } for item in self._items: if isinstance(item, ChatMessage): if exclude_system_messages and item.role == ChatRole.SYSTEM: continue if exclude_instructions and item.role in ( ChatRole.SYSTEM, ChatRole.DEVELOPER, ): continue if exclude_empty_messages and not self._has_content(item): continue if exclude_handoffs and isinstance(item, AgentHandoff): continue if exclude_config_updates and isinstance(item, AgentConfigUpdate): continue if tools is not None and isinstance( item, (FunctionCall, FunctionCallOutput) ): if item.name not in valid_tool_names: continue if filter_agent_id is not None: _structural = ( isinstance(item, (AgentHandoff, AgentConfigUpdate)) or ( isinstance(item, ChatMessage) and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER) ) ) if not _structural and item.agent_id != filter_agent_id: continue items.append(item) return ChatContext(items)Create a filtered copy of the chat context.
Args
exclude_system_messages- Drop SYSTEM-role messages.
exclude_instructions- Drop SYSTEM- and DEVELOPER-role messages.
exclude_empty_messages- Drop messages with no meaningful content.
exclude_handoffs- Drop AgentHandoff items.
exclude_config_updates- Drop AgentConfigUpdate items.
tools- Tool-scoping for function calls/outputs.
None(the default) keeps every function call/output; an empty list drops them all; a non-empty list keeps only calls/outputs whose tool is in the list. filter_agent_id- When given, keep only items with this agent_id.
Returns
ChatContext- A new ChatContext with the filtered items.
def estimated_tokens(self) ‑> int-
Expand source code
def estimated_tokens(self) -> int: """ Rough token estimate for the current context using a ~4 chars per token heuristic. Good enough for budget decisions — not a replacement for provider-reported usage. Returns: int: Estimated token count. """ total = 0 for item in self._items: total += self._estimate_item_tokens(item) return totalRough token estimate for the current context using a ~4 chars per token heuristic. Good enough for budget decisions — not a replacement for provider-reported usage.
Returns
int- Estimated token count.
def fork(self) ‑> ChatContext-
Expand source code
def fork(self) -> ChatContext: """Fork a complete, independent deep copy for a sub-agent. Returns: ChatContext: A new context; later mutations never touch this one. """ return ChatContext([item.model_copy(deep=True) for item in self._items])Fork a complete, independent deep copy for a sub-agent.
Returns
ChatContext- A new context; later mutations never touch this one.
def fork_brief(self,
instructions: str,
task_brief: Optional[str] = None,
agent_id: Optional[str] = None) ‑> ChatContext-
Expand source code
def fork_brief( self, instructions: str, task_brief: Optional[str] = None, agent_id: Optional[str] = None, ) -> ChatContext: """Fork a fresh context: sub-agent instructions + an optional task brief. Args: instructions: System instructions for the sub-agent (required, non-empty). task_brief: Optional task-description message. agent_id: Attribution stamped on the created items. Returns: ChatContext: A new context with no conversation history. """ if not instructions: raise ValueError("fork_brief() requires non-empty instructions") items: List[ChatItem] = [ ChatMessage( role=ChatRole.SYSTEM, content=[instructions], agent_id=agent_id ) ] if task_brief: items.append( ChatMessage( role=ChatRole.USER, content=[task_brief], agent_id=agent_id ) ) return ChatContext(items)Fork a fresh context: sub-agent instructions + an optional task brief.
Args
instructions- System instructions for the sub-agent (required, non-empty).
task_brief- Optional task-description message.
agent_id- Attribution stamped on the created items.
Returns
ChatContext- A new context with no conversation history.
def fork_filtered(self, recent_turns: int = 3, tools: Optional[List[FunctionTool]] = None) ‑> ChatContext-
Expand source code
def fork_filtered( self, recent_turns: int = 3, tools: Optional[List[FunctionTool]] = None, ) -> ChatContext: """Fork a context scoped to instructions + the most recent turns. Args: recent_turns: Number of recent user turns to keep (must be >= 1). tools: When given, function calls/outputs are limited to these tools. Returns: ChatContext: A new, independent context. """ if recent_turns < 1: raise ValueError("recent_turns must be >= 1") scoped = self.copy(tools=tools) instruction_items = [ item for item in scoped.items if isinstance(item, ChatMessage) and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER) ] instruction_ids = {item.id for item in instruction_items} user_indices = [ i for i, item in enumerate(scoped.items) if isinstance(item, ChatMessage) and item.role == ChatRole.USER ] if len(user_indices) > recent_turns: split_idx = user_indices[-recent_turns] tail = scoped.items[split_idx:] else: tail = scoped.items new_items = [item.model_copy(deep=True) for item in instruction_items] new_items += [ item.model_copy(deep=True) for item in tail if item.id not in instruction_ids ] return ChatContext(new_items)Fork a context scoped to instructions + the most recent turns.
Args
recent_turns- Number of recent user turns to keep (must be >= 1).
tools- When given, function calls/outputs are limited to these tools.
Returns
ChatContext- A new, independent context.
def get_by_id(self, item_id: str) ‑> ChatMessage | FunctionCall | FunctionCallOutput | AgentHandoff | AgentConfigUpdate | None-
Expand source code
def get_by_id(self, item_id: str) -> Optional[ChatItem]: """ Find an item by its ID. Args: item_id (str): The ID of the item to find. Returns: Optional[ChatItem]: The found item or None if not found. """ return next( (item for item in self._items if item.id == item_id), None )Find an item by its ID.
Args
item_id:str- The ID of the item to find.
Returns
Optional[ChatItem]- The found item or None if not found.
def insert(self, item: ChatItem) ‑> ChatMessage | FunctionCall | FunctionCallOutput | AgentHandoff | AgentConfigUpdate-
Expand source code
def insert(self, item: ChatItem) -> ChatItem: """Insert an item at the position determined by its ``created_at``.""" pos = len(self._items) for i, existing in enumerate(self._items): if existing.created_at > item.created_at: pos = i break self._items.insert(pos, item) return itemInsert an item at the position determined by its
created_at. def insert_many(self, items: List[ChatItem]) ‑> None-
Expand source code
def insert_many(self, items: List[ChatItem]) -> None: """Batch-insert items, each placed in timestamp order.""" for item in sorted(items, key=lambda it: it.created_at): self.insert(item)Batch-insert items, each placed in timestamp order.
async def merge(self,
other: ChatContext) ‑> ChatContext-
Expand source code
async def merge(self, other: ChatContext) -> ChatContext: """Merge a sub-agent's full transcript into this context, in-place. Every item from ``other`` is merged, timestamp-ordered and de-duplicated by id. This is the most complete merge-back; the ``merge_result`` and ``merge_with_summary`` variants merge less. Args: other: The sub-agent's context. Returns: ChatContext: This context instance (modified in-place). """ existing_ids = {item.id for item in self._items} incoming = [ item.model_copy(deep=True) for item in other.items if item.id not in existing_ids ] combined = self._items + incoming combined.sort(key=lambda item: item.created_at) self._items = combined return selfMerge a sub-agent's full transcript into this context, in-place.
Every item from
otheris merged, timestamp-ordered and de-duplicated by id. This is the most complete merge-back; themerge_resultandmerge_with_summaryvariants merge less.Args
other- The sub-agent's context.
Returns
ChatContext- This context instance (modified in-place).
async def merge_result(self,
other: ChatContext,
*,
agent_id: Optional[str] = None) ‑> ChatContext-
Expand source code
async def merge_result( self, other: ChatContext, *, agent_id: Optional[str] = None ) -> ChatContext: """Merge only a sub-agent's final assistant message, in-place. Args: other: The sub-agent's context. agent_id: Attribution stamped on the merged-in message. Returns: ChatContext: This context instance (modified in-place). """ final = next( ( item for item in reversed(other.items) if isinstance(item, ChatMessage) and item.role == ChatRole.ASSISTANT ), None, ) if final is not None: merged = final.model_copy(deep=True) if agent_id is not None: merged.agent_id = agent_id self._items.append(merged) return selfMerge only a sub-agent's final assistant message, in-place.
Args
other- The sub-agent's context.
agent_id- Attribution stamped on the merged-in message.
Returns
ChatContext- This context instance (modified in-place).
async def merge_with_summary(self,
other: ChatContext,
*,
llm: "'LLM'",
agent_id: Optional[str] = None) ‑> ChatContext-
Expand source code
async def merge_with_summary( self, other: ChatContext, *, llm: "LLM", agent_id: Optional[str] = None, ) -> ChatContext: """Merge an LLM-generated summary of a sub-agent's work, in-place. Args: other: The sub-agent's context. llm: LLM used to generate the summary (required keyword argument). agent_id: Attribution stamped on the summary message. Returns: ChatContext: This context instance (modified in-place). """ from .window import render_items, generate_summary text = render_items(other.items) summary_text = await generate_summary(llm, text) if text.strip() else "" if summary_text: self._items.append( ChatMessage( role=ChatRole.ASSISTANT, content=[f"[Sub-agent Summary]\n{summary_text}"], agent_id=agent_id, extra={"summary": True}, ) ) return selfMerge an LLM-generated summary of a sub-agent's work, in-place.
Args
other- The sub-agent's context.
llm- LLM used to generate the summary (required keyword argument).
agent_id- Attribution stamped on the summary message.
Returns
ChatContext- This context instance (modified in-place).
def messages(self) ‑> List[ChatMessage]-
Expand source code
def messages(self) -> List[ChatMessage]: """ Return only ChatMessage items, filtering out function calls and outputs. Returns: List[ChatMessage]: List of all chat messages in the context. """ return [item for item in self._items if isinstance(item, ChatMessage)]Return only ChatMessage items, filtering out function calls and outputs.
Returns
List[ChatMessage]- List of all chat messages in the context.
async def summarize(self,
llm: "'LLM'",
*,
keep_recent_turns: int = 3) ‑> ChatContext-
Expand source code
async def summarize( self, llm: "LLM", *, keep_recent_turns: int = 3 ) -> ChatContext: """Compress old conversation turns into a single summary message, in-place. Splits the context into head (older) and tail (recent). The head is rendered and summarized by ``llm``; structural items (system/developer messages, prior summaries, handoffs) are preserved. Args: llm: LLM used to generate the summary. keep_recent_turns: Number of recent user turns kept verbatim. Returns: ChatContext: This context instance (modified in-place). """ from .window import render_items, generate_summary user_indices = [ i for i, item in enumerate(self._items) if isinstance(item, ChatMessage) and item.role == ChatRole.USER ] if len(user_indices) <= keep_recent_turns: return self split_idx = user_indices[-keep_recent_turns] head = self._items[:split_idx] recent_items = list(self._items[split_idx:]) def _is_structural(item: ChatItem) -> bool: if isinstance(item, (AgentHandoff, AgentConfigUpdate)): return True if isinstance(item, ChatMessage): return ( item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER) or bool(item.extra.get("summary")) ) return False structural = [item for item in head if _is_structural(item)] summarizable = [item for item in head if not _is_structural(item)] if not summarizable: return self conversation_text = render_items(summarizable) if not conversation_text.strip(): return self summary_text = await generate_summary(llm, conversation_text) if not summary_text: logger.warning("Compression produced empty summary") return self summary_msg = ChatMessage( role=ChatRole.ASSISTANT, content=[f"[Conversation Summary]\n{summary_text}"], extra={"summary": True}, ) self._items = structural + [summary_msg] + recent_items logger.info( f"Compressed {len(summarizable)} items into summary. " f"Context: {len(self._items)} items" ) return selfCompress old conversation turns into a single summary message, in-place.
Splits the context into head (older) and tail (recent). The head is rendered and summarized by
agents.llm.llm; structural items (system/developer messages, prior summaries, handoffs) are preserved.Args
llm- LLM used to generate the summary.
keep_recent_turns- Number of recent user turns kept verbatim.
Returns
ChatContext- This context instance (modified in-place).
def to_anthropic_messages(self, *, caching: bool = False) ‑> tuple[list[dict], str | None]-
Expand source code
def to_anthropic_messages(self, *, caching: bool = False) -> tuple[list[dict], Optional[str]]: """Convert context to Anthropic messages format with role alternation enforced.""" from ..format_converters import to_anthropic_messages return to_anthropic_messages(self, caching=caching)Convert context to Anthropic messages format with role alternation enforced.
def to_dict(self) ‑> dict-
Expand source code
def to_dict(self) -> dict: """Convert the context to a dictionary representation.""" items = [] for item in self._items: base = { "type": item.type, "id": item.id, "created_at": item.created_at, "agent_id": item.agent_id, } if isinstance(item, ChatMessage): base.update({ "role": item.role.value, "content": item.content, "interrupted": item.interrupted, "extra": item.extra, "confidence": item.confidence, "metrics": item.metrics, "audio_instructions": item.audio_instructions, "text_instructions": item.text_instructions, }) elif isinstance(item, FunctionCall): base.update({ "name": item.name, "arguments": item.arguments, "call_id": item.call_id, "metadata": item.metadata, }) elif isinstance(item, FunctionCallOutput): base.update({ "name": item.name, "output": item.output, "call_id": item.call_id, "is_error": item.is_error, }) elif isinstance(item, AgentHandoff): base.update({ "from_agent": item.from_agent, "to_agent": item.to_agent, "reason": item.reason, }) elif isinstance(item, AgentConfigUpdate): base.update({ "instructions": item.instructions, "tools": item.tools, }) items.append(base) return {"items": items}Convert the context to a dictionary representation.
async def to_google_contents(self, *, thought_signatures: dict | None = None) ‑> tuple[list, str | None]-
Expand source code
async def to_google_contents(self, *, thought_signatures: dict | None = None) -> tuple[list, Optional[str]]: """Convert context to Google Gemini contents format.""" from ..format_converters import to_google_contents return await to_google_contents(self, thought_signatures=thought_signatures)Convert context to Google Gemini contents format.
def to_openai_messages(self, *, reasoning_model: bool = False) ‑> list[dict]-
Expand source code
def to_openai_messages(self, *, reasoning_model: bool = False) -> list[dict]: """Convert context to OpenAI chat completion messages format.""" from ..format_converters import to_openai_messages return to_openai_messages(self, reasoning_model=reasoning_model)Convert context to OpenAI chat completion messages format.
def truncate(self, max_items: int | None = None, max_tokens: int | None = None) ‑> ChatContext-
Expand source code
def truncate( self, max_items: int | None = None, max_tokens: int | None = None, ) -> ChatContext: """ Truncate the context while preserving system message and summary messages. Removes oldest non-system items until both constraints are satisfied. Keeps function call/output pairs together to avoid orphaned tool calls. Args: max_items: Maximum number of items to keep. None means no item limit. max_tokens: Maximum estimated token budget. None means no token limit. Returns: ChatContext: The current context instance after truncation. """ if max_items is None and max_tokens is None: return self logger.debug(f"Truncating context: {len(self._items)} items, {self.estimated_tokens()} tokens") # Identify protected items that must never be removed: # - System message (agent instructions) # - Summary message (compressed history) # - Last user message (LLMs require conversation to end with user turn) system_msg = next( (item for item in self._items if isinstance(item, ChatMessage) and item.role in (ChatRole.SYSTEM, ChatRole.DEVELOPER)), None ) summary_msg = next( (item for item in self._items if isinstance(item, ChatMessage) and item.extra.get("summary")), None ) last_user_msg = next( (item for item in reversed(self._items) if isinstance(item, ChatMessage) and item.role == ChatRole.USER), None ) structural_items = [ item for item in self._items if isinstance(item, (AgentHandoff, AgentConfigUpdate)) ] protected = { id(m) for m in (system_msg, summary_msg, last_user_msg, *structural_items) if m is not None } # Start with all items; remove oldest non-protected until constraints met new_items = list(self._items) def _needs_trim() -> bool: if max_items is not None and len(new_items) > max_items: return True if max_tokens is not None: token_est = sum(self._estimate_item_tokens(it) for it in new_items) if token_est > max_tokens: return True return False while _needs_trim(): removed = False for i, item in enumerate(new_items): # Skip protected items if id(item) in protected: continue # Don't orphan function call pairs — remove them together if isinstance(item, FunctionCall): output_idx = next( (j for j in range(i + 1, len(new_items)) if isinstance(new_items[j], FunctionCallOutput) and new_items[j].call_id == item.call_id), None ) if output_idx is not None: new_items.pop(output_idx) new_items.pop(i) else: new_items.pop(i) removed = True break elif isinstance(item, FunctionCallOutput): new_items.pop(i) removed = True break else: new_items.pop(i) removed = True break if not removed: break # Only protected items remain — stop even if over budget # Clean up ALL orphaned function items (call without output, or output without call) call_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCall)} output_ids_in_list = {item.call_id for item in new_items if isinstance(item, FunctionCallOutput)} new_items = [ item for item in new_items if not ( (isinstance(item, FunctionCall) and item.call_id not in output_ids_in_list) or (isinstance(item, FunctionCallOutput) and item.call_id not in call_ids_in_list) ) ] # Re-insert protected items if they were accidentally removed by orphan cleanup if system_msg and system_msg not in new_items: new_items.insert(0, system_msg) if summary_msg and summary_msg not in new_items: insert_pos = 1 if system_msg in new_items else 0 new_items.insert(insert_pos, summary_msg) if last_user_msg and last_user_msg not in new_items: new_items.append(last_user_msg) self._items = new_items logger.debug(f"Truncation complete: {len(self._items)} items, {self.estimated_tokens()} tokens") return selfTruncate the context while preserving system message and summary messages.
Removes oldest non-system items until both constraints are satisfied. Keeps function call/output pairs together to avoid orphaned tool calls.
Args
max_items- Maximum number of items to keep. None means no item limit.
max_tokens- Maximum estimated token budget. None means no token limit.
Returns
ChatContext- The current context instance after truncation.
def turn_count(self) ‑> int-
Expand source code
def turn_count(self) -> int: """ Count the number of user turns (user-assistant exchange pairs). Returns: int: Number of user messages in the context. """ return sum( 1 for item in self._items if isinstance(item, ChatMessage) and item.role == ChatRole.USER )Count the number of user turns (user-assistant exchange pairs).
Returns
int- Number of user messages in the context.
class ChatMessage (**data: Any)-
Expand source code
class ChatMessage(_ChatItemBase): """A user, assistant, system, or developer utterance.""" role: ChatRole content: Union[str, List[ChatContent]] id: str = Field(default_factory=lambda: f"msg_{uuid.uuid4().hex[:12]}") type: Literal["message"] = "message" interrupted: bool = False extra: dict[str, Any] = Field(default_factory=dict) confidence: Optional[float] = None metrics: Optional[dict] = None audio_instructions: Optional[str] = None text_instructions: Optional[str] = None def instructions_for_modality( self, modality: Literal["audio", "text"] ) -> Union[str, List[ChatContent]]: """Return the instruction variant for the given input modality. Falls back to ``content`` when no modality-specific variant is set. """ if modality == "audio" and self.audio_instructions is not None: return self.audio_instructions if modality == "text" and self.text_instructions is not None: return self.text_instructions return self.contentA user, assistant, system, or developer utterance.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- agents.llm.context.items._ChatItemBase
- pydantic.main.BaseModel
Class variables
var audio_instructions : str | Nonevar confidence : float | Nonevar content : str | List[str | ImageContent]var extra : dict[str, typing.Any]var id : strvar interrupted : boolvar metrics : dict | Nonevar model_configvar role : ChatRolevar text_instructions : str | Nonevar type : Literal['message']
Methods
def instructions_for_modality(self, modality: "Literal['audio', 'text']") ‑> str | List[str | ImageContent]-
Expand source code
def instructions_for_modality( self, modality: Literal["audio", "text"] ) -> Union[str, List[ChatContent]]: """Return the instruction variant for the given input modality. Falls back to ``content`` when no modality-specific variant is set. """ if modality == "audio" and self.audio_instructions is not None: return self.audio_instructions if modality == "text" and self.text_instructions is not None: return self.text_instructions return self.contentReturn the instruction variant for the given input modality.
Falls back to
contentwhen no modality-specific variant is set.
class ChatRole (*args, **kwds)-
Expand source code
class ChatRole(str, Enum): """Roles used in chat conversations.""" SYSTEM = "system" DEVELOPER = "developer" USER = "user" ASSISTANT = "assistant"Roles used in chat conversations.
Ancestors
- builtins.str
- enum.Enum
Class variables
var ASSISTANTvar DEVELOPERvar SYSTEMvar USER
class FallbackLLM (providers: List[LLM],
temporary_disable_sec: float = 60.0,
permanent_disable_after_attempts: int = 3,
latency_threshold_ms: float | None = None,
consecutive_latency_hits: int = 3)-
Expand source code
class FallbackLLM(LLM, FallbackBase): """LLM wrapper that automatically fails over to backup providers on errors, latency degradation, and attempts recovery of higher-priority ones.""" def __init__( self, providers: List[LLM], temporary_disable_sec: float = 60.0, permanent_disable_after_attempts: int = 3, latency_threshold_ms: Optional[float] = None, consecutive_latency_hits: int = 3, ): LLM.__init__(self) FallbackBase.__init__( self, providers, "LLM", temporary_disable_sec=temporary_disable_sec, permanent_disable_after_attempts=permanent_disable_after_attempts, latency_threshold_ms=latency_threshold_ms, consecutive_latency_hits=consecutive_latency_hits, ) self._setup_event_listeners() self._setup_latency_listener() def _setup_event_listeners(self): self.active_provider.on("error", self._on_provider_error) def _setup_latency_listener(self): if self.latency_threshold_ms is None: return global_event_emitter.on("TURN_METRICS_ADDED", self._on_turn_metrics) def _on_turn_metrics(self, event: dict): metrics = event.get("metrics") or {} function_tools = metrics.get("functionToolsCalled") or [] mcp_tools = metrics.get("mcpToolMetrics") or [] if function_tools or mcp_tools: return ttft = metrics.get("ttft") if ttft is None: return asyncio.create_task(self._record_latency(float(ttft))) def _on_provider_error(self, error_msg): failed_p = self.active_provider asyncio.create_task(self._handle_async_error(str(error_msg), failed_p)) async def _handle_async_error(self, error_msg: str, failed_provider: Any): switched = await self._switch_provider(f"Async Error: {error_msg}", failed_provider=failed_provider) self.emit("error", error_msg) async def _switch_provider(self, reason: str, failed_provider: Any = None): provider_to_cleanup = failed_provider if failed_provider else self.active_provider try: provider_to_cleanup.off("error", self._on_provider_error) except: pass active_before = self.active_provider switched = await super()._switch_provider(reason, failed_provider) active_after = self.active_provider if switched: if active_before != active_after: self.active_provider.on("error", self._on_provider_error) return True return False async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]: """ Attempts to chat with current provider. Loops until one succeeds or all fail. Checks for recovery of primary providers before starting. """ self.check_recovery() while True: current_provider = self.active_provider try: async for chunk in current_provider.chat(messages, **kwargs): yield chunk return except Exception as e: switched = await self._switch_provider(str(e), failed_provider=current_provider) self.emit("error", str(e)) if not switched: raise e async def cancel_current_generation(self) -> None: await self.active_provider.cancel_current_generation() async def aclose(self) -> None: if self.latency_threshold_ms is not None: try: global_event_emitter.off("TURN_METRICS_ADDED", self._on_turn_metrics) except Exception: pass for p in self.providers: await p.aclose() await super().aclose()LLM wrapper that automatically fails over to backup providers on errors, latency degradation, and attempts recovery of higher-priority ones.
Initialize the LLM base class.
Ancestors
- LLM
- EventEmitter
- typing.Generic
- FallbackBase
Methods
async def chat(self,
messages: ChatContext,
**kwargs) ‑> AsyncIterator[LLMResponse]-
Expand source code
async def chat(self, messages: ChatContext, **kwargs) -> AsyncIterator[LLMResponse]: """ Attempts to chat with current provider. Loops until one succeeds or all fail. Checks for recovery of primary providers before starting. """ self.check_recovery() while True: current_provider = self.active_provider try: async for chunk in current_provider.chat(messages, **kwargs): yield chunk return except Exception as e: switched = await self._switch_provider(str(e), failed_provider=current_provider) self.emit("error", str(e)) if not switched: raise eAttempts to chat with current provider. Loops until one succeeds or all fail. Checks for recovery of primary providers before starting.
Inherited members
class FunctionCall (**data: Any)-
Expand source code
class FunctionCall(_ChatItemBase): """A tool invocation initiated by the language model.""" id: str = Field(default_factory=lambda: f"call_{uuid.uuid4().hex[:12]}") type: Literal["function_call"] = "function_call" name: str arguments: str call_id: str metadata: Optional[dict] = NoneA tool invocation initiated by the language model.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- agents.llm.context.items._ChatItemBase
- pydantic.main.BaseModel
Class variables
var arguments : strvar call_id : strvar id : strvar metadata : dict | Nonevar model_configvar name : strvar type : Literal['function_call']
class FunctionCallOutput (**data: Any)-
Expand source code
class FunctionCallOutput(_ChatItemBase): """The result of a tool execution.""" id: str = Field(default_factory=lambda: f"output_{uuid.uuid4().hex[:12]}") type: Literal["function_call_output"] = "function_call_output" name: str call_id: str output: str is_error: bool = FalseThe result of a tool execution.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- agents.llm.context.items._ChatItemBase
- pydantic.main.BaseModel
Class variables
var call_id : strvar id : strvar is_error : boolvar model_configvar name : strvar output : strvar type : Literal['function_call_output']
class ImageContent (**data: Any)-
Expand source code
class ImageContent(BaseModel): """Image content in a chat message.""" model_config = ConfigDict(arbitrary_types_allowed=True) id: str = Field(default_factory=lambda: f"img_{uuid.uuid4().hex[:12]}") type: Literal["image"] = "image" image: Union[av.VideoFrame, str] inference_detail: Literal["auto", "high", "low"] = "auto" encode_options: EncodeOptions = Field( default_factory=lambda: EncodeOptions( format="JPEG", quality=90, resize_options=ResizeOptions(width=320, height=240), ) ) def to_data_url(self) -> str: """Convert the image to a data URL string.""" if isinstance(self.image, str): return self.image encoded_image = images.encode(self.image, self.encode_options) b64_image = base64.b64encode(encoded_image).decode("utf-8") return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"Image content in a chat message.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var encode_options : EncodeOptionsvar id : strvar image : av.video.frame.VideoFrame | strvar inference_detail : Literal['auto', 'high', 'low']var model_configvar type : Literal['image']
Methods
def to_data_url(self) ‑> str-
Expand source code
def to_data_url(self) -> str: """Convert the image to a data URL string.""" if isinstance(self.image, str): return self.image encoded_image = images.encode(self.image, self.encode_options) b64_image = base64.b64encode(encoded_image).decode("utf-8") return f"data:image/{self.encode_options.format.lower()};base64,{b64_image}"Convert the image to a data URL string.
class LLM-
Expand source code
class LLM(EventEmitter[Literal["error"]]): """ Base class for LLM implementations. """ def __init__(self) -> None: """ Initialize the LLM base class. """ super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" @property def label(self) -> str: """ Get the LLM provider label. Returns: str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM"). """ return self._label @abstractmethod async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, conversational_graph:Optional[Any] = None, **kwargs: Any ) -> AsyncIterator[LLMResponse]: """ Main method to interact with the LLM. Args: messages (ChatContext): The conversation context containing message history. tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use. conversational_graph(Any | None, optional): GraphAdapter object for using graph methods. **kwargs (Any): Additional arguments specific to the LLM provider implementation. Returns: AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated. Raises: NotImplementedError: This method must be implemented by subclasses. """ raise NotImplementedError @abstractmethod async def cancel_current_generation(self) -> None: """ Cancel the current LLM generation if active. Raises: NotImplementedError: This method must be implemented by subclasses. """ # override in subclasses pass async def aclose(self) -> None: """ Cleanup resources. """ logger.info(f"Cleaning up LLM: {self.label}") await self.cancel_current_generation() try: import gc gc.collect() logger.info(f"LLM garbage collection completed: {self.label}") except Exception as e: logger.error(f"Error during LLM garbage collection: {e}") logger.info(f"LLM cleanup completed: {self.label}") async def __aenter__(self) -> LLM: """ Async context manager entry point. """ return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """ Async context manager exit point. """ await self.aclose()Base class for LLM implementations.
Initialize the LLM base class.
Ancestors
- EventEmitter
- typing.Generic
Subclasses
Instance variables
prop label : str-
Expand source code
@property def label(self) -> str: """ Get the LLM provider label. Returns: str: A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM"). """ return self._labelGet the LLM provider label.
Returns
str- A string identifier for the LLM provider (e.g., "videosdk.plugins.openai.llm.OpenAILLM").
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """ Cleanup resources. """ logger.info(f"Cleaning up LLM: {self.label}") await self.cancel_current_generation() try: import gc gc.collect() logger.info(f"LLM garbage collection completed: {self.label}") except Exception as e: logger.error(f"Error during LLM garbage collection: {e}") logger.info(f"LLM cleanup completed: {self.label}")Cleanup resources.
async def cancel_current_generation(self) ‑> None-
Expand source code
@abstractmethod async def cancel_current_generation(self) -> None: """ Cancel the current LLM generation if active. Raises: NotImplementedError: This method must be implemented by subclasses. """ # override in subclasses passCancel the current LLM generation if active.
Raises
NotImplementedError- This method must be implemented by subclasses.
async def chat(self,
messages: ChatContext,
tools: list[FunctionTool] | None = None,
conversational_graph: Optional[Any] = None,
**kwargs: Any) ‑> AsyncIterator[LLMResponse]-
Expand source code
@abstractmethod async def chat( self, messages: ChatContext, tools: list[FunctionTool] | None = None, conversational_graph:Optional[Any] = None, **kwargs: Any ) -> AsyncIterator[LLMResponse]: """ Main method to interact with the LLM. Args: messages (ChatContext): The conversation context containing message history. tools (list[FunctionTool] | None, optional): List of available function tools for the LLM to use. conversational_graph(Any | None, optional): GraphAdapter object for using graph methods. **kwargs (Any): Additional arguments specific to the LLM provider implementation. Returns: AsyncIterator[LLMResponse]: An async iterator yielding LLMResponse objects as they're generated. Raises: NotImplementedError: This method must be implemented by subclasses. """ raise NotImplementedErrorMain method to interact with the LLM.
Args
messages:ChatContext- The conversation context containing message history.
tools:list[FunctionTool] | None, optional- List of available function tools for the LLM to use.
- conversational_graph(Any | None, optional): GraphAdapter object for using graph methods.
**kwargs:Any- Additional arguments specific to the LLM provider implementation.
Returns
AsyncIterator[LLMResponse]- An async iterator yielding LLMResponse objects as they're generated.
Raises
NotImplementedError- This method must be implemented by subclasses.
Inherited members
class LLMResponse (**data: Any)-
Expand source code
class LLMResponse(BaseModel): """ Data model to hold LLM response data. Attributes: content (str): The text content generated by the LLM. role (ChatRole): The role of the response (typically ASSISTANT). metadata (Optional[dict[str, Any]]): Additional response metadata from the LLM provider. """ content: str role: ChatRole metadata: Optional[dict[str, Any]] = NoneData model to hold LLM response data.
Attributes
content:str- The text content generated by the LLM.
role:ChatRole- The role of the response (typically ASSISTANT).
metadata:Optional[dict[str, Any]]- Additional response metadata from the LLM provider.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var content : strvar metadata : dict[str, typing.Any] | Nonevar model_configvar role : ChatRole
class ReadOnlyChatContext (items: List[ChatItem])-
Expand source code
class ReadOnlyChatContext(ChatContext): """A read-only view over a ChatContext's items. All read operations (items, messages, copy, fork, get_by_id, active_config_at, serialization) work normally. Every mutating operation raises ``RuntimeError``. Used when a shared context is handed to an agent that must not mutate the parent. """ def __init__(self, items: List[ChatItem]): # Hold a reference to the live items list — this is a view, not a copy. self._items = items def _readonly(self, *args, **kwargs): raise RuntimeError(_MUTATION_ERROR) async def _readonly_async(self, *args, **kwargs): raise RuntimeError(_MUTATION_ERROR) # Sync mutators add_message = _readonly add_function_call = _readonly add_function_output = _readonly add_handoff = _readonly add_config_update = _readonly insert = _readonly insert_many = _readonly truncate = _readonly cleanup = _readonly # Async mutators summarize = _readonly_async merge = _readonly_async merge_result = _readonly_async merge_with_summary = _readonly_asyncA read-only view over a ChatContext's items.
All read operations (items, messages, copy, fork, get_by_id, active_config_at, serialization) work normally. Every mutating operation raises
RuntimeError. Used when a shared context is handed to an agent that must not mutate the parent.Initialize the chat context.
Args
items:Optional[List[ChatItem]]- Initial list of chat items. If None, starts with empty context.
Ancestors
Inherited members
ChatContext:active_config_atadd_config_updateadd_function_calladd_function_outputadd_handoffadd_messagecleanupcopyemptyestimated_tokensforkfork_brieffork_filteredfrom_dictget_by_idinsertinsert_manyitemsmergemerge_resultmerge_with_summarymessagessummarizeto_anthropic_messagesto_dictto_google_contentsto_openai_messagestruncateturn_count
class ResponseChunk (content: str, metadata: dict[str, Any] | None = None, role: str | None = None)-
Expand source code
class ResponseChunk(str): """A string subclass representing a single chunk of an LLM response, carrying optional metadata and role.""" def __new__(cls, content: str, metadata: dict[str, Any] | None = None, role: str | None = None): obj = super().__new__(cls, content or "") obj.metadata = metadata obj.role = role return obj @property def content(self) -> str: return str(self)A string subclass representing a single chunk of an LLM response, carrying optional metadata and role.
Ancestors
- builtins.str
Instance variables
prop content : str-
Expand source code
@property def content(self) -> str: return str(self)