Module agents.content_generation
Classes
class ContentGeneration (agent: Agent | None = None,
llm: LLM | None = None,
conversational_graph: Any | None = None,
max_context_items: int | None = None)
Expand source code
class ContentGeneration(
    EventEmitter[
        Literal[
            "generation_started",
            "generation_chunk",
            "generation_complete",
            "tool_called",
            "agent_switched",
            # Bug fix: these two events are emitted below but were missing
            # from the declared event set.
            "usage_tracked",
            "first_chunk",
        ]
    ]
):
    """
    Handles LLM processing and content generation.

    Events:
        - generation_started: LLM generation begins
        - generation_chunk: Streaming chunk received
        - generation_complete: Generation finished
        - tool_called: Function tool invoked
        - agent_switched: Agent switching occurred
        - usage_tracked: Usage metadata received from the LLM
        - first_chunk: First streamed chunk of a generation arrived
    """

    def __init__(
        self,
        agent: Agent | None = None,
        llm: LLM | None = None,
        conversational_graph: Any | None = None,
        max_context_items: int | None = None,
    ) -> None:
        super().__init__()
        self.agent = agent
        self.llm = llm
        self.conversational_graph = conversational_graph
        # When set, the chat context is truncated to this many items before
        # each generation to bound prompt size.
        self.max_context_items = max_context_items
        # Serializes generations: only one LLM stream may run at a time.
        self.llm_lock = asyncio.Lock()
        self._is_interrupted = False

    async def start(self) -> None:
        """Start the content generation component."""
        logger.info("ContentGeneration started")

    def _maybe_truncate_context(self) -> None:
        """Trim the agent's chat context down to ``max_context_items`` items."""
        if not self.max_context_items:
            return
        current_items = len(self.agent.chat_context.items)
        if current_items <= self.max_context_items:
            return
        try:
            logger.info(f"Truncating context from {current_items} to {self.max_context_items} items")
            self.agent.chat_context.truncate(self.max_context_items)
            logger.info(f"Truncation complete. Final size: {len(self.agent.chat_context.items)} items")
        except Exception as e:
            # Best-effort: a failed truncation must not abort the generation.
            logger.error(f"Error during truncation: {e}", exc_info=True)

    async def _switch_agent(self, new_agent: Agent) -> None:
        """Hand the session over to ``new_agent`` (returned by a tool call).

        Transfers chat context (when the new agent inherits it), rebinds the
        session and pipeline, awaits ``on_enter`` if defined, and emits
        ``agent_switched``.
        """
        # Capture the outgoing agent's name BEFORE rebinding self.agent.
        old_agent_name = type(self.agent).__name__
        current_session = self.agent.session
        logger.info(f"Switching from {old_agent_name} to {type(new_agent).__name__}")
        if getattr(new_agent, 'inherit_context', True):
            logger.info(f"Inheriting context from {old_agent_name}")
            new_agent.chat_context = self.agent.chat_context
            # Replace the system prompt with the new agent's instructions.
            new_agent.chat_context.add_message(
                role=ChatRole.SYSTEM,
                content=new_agent.instructions,
                replace=True,
            )
        new_agent.session = current_session
        self.agent = new_agent
        current_session.agent = new_agent
        if hasattr(current_session.pipeline, 'set_agent'):
            current_session.pipeline.set_agent(new_agent)
        if hasattr(new_agent, 'on_enter') and asyncio.iscoroutinefunction(new_agent.on_enter):
            await new_agent.on_enter()
        # Bug fix: the previous implementation reported the NEW agent's type
        # as "old_agent" (it used type(result).__name__ after the switch).
        self.emit("agent_switched", {
            "old_agent": old_agent_name,
            "new_agent": type(new_agent).__name__,
        })

    async def generate(self, user_text: str) -> AsyncIterator[ResponseChunk]:
        """
        Process user text with the LLM and yield response chunks.

        Args:
            user_text: User input text

        Yields:
            ResponseChunk objects with generated content
        """
        async with self.llm_lock:
            if not self.llm:
                logger.warning("No LLM available for content generation")
                return
            if not self.agent or not getattr(self.agent, "chat_context", None):
                logger.warning("Agent not available for LLM processing")
                return

            self._maybe_truncate_context()

            self.emit("generation_started", {
                "user_text": user_text,
                "context_size": len(self.agent.chat_context.items),
            })
            metrics_collector.on_llm_start()
            metrics_collector.set_llm_input(user_text)

            first_chunk_received = False
            agent_session = getattr(self.agent, "session", None)
            if agent_session:
                agent_session._emit_user_state(UserState.IDLE)
                agent_session._emit_agent_state(AgentState.THINKING)

            async for llm_chunk_resp in self.llm.chat(
                self.agent.chat_context,
                # Consistency fix: use the public ``tools`` accessor, as the
                # tool lookup and the post-tool chat below already do
                # (previously this call used the private ``_tools``).
                tools=self.agent.tools,
                conversational_graph=self.conversational_graph,
            ):
                if llm_chunk_resp.metadata and "usage" in llm_chunk_resp.metadata:
                    metrics_collector.set_llm_usage(llm_chunk_resp.metadata["usage"])
                    self.emit("usage_tracked", llm_chunk_resp.metadata["usage"])
                if self._is_interrupted:
                    logger.info("LLM processing interrupted")
                    break
                if not self.agent or not getattr(self.agent, "chat_context", None):
                    logger.warning("Agent context unavailable, stopping LLM processing")
                    break
                if not first_chunk_received:
                    first_chunk_received = True
                    metrics_collector.on_llm_first_token()
                    self.emit("first_chunk", {})

                if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata:
                    func_call = llm_chunk_resp.metadata["function_call"]
                    metrics_collector.add_function_tool_call(
                        tool_name=func_call["name"],
                        tool_params=func_call["arguments"],
                    )
                    self.emit("tool_called", {
                        "name": func_call["name"],
                        "arguments": func_call["arguments"],
                    })
                    chat_context = getattr(self.agent, "chat_context", None)
                    if not chat_context:
                        logger.warning("Chat context missing while handling function call")
                        return
                    chat_context.add_function_call(
                        name=func_call["name"],
                        arguments=json.dumps(func_call["arguments"]),
                        # Time-based fallback id: calls in the same second could
                        # collide — acceptable only as a logging fallback.
                        call_id=func_call.get("call_id", f"call_{int(time.time())}"),
                    )
                    try:
                        if not self.agent:
                            logger.warning("Agent cleaned up before selecting tool")
                            return
                        tool = next(
                            (t for t in self.agent.tools
                             if is_function_tool(t) and get_tool_info(t).name == func_call["name"]),
                            None,
                        )
                    except Exception as e:
                        logger.error(f"Error while selecting tool: {e}")
                        continue
                    if tool:
                        agent_session = getattr(self.agent, "session", None)
                        if agent_session:
                            agent_session._is_executing_tool = True
                        try:
                            result = await tool(**func_call["arguments"])
                            if isinstance(result, Agent):
                                # The tool returned a new agent: switch and end
                                # this generation; the new agent takes over.
                                await self._switch_agent(result)
                                return
                            chat_context = getattr(self.agent, "chat_context", None)
                            if not chat_context:
                                logger.warning("Chat context missing after tool execution")
                                return
                            chat_context.add_function_output(
                                name=func_call["name"],
                                output=json.dumps(result),
                                call_id=func_call.get("call_id", f"call_{int(time.time())}"),
                            )
                            # Continue the generation now that the tool output
                            # is part of the context.
                            async for new_resp in self.llm.chat(
                                chat_context,
                                tools=self.agent.tools,
                                conversational_graph=self.conversational_graph,
                            ):
                                if self._is_interrupted:
                                    break
                                if new_resp:
                                    self.emit("generation_chunk", {
                                        "content": new_resp.content,
                                        "metadata": new_resp.metadata,
                                    })
                                    yield ResponseChunk(new_resp.content, new_resp.metadata, new_resp.role)
                        except Exception as e:
                            logger.error(f"Error executing function {func_call['name']}: {e}")
                            continue
                        finally:
                            if agent_session:
                                agent_session._is_executing_tool = False
                else:
                    if llm_chunk_resp:
                        self.emit("generation_chunk", {
                            "content": llm_chunk_resp.content,
                            "metadata": llm_chunk_resp.metadata,
                        })
                        yield ResponseChunk(llm_chunk_resp.content, llm_chunk_resp.metadata, llm_chunk_resp.role)

            if not self._is_interrupted:
                metrics_collector.on_llm_complete()
                self.emit("generation_complete", {})

    def interrupt(self) -> None:
        """Interrupt the current generation."""
        self._is_interrupted = True

    def reset_interrupt(self) -> None:
        """Reset interrupt flag."""
        self._is_interrupted = False

    async def cancel(self) -> None:
        """Cancel LLM generation."""
        if self.llm:
            try:
                await self.llm.cancel_current_generation()
            except Exception as e:
                logger.error(f"LLM cancellation failed: {e}")

    async def cleanup(self) -> None:
        """Cleanup content generation resources."""
        logger.info("Cleaning up content generation")
        self.llm = None
        self.agent = None
        self.conversational_graph = None
        logger.info("Content generation cleaned up")
Events: - generation_started: LLM generation begins - generation_chunk: Streaming chunk received - generation_complete: Generation finished - tool_called: Function tool invoked - agent_switched: Agent switching occurred
Ancestors
- EventEmitter
- typing.Generic
Methods
async def cancel(self) -> None
Expand source code
async def cancel(self) -> None: """Cancel LLM generation""" if self.llm: try: await self.llm.cancel_current_generation() except Exception as e: logger.error(f"LLM cancellation failed: {e}")Cancel LLM generation
async def cleanup(self) -> None
Expand source code
async def cleanup(self) -> None: """Cleanup content generation resources""" logger.info("Cleaning up content generation") self.llm = None self.agent = None self.conversational_graph = None logger.info("Content generation cleaned up")Cleanup content generation resources
async def generate(self, user_text: str) -> AsyncIterator[ResponseChunk]
Expand source code
async def generate(self, user_text: str) -> AsyncIterator[ResponseChunk]:
    """
    Process user text with the LLM and yield response chunks.

    Args:
        user_text: User input text

    Yields:
        ResponseChunk objects with generated content
    """
    async with self.llm_lock:
        if not self.llm:
            logger.warning("No LLM available for content generation")
            return
        if not self.agent or not getattr(self.agent, "chat_context", None):
            logger.warning("Agent not available for LLM processing")
            return

        # Bound the prompt size before handing the context to the LLM.
        if self.max_context_items:
            current_items = len(self.agent.chat_context.items)
            if current_items > self.max_context_items:
                try:
                    logger.info(f"Truncating context from {current_items} to {self.max_context_items} items")
                    self.agent.chat_context.truncate(self.max_context_items)
                    logger.info(f"Truncation complete. Final size: {len(self.agent.chat_context.items)} items")
                except Exception as e:
                    # Best-effort: a failed truncation must not abort generation.
                    logger.error(f"Error during truncation: {e}", exc_info=True)

        self.emit("generation_started", {
            "user_text": user_text,
            "context_size": len(self.agent.chat_context.items),
        })
        metrics_collector.on_llm_start()
        metrics_collector.set_llm_input(user_text)

        first_chunk_received = False
        agent_session = getattr(self.agent, "session", None)
        if agent_session:
            agent_session._emit_user_state(UserState.IDLE)
            agent_session._emit_agent_state(AgentState.THINKING)

        async for llm_chunk_resp in self.llm.chat(
            self.agent.chat_context,
            # Consistency fix: use the public ``tools`` accessor, matching the
            # tool lookup and the post-tool chat below (was ``_tools``).
            tools=self.agent.tools,
            conversational_graph=self.conversational_graph,
        ):
            if llm_chunk_resp.metadata and "usage" in llm_chunk_resp.metadata:
                metrics_collector.set_llm_usage(llm_chunk_resp.metadata["usage"])
                self.emit("usage_tracked", llm_chunk_resp.metadata["usage"])
            if self._is_interrupted:
                logger.info("LLM processing interrupted")
                break
            if not self.agent or not getattr(self.agent, "chat_context", None):
                logger.warning("Agent context unavailable, stopping LLM processing")
                break
            if not first_chunk_received:
                first_chunk_received = True
                metrics_collector.on_llm_first_token()
                self.emit("first_chunk", {})

            if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata:
                func_call = llm_chunk_resp.metadata["function_call"]
                metrics_collector.add_function_tool_call(
                    tool_name=func_call["name"],
                    tool_params=func_call["arguments"],
                )
                self.emit("tool_called", {
                    "name": func_call["name"],
                    "arguments": func_call["arguments"],
                })
                chat_context = getattr(self.agent, "chat_context", None)
                if not chat_context:
                    logger.warning("Chat context missing while handling function call")
                    return
                chat_context.add_function_call(
                    name=func_call["name"],
                    arguments=json.dumps(func_call["arguments"]),
                    # Time-based fallback id — calls within the same second
                    # could collide; acceptable only as a fallback.
                    call_id=func_call.get("call_id", f"call_{int(time.time())}"),
                )
                try:
                    if not self.agent:
                        logger.warning("Agent cleaned up before selecting tool")
                        return
                    tool = next(
                        (t for t in self.agent.tools
                         if is_function_tool(t) and get_tool_info(t).name == func_call["name"]),
                        None,
                    )
                except Exception as e:
                    logger.error(f"Error while selecting tool: {e}")
                    continue
                if tool:
                    agent_session = getattr(self.agent, "session", None)
                    if agent_session:
                        agent_session._is_executing_tool = True
                    try:
                        result = await tool(**func_call["arguments"])
                        if isinstance(result, Agent):
                            new_agent = result
                            # Bug fix: capture the OLD agent's name before the
                            # switch; the previous code reported the new
                            # agent's type as "old_agent".
                            old_agent_name = type(self.agent).__name__
                            current_session = self.agent.session
                            logger.info(f"Switching from {old_agent_name} to {type(new_agent).__name__}")
                            if getattr(new_agent, 'inherit_context', True):
                                logger.info(f"Inheriting context from {old_agent_name}")
                                new_agent.chat_context = self.agent.chat_context
                                # Replace the system prompt with the new
                                # agent's instructions.
                                new_agent.chat_context.add_message(
                                    role=ChatRole.SYSTEM,
                                    content=new_agent.instructions,
                                    replace=True,
                                )
                            new_agent.session = current_session
                            self.agent = new_agent
                            current_session.agent = new_agent
                            if hasattr(current_session.pipeline, 'set_agent'):
                                current_session.pipeline.set_agent(new_agent)
                            if hasattr(new_agent, 'on_enter') and asyncio.iscoroutinefunction(new_agent.on_enter):
                                await new_agent.on_enter()
                            self.emit("agent_switched", {
                                "old_agent": old_agent_name,
                                "new_agent": type(new_agent).__name__,
                            })
                            # The new agent takes over; end this generation.
                            return
                        chat_context = getattr(self.agent, "chat_context", None)
                        if not chat_context:
                            logger.warning("Chat context missing after tool execution")
                            return
                        chat_context.add_function_output(
                            name=func_call["name"],
                            output=json.dumps(result),
                            call_id=func_call.get("call_id", f"call_{int(time.time())}"),
                        )
                        # Continue generating now that the tool output is in
                        # the context.
                        async for new_resp in self.llm.chat(
                            chat_context,
                            tools=self.agent.tools,
                            conversational_graph=self.conversational_graph,
                        ):
                            if self._is_interrupted:
                                break
                            if new_resp:
                                self.emit("generation_chunk", {
                                    "content": new_resp.content,
                                    "metadata": new_resp.metadata,
                                })
                                yield ResponseChunk(new_resp.content, new_resp.metadata, new_resp.role)
                    except Exception as e:
                        logger.error(f"Error executing function {func_call['name']}: {e}")
                        continue
                    finally:
                        if agent_session:
                            agent_session._is_executing_tool = False
            else:
                if llm_chunk_resp:
                    self.emit("generation_chunk", {
                        "content": llm_chunk_resp.content,
                        "metadata": llm_chunk_resp.metadata,
                    })
                    yield ResponseChunk(llm_chunk_resp.content, llm_chunk_resp.metadata, llm_chunk_resp.role)

        if not self._is_interrupted:
            metrics_collector.on_llm_complete()
            self.emit("generation_complete", {})
Args
user_text: User input text
knowledge_base: Optional knowledge base for context enrichment (note: this parameter is documented but not accepted by the current `generate` signature — stale documentation)
Yields
ResponseChunk objects with generated content
def interrupt(self) -> None
Expand source code
def interrupt(self) -> None: """Interrupt the current generation""" self._is_interrupted = TrueInterrupt the current generation
def reset_interrupt(self) -> None
Expand source code
def reset_interrupt(self) -> None: """Reset interrupt flag""" self._is_interrupted = FalseReset interrupt flag
async def start(self) -> None
Expand source code
async def start(self) -> None: """Start the content generation component""" logger.info("ContentGeneration started")Start the content generation component
Inherited members