Module agents.agent
Classes
class Agent (instructions: str,
tools: List[FunctionTool] = None,
agent_id: str = None,
mcp_servers: List[MCPServiceProvider] = None)-
Expand source code
class Agent(EventEmitter[Literal["agent_started"]], ABC): """ Abstract base class for creating custom agents. Inherits from EventEmitter to handle agent events and state updates. """ def __init__(self, instructions: str, tools: List[FunctionTool] = None, agent_id: str = None, mcp_servers: List[MCPServiceProvider] = None): super().__init__() self._tools = tools self._llm = None self._stt = None self._tts = None self.chat_context = ChatContext.empty() self.instructions = instructions self._tools = tools if tools else [] self._mcp_servers = mcp_servers if mcp_servers else [] self._mcp_initialized = False self._register_class_tools() self.register_tools() self.a2a = A2AProtocol(self) self._agent_card = None self.id = agent_id or str(uuid.uuid4()) self.mcp_manager = MCPToolManager() def _register_class_tools(self) -> None: """Internal Method: Register all function tools defined in the class""" for name, attr in inspect.getmembers(self): if is_function_tool(attr): self._tools.append(attr) @property def instructions(self) -> str: """Get the instructions for the agent""" return self._instructions @instructions.setter def instructions(self, value: str) -> None: """Set the instructions for the agent""" self._instructions = value self.chat_context.add_message( role=ChatRole.SYSTEM, content=value ) @property def tools(self) -> List[FunctionTool]: """Get the tools for the agent""" return self._tools def register_tools(self) -> None: """Internal Method: Register external function tools for the agent""" for tool in self._tools: if not is_function_tool(tool): raise ValueError(f"Tool {tool.__name__ if hasattr(tool, '__name__') else tool} is not a valid FunctionTool") def update_tools(self, tools: List[FunctionTool]) -> None: """Update the tools for the agent""" self._tools.extend(tools) self._register_class_tools() self.register_tools() async def hangup(self) -> None: """Hang up the agent""" await self.session.close() async def initialize_mcp(self) -> None: """Internal Method: Initialize the agent, including any MCP server if provided.""" if self._mcp_servers and not self._mcp_initialized: for server in self._mcp_servers: await self.add_server(server) self._mcp_initialized = True async def add_server(self, mcp_server: MCPServiceProvider) -> None: """Internal Method: Initialize the MCP server and register the tools""" await self.mcp_manager.add_mcp_server(mcp_server) self._tools.extend(self.mcp_manager.tools) @abstractmethod async def on_enter(self) -> None: """Called when session starts, to be implemented in your custom agent implementation.""" pass def on_speech_in(self, data: dict) -> None: """Called when user speech is detected, to be implemented in your custom agent implementation.""" pass def on_speech_out(self, data: dict) -> None: """Called when agent speech is generated, to be implemented in your custom agent implementation.""" pass async def register_a2a(self, card: AgentCard) -> None: """ Register the agent for A2A communication""" self._agent_card = card await self.a2a.register(card) async def unregister_a2a(self) -> None: """Unregister the agent from A2A communication""" await self.a2a.unregister() self._agent_card = None async def cleanup(self) -> None: """Internal Method: Cleanup agent resources""" logger.info("Cleaning up agent resources") if self.mcp_manager: try: await self.mcp_manager.cleanup() logger.info("MCP manager cleaned up") except Exception as e: logger.error(f"Error cleaning up MCP manager: {e}") self.mcp_manager = None self._tools = [] self._mcp_servers = [] self.chat_context = None self._agent_card = None if hasattr(self, 'session'): self.session = None logger.info("Agent cleanup completed") @abstractmethod async def on_exit(self) -> None: """Called when session ends, to be implemented in your custom agent implementation.""" passAbstract base class for creating custom agents. Inherits from EventEmitter to handle agent events and state updates.
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Instance variables
prop instructions : str-
Expand source code
@property def instructions(self) -> str: """Get the instructions for the agent""" return self._instructionsGet the instructions for the agent
prop tools : List[FunctionTool]-
Expand source code
@property def tools(self) -> List[FunctionTool]: """Get the tools for the agent""" return self._toolsGet the tools for the agent
Methods
async def add_server(self, mcp_server: MCPServiceProvider) ‑> None-
Expand source code
async def add_server(self, mcp_server: MCPServiceProvider) -> None: """Internal Method: Initialize the MCP server and register the tools""" await self.mcp_manager.add_mcp_server(mcp_server) self._tools.extend(self.mcp_manager.tools)Internal Method: Initialize the MCP server and register the tools
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None: """Internal Method: Cleanup agent resources""" logger.info("Cleaning up agent resources") if self.mcp_manager: try: await self.mcp_manager.cleanup() logger.info("MCP manager cleaned up") except Exception as e: logger.error(f"Error cleaning up MCP manager: {e}") self.mcp_manager = None self._tools = [] self._mcp_servers = [] self.chat_context = None self._agent_card = None if hasattr(self, 'session'): self.session = None logger.info("Agent cleanup completed")Internal Method: Cleanup agent resources
async def hangup(self) ‑> None-
Expand source code
async def hangup(self) -> None: """Hang up the agent""" await self.session.close()Hang up the agent
async def initialize_mcp(self) ‑> None-
Expand source code
async def initialize_mcp(self) -> None: """Internal Method: Initialize the agent, including any MCP server if provided.""" if self._mcp_servers and not self._mcp_initialized: for server in self._mcp_servers: await self.add_server(server) self._mcp_initialized = TrueInternal Method: Initialize the agent, including any MCP server if provided.
async def on_enter(self) ‑> None-
Expand source code
@abstractmethod async def on_enter(self) -> None: """Called when session starts, to be implemented in your custom agent implementation.""" passCalled when session starts, to be implemented in your custom agent implementation.
async def on_exit(self) ‑> None-
Expand source code
@abstractmethod async def on_exit(self) -> None: """Called when session ends, to be implemented in your custom agent implementation.""" passCalled when session ends, to be implemented in your custom agent implementation.
def on_speech_in(self, data: dict) ‑> None-
Expand source code
def on_speech_in(self, data: dict) -> None: """Called when user speech is detected, to be implemented in your custom agent implementation.""" passCalled when user speech is detected, to be implemented in your custom agent implementation.
def on_speech_out(self, data: dict) ‑> None-
Expand source code
def on_speech_out(self, data: dict) -> None: """Called when agent speech is generated, to be implemented in your custom agent implementation.""" passCalled when agent speech is generated, to be implemented in your custom agent implementation.
async def register_a2a(self, card: AgentCard) ‑> None-
Expand source code
async def register_a2a(self, card: AgentCard) -> None: """ Register the agent for A2A communication""" self._agent_card = card await self.a2a.register(card)Register the agent for A2A communication
def register_tools(self) ‑> None-
Expand source code
def register_tools(self) -> None: """Internal Method: Register external function tools for the agent""" for tool in self._tools: if not is_function_tool(tool): raise ValueError(f"Tool {tool.__name__ if hasattr(tool, '__name__') else tool} is not a valid FunctionTool")Internal Method: Register external function tools for the agent
async def unregister_a2a(self) ‑> None-
Expand source code
async def unregister_a2a(self) -> None: """Unregister the agent from A2A communication""" await self.a2a.unregister() self._agent_card = NoneUnregister the agent from A2A communication
def update_tools(self, tools: List[FunctionTool]) ‑> None-
Expand source code
def update_tools(self, tools: List[FunctionTool]) -> None: """Update the tools for the agent""" self._tools.extend(tools) self._register_class_tools() self.register_tools()Update the tools for the agent