Module agents.agent

Classes

class Agent (instructions: str,
tools: List[FunctionTool] = None,
agent_id: str = None,
mcp_servers: List[MCPServiceProvider] = None,
inherit_context: bool = False,
knowledge_base: KnowledgeBase | None = None)
Expand source code
class Agent(EventEmitter[Literal["agent_started"]], ABC):
    """
    Abstract base class for creating custom agents.
    Inherits from EventEmitter to handle agent events and state updates.
    """
    def __init__(self, instructions: str, tools: List[FunctionTool] = None, agent_id: str = None, mcp_servers: List[MCPServiceProvider] = None, inherit_context: bool = False, knowledge_base: KnowledgeBase | None = None):
        super().__init__()
        self._tools = tools
        self._llm = None
        self._stt = None
        self._tts = None
        self.chat_context = ChatContext.empty()
        self.instructions = instructions
        self._tools = tools if tools else []
        self._mcp_servers = mcp_servers if mcp_servers else []
        self._mcp_initialized = False
        self._register_class_tools()
        self.register_tools()
        self.a2a = A2AProtocol(self)
        self._agent_card = None 
        self.id = agent_id or str(uuid.uuid4())
        self.mcp_manager = MCPToolManager()
        self.session: AgentSession | None = None
        self._thinking_background_config: Optional[BackgroundAudioHandlerConfig] = None
        self.knowledge_base = knowledge_base 
        self.inherit_context = inherit_context

    def _register_class_tools(self) -> None:
        """Internal Method: Register all function tools defined in the class"""
        for name, attr in inspect.getmembers(self):
            if is_function_tool(attr):
                self._tools.append(attr)

    @property
    def instructions(self) -> str:
        """Get the instructions for the agent"""
        return self._instructions

    @instructions.setter
    def instructions(self, value: str) -> None:
        """Set the instructions for the agent"""
        self._instructions = value
        self.chat_context.add_message(
            role=ChatRole.SYSTEM,
            content=value
        )

    @property
    def tools(self) -> ToolList[FunctionTool]:
        """Get the tools for the agent"""
        return ToolList(self._tools)
    
    def register_tools(self) -> None:
        """Internal Method: Register external function tools for the agent"""
        for tool in self._tools:
            if not is_function_tool(tool):
                raise ValueError(f"Tool {tool.__name__ if hasattr(tool, '__name__') else tool} is not a valid FunctionTool")

    def update_tools(self, tools: List[FunctionTool]) -> None:
        """Update the tools for the agent"""
        self._tools = tools
        self._register_class_tools()
        self.register_tools()
    
    async def hangup(self) -> None:
        """Hang up the agent"""
        await self.session.hangup("manual_hangup")
    
    def set_thinking_audio(self, file: str = None, volume: float = 0.3):
        """Set the thinking background for the agent"""
        if file is None:
            file = os.path.join(os.path.dirname(__file__), 'resources', 'agent_keyboard.wav')
        self._thinking_background_config = BackgroundAudioHandlerConfig(file_path=file,volume=volume,looping=True,enabled=True)

    async def play_background_audio(self, file: str = None, volume: float = 1.0, looping: bool = False, override_thinking: bool = True) -> None:
        """Play background audio on demand"""
        if file is None:
            file = os.path.join(os.path.dirname(__file__), 'resources', 'classical.wav')
        
        config = BackgroundAudioHandlerConfig(file_path=file,volume=volume,looping=looping,enabled=True,mode='mixing') 
        
        await self.session.play_background_audio(config, override_thinking)

    async def stop_background_audio(self) -> None:
        """Stop background audio on demand"""
        await self.session.stop_background_audio()
    
    async def initialize_mcp(self) -> None:
        """Internal Method: Initialize the agent, including any MCP server if provided."""
        if self._mcp_servers and not self._mcp_initialized:
            for server in self._mcp_servers:
                await self.add_server(server)
            self._mcp_initialized = True
    
    async def add_server(self, mcp_server: MCPServiceProvider) -> None:
        """Internal Method: Initialize the MCP server and register the tools"""
        existing_tool_count = len(self.mcp_manager.tools)
        await self.mcp_manager.add_mcp_server(mcp_server)
        new_tools = self.mcp_manager.tools[existing_tool_count:]
        self._tools.extend(new_tools)
        
    @abstractmethod
    async def on_enter(self) -> None:
        """Called when session starts, to be implemented in your custom agent implementation."""
        pass


    async def register_a2a(self, card: AgentCard) -> None:
        """ Register the agent for A2A communication"""
        self._agent_card = card
        await self.a2a.register(card)

    async def unregister_a2a(self) -> None:
        """Unregister the agent from A2A communication"""
        await self.a2a.unregister()
        self._agent_card = None

    async def cleanup(self) -> None:
        """Internal Method: Cleanup agent resources"""
        logger.info("Cleaning up agent resources")        
        if self.mcp_manager:
            try:
                await self.mcp_manager.cleanup()
                logger.info("MCP manager cleaned up")
            except Exception as e:
                logger.error(f"Error cleaning up MCP manager: {e}")
            self.mcp_manager = None

        self._tools = []
        self._mcp_servers = []
        self.chat_context = None
        self._agent_card = None        
        if hasattr(self, 'session'):
            self.session = None        
        logger.info("Agent cleanup completed")
    
    @abstractmethod
    async def on_exit(self) -> None:
        """Called when session ends, to be implemented in your custom agent implementation."""
        pass

    def capture_frames(self, num_of_frames: int = 1) -> list[av.VideoFrame]:
        """Capture the latest video frames from the pipeline (max 5)."""
        if num_of_frames > 5:
            raise ValueError("num_of_frames cannot exceed 5")

        pipeline = getattr(getattr(self, 'session', None), 'pipeline', None)
        if not pipeline:
            logger.warning("Pipeline not available")
            return []

        return pipeline.get_latest_frames(num_of_frames)

Abstract base class for creating custom agents. Inherits from EventEmitter to handle agent events and state updates.

Ancestors

Instance variables

prop instructions : str
Expand source code
@property
def instructions(self) -> str:
    """Get the instructions for the agent"""
    return self._instructions

Get the instructions for the agent

prop toolsToolList[FunctionTool]
Expand source code
@property
def tools(self) -> ToolList[FunctionTool]:
    """Get the tools for the agent"""
    return ToolList(self._tools)

Get the tools for the agent

Methods

async def add_server(self, mcp_server: MCPServiceProvider) ‑> None
Expand source code
async def add_server(self, mcp_server: MCPServiceProvider) -> None:
    """Internal Method: Initialize the MCP server and register the tools"""
    existing_tool_count = len(self.mcp_manager.tools)
    await self.mcp_manager.add_mcp_server(mcp_server)
    new_tools = self.mcp_manager.tools[existing_tool_count:]
    self._tools.extend(new_tools)

Internal Method: Initialize the MCP server and register the tools

def capture_frames(self, num_of_frames: int = 1) ‑> list[av.video.frame.VideoFrame]
Expand source code
def capture_frames(self, num_of_frames: int = 1) -> list[av.VideoFrame]:
    """Capture the latest video frames from the pipeline (max 5)."""
    if num_of_frames > 5:
        raise ValueError("num_of_frames cannot exceed 5")

    pipeline = getattr(getattr(self, 'session', None), 'pipeline', None)
    if not pipeline:
        logger.warning("Pipeline not available")
        return []

    return pipeline.get_latest_frames(num_of_frames)

Capture the latest video frames from the pipeline (max 5).

async def cleanup(self) ‑> None
Expand source code
async def cleanup(self) -> None:
    """Internal Method: Cleanup agent resources"""
    logger.info("Cleaning up agent resources")        
    if self.mcp_manager:
        try:
            await self.mcp_manager.cleanup()
            logger.info("MCP manager cleaned up")
        except Exception as e:
            logger.error(f"Error cleaning up MCP manager: {e}")
        self.mcp_manager = None

    self._tools = []
    self._mcp_servers = []
    self.chat_context = None
    self._agent_card = None        
    if hasattr(self, 'session'):
        self.session = None        
    logger.info("Agent cleanup completed")

Internal Method: Cleanup agent resources

async def hangup(self) ‑> None
Expand source code
async def hangup(self) -> None:
    """Hang up the agent"""
    await self.session.hangup("manual_hangup")

Hang up the agent

async def initialize_mcp(self) ‑> None
Expand source code
async def initialize_mcp(self) -> None:
    """Internal Method: Initialize the agent, including any MCP server if provided."""
    if self._mcp_servers and not self._mcp_initialized:
        for server in self._mcp_servers:
            await self.add_server(server)
        self._mcp_initialized = True

Internal Method: Initialize the agent, including any MCP server if provided.

async def on_enter(self) ‑> None
Expand source code
@abstractmethod
async def on_enter(self) -> None:
    """Called when session starts, to be implemented in your custom agent implementation."""
    pass

Called when session starts, to be implemented in your custom agent implementation.

async def on_exit(self) ‑> None
Expand source code
@abstractmethod
async def on_exit(self) -> None:
    """Called when session ends, to be implemented in your custom agent implementation."""
    pass

Called when session ends, to be implemented in your custom agent implementation.

async def play_background_audio(self,
file: str = None,
volume: float = 1.0,
looping: bool = False,
override_thinking: bool = True) ‑> None
Expand source code
async def play_background_audio(self, file: str = None, volume: float = 1.0, looping: bool = False, override_thinking: bool = True) -> None:
    """Play background audio on demand"""
    if file is None:
        file = os.path.join(os.path.dirname(__file__), 'resources', 'classical.wav')
    
    config = BackgroundAudioHandlerConfig(file_path=file,volume=volume,looping=looping,enabled=True,mode='mixing') 
    
    await self.session.play_background_audio(config, override_thinking)

Play background audio on demand

async def register_a2a(self, card: AgentCard) ‑> None
Expand source code
async def register_a2a(self, card: AgentCard) -> None:
    """ Register the agent for A2A communication"""
    self._agent_card = card
    await self.a2a.register(card)

Register the agent for A2A communication

def register_tools(self) ‑> None
Expand source code
def register_tools(self) -> None:
    """Internal Method: Register external function tools for the agent"""
    for tool in self._tools:
        if not is_function_tool(tool):
            raise ValueError(f"Tool {tool.__name__ if hasattr(tool, '__name__') else tool} is not a valid FunctionTool")

Internal Method: Register external function tools for the agent

def set_thinking_audio(self, file: str = None, volume: float = 0.3)
Expand source code
def set_thinking_audio(self, file: str = None, volume: float = 0.3):
    """Set the thinking background for the agent"""
    if file is None:
        file = os.path.join(os.path.dirname(__file__), 'resources', 'agent_keyboard.wav')
    self._thinking_background_config = BackgroundAudioHandlerConfig(file_path=file,volume=volume,looping=True,enabled=True)

Set the thinking background for the agent

async def stop_background_audio(self) ‑> None
Expand source code
async def stop_background_audio(self) -> None:
    """Stop background audio on demand"""
    await self.session.stop_background_audio()

Stop background audio on demand

async def unregister_a2a(self) ‑> None
Expand source code
async def unregister_a2a(self) -> None:
    """Unregister the agent from A2A communication"""
    await self.a2a.unregister()
    self._agent_card = None

Unregister the agent from A2A communication

def update_tools(self, tools: List[FunctionTool]) ‑> None
Expand source code
def update_tools(self, tools: List[FunctionTool]) -> None:
    """Update the tools for the agent"""
    self._tools = tools
    self._register_class_tools()
    self.register_tools()

Update the tools for the agent

Inherited members

class ToolList (*args, **kwargs)
Expand source code
class ToolList(list):
    """
    Custom list class that supports addition and subtraction for Tool management.
    """
    def __add__(self, other):
        if isinstance(other, list):
            return ToolList(super().__add__(other))
        return ToolList(super().__add__([other]))

    def __sub__(self, other):
        if not isinstance(other, list):
            other = [other]
        return ToolList([item for item in self if item not in other])

Custom list class that supports addition and subtraction for Tool management.

Ancestors

  • builtins.list