Module agents.content_generation
Classes
class ContentGeneration (agent: Agent | None = None,
llm: LLM | None = None,
graph_adapter: Any | None = None,
context_window: Any | None = None)-
Expand source code
class ContentGeneration(EventEmitter[Literal["generation_started", "generation_chunk", "generation_complete", "tool_called", "agent_switched"]]): """ Handles LLM processing and content generation. Events: - generation_started: LLM generation begins - generation_chunk: Streaming chunk received - generation_complete: Generation finished - tool_called: Function tool invoked - agent_switched: Agent switching occurred """ def __init__( self, agent: Agent | None = None, llm: LLM | None = None, graph_adapter: Any | None = None, context_window: Any | None = None, ) -> None: super().__init__() self.agent = agent self.llm = llm self.graph_adapter = graph_adapter self.context_window = context_window self.llm_lock = asyncio.Lock() self._is_interrupted = False @property def max_tool_calls_per_turn(self) -> int: """Get max_tool_calls_per_turn from context_window, or default to 10.""" if self.context_window and hasattr(self.context_window, 'max_tool_calls_per_turn'): return self.context_window.max_tool_calls_per_turn return 10 @staticmethod def _function_call_metadata(func_call: dict) -> dict | None: """Carry the provider's per-call thought signature onto the stored FunctionCall. Providers (e.g. Gemini) return a per-call ``thought_signature`` that must be echoed back with that exact call on later turns. Storing it on the FunctionCall keeps it correct even for repeated calls to the same tool. """ sig = func_call.get("thought_signature") return {"thought_signature": sig} if sig else None @staticmethod def _tool_output_fallback_text(output_item: FunctionCallOutput) -> str: """Last-resort spoken text when the LLM yields nothing after a tool runs. Prefers the tool's own string result (tool authors commonly return a user-facing confirmation); otherwise a generic acknowledgment — so the agent is never left silent after a successful tool call. """ if output_item.is_error: return "Sorry, I ran into a problem completing that." try: decoded = json.loads(output_item.output) except (ValueError, TypeError): decoded = output_item.output if isinstance(decoded, str) and decoded.strip(): return decoded.strip() return "Okay, I've taken care of that." @staticmethod def _safe_tool_kwargs(tool, arguments: Any) -> dict: """Filter model-supplied arguments to the tool's declared parameters. LLMs — small ones especially — sometimes hallucinate or malform tool arguments (extra keys, an empty-string key, a non-dict value). Dropping what the tool doesn't declare keeps one bad call from crashing the turn. """ if not isinstance(arguments, dict): return {} try: allowed = set( build_openai_schema(tool).get("parameters", {}).get("properties", {}) ) except Exception: return arguments dropped = [k for k in arguments if k not in allowed] if dropped: logger.warning( "Dropping unrecognized argument(s) %s for tool '%s'", dropped, get_tool_info(tool).name, ) return {k: v for k, v in arguments.items() if k in allowed} async def start(self) -> None: """Start the content generation component""" logger.info("ContentGeneration started") async def generate(self, user_text: str, context_prefix: str | None = None) -> AsyncIterator[ResponseChunk]: """ Process user text with LLM and yield response chunks. Args: user_text: User input text context_prefix: Optional temporary context (e.g. KB results) injected for this LLM call only, not persisted in chat history. Yields: ResponseChunk objects with generated content """ async with self.llm_lock: if not self.llm: logger.warning("No LLM available for content generation") return if not self.agent or not getattr(self.agent, "chat_context", None): logger.warning("Agent not available for LLM processing") return # Context window handles compression + truncation in one call if self.context_window and self.llm: await self.context_window.manage(self.agent.chat_context, self.llm) self.emit("generation_started", { "user_text": user_text, "context_size": len(self.agent.chat_context.items) }) metrics_collector.on_llm_start() metrics_collector.set_llm_input(user_text) first_chunk_received = False _total_tool_calls = 0 _max_total_tool_calls = self.max_tool_calls_per_turn _call_id_counter = 0 _prefix_original_content = None _prefix_target_msg = None if context_prefix: msgs = self.agent.chat_context.messages() _prefix_target_msg = next( (m for m in reversed(msgs) if m.role == ChatRole.USER), None ) if _prefix_target_msg: _prefix_original_content = _prefix_target_msg.content clean_text = ( _prefix_original_content[0] if isinstance(_prefix_original_content, list) else _prefix_original_content ) _prefix_target_msg.content = [f"{context_prefix}\n\nUser: {clean_text}"] agent_session = getattr(self.agent, "session", None) if agent_session: agent_session._emit_user_state(UserState.IDLE) agent_session._emit_agent_state(AgentState.THINKING) async for llm_chunk_resp in self.llm.chat( self.agent.chat_context, tools=self.agent._tools, conversational_graph=self.graph_adapter if self.graph_adapter else None ): if llm_chunk_resp.metadata and "usage" in llm_chunk_resp.metadata: metrics_collector.set_llm_usage(llm_chunk_resp.metadata["usage"]) self.emit("usage_tracked", llm_chunk_resp.metadata["usage"]) if self._is_interrupted: logger.info("LLM processing interrupted") break if not self.agent or not getattr(self.agent, "chat_context", None): logger.warning("Agent context unavailable, stopping LLM processing") break if not first_chunk_received: first_chunk_received = True metrics_collector.on_llm_first_token() self.emit("first_chunk", {}) if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata: func_call = llm_chunk_resp.metadata["function_call"] _total_tool_calls += 1 if _total_tool_calls > _max_total_tool_calls: logger.warning(f"Tool call limit reached ({_max_total_tool_calls}), skipping") continue logger.info(f"Tool call: {func_call['name']} ({_total_tool_calls}/{_max_total_tool_calls})") metrics_collector.add_function_tool_call( tool_name=func_call["name"], tool_params=func_call["arguments"], ) self.emit("tool_called", { "name": func_call["name"], "arguments": func_call["arguments"] }) chat_context = getattr(self.agent, "chat_context", None) if not chat_context: logger.warning("Chat context missing while handling function call") return _call_id_counter += 1 func_call_id = func_call.get("call_id") or f"call_{int(time.time())}_{_call_id_counter}" chat_context.add_function_call( name=func_call["name"], arguments=json.dumps(func_call["arguments"]), call_id=func_call_id, metadata=self._function_call_metadata(func_call) ) try: if not self.agent: logger.warning("Agent cleaned up before selecting tool") return tool = next( (t for t in self.agent.tools if is_function_tool(t) and get_tool_info(t).name == func_call["name"]), None ) except Exception as e: logger.error(f"Error while selecting tool: {e}") continue if tool: agent_session = getattr(self.agent, "session", None) if agent_session: agent_session._is_executing_tool = True try: result = await tool(**self._safe_tool_kwargs(tool, func_call["arguments"])) if isinstance(result, Agent): new_agent = result current_session = self.agent.session logger.info(f"Agent handoff: {type(self.agent).__name__} \u2192 {type(new_agent).__name__}") chat_context.add_function_output( name=func_call["name"], output=f"Transferred to {type(new_agent).__name__}", call_id=func_call_id ) if getattr(new_agent, 'inherit_context', True): new_agent.chat_context = self.agent.chat_context new_agent.chat_context.add_message( role=ChatRole.SYSTEM, content=new_agent.instructions, replace=True ) new_agent.session = current_session self.agent = new_agent current_session.agent = new_agent if hasattr(current_session.pipeline, 'set_agent'): current_session.pipeline.set_agent(new_agent) if hasattr(new_agent, 'on_enter') and asyncio.iscoroutinefunction(new_agent.on_enter): await new_agent.on_enter() self.emit("agent_switched", { "old_agent": type(result).__name__, "new_agent": type(new_agent).__name__ }) return chat_context = getattr(self.agent, "chat_context", None) if not chat_context: logger.warning("Chat context missing after tool execution") return chat_context.add_function_output( name=func_call["name"], output=json.dumps(result), call_id=func_call_id ) max_tool_rounds = _max_total_tool_calls _tool_loop_text_yielded = False for _round in range(max_tool_rounds): logger.debug(f"Post-tool LLM round {_round + 1}/{max_tool_rounds}") pending_calls = [] buffered_text = [] async for new_resp in self.llm.chat( chat_context, tools=self.agent.tools, conversational_graph=self.graph_adapter if self.graph_adapter else None ): if self._is_interrupted: break if not new_resp: continue if new_resp.metadata and "function_call" in new_resp.metadata: _total_tool_calls += 1 if _total_tool_calls > _max_total_tool_calls: logger.warning(f"Tool call limit reached ({_max_total_tool_calls}), stopping") break next_call = new_resp.metadata["function_call"] _call_id_counter += 1 next_call_id = next_call.get("call_id") or f"call_{int(time.time())}_{_call_id_counter}" logger.info(f"Chained tool call: {next_call['name']} (round {_round + 1}, total {_total_tool_calls}/{_max_total_tool_calls})") pending_calls.append((next_call, next_call_id)) elif new_resp.content: buffered_text.append((new_resp.content, new_resp.metadata, new_resp.role)) if not pending_calls and buffered_text: _tool_loop_text_yielded = True for content, metadata, role in buffered_text: self.emit("generation_chunk", {"content": content, "metadata": metadata}) yield ResponseChunk(content, metadata, role) elif pending_calls and buffered_text: logger.debug("Discarding intermediate text (tool calls pending)") if self._is_interrupted or not pending_calls: break for next_call, next_call_id in pending_calls: chat_context.add_function_call( name=next_call["name"], arguments=json.dumps(next_call["arguments"]), call_id=next_call_id, metadata=self._function_call_metadata(next_call) ) if len(pending_calls) == 1: next_call, next_call_id = pending_calls[0] next_tool = next( (t for t in self.agent.tools if is_function_tool(t) and get_tool_info(t).name == next_call["name"]), None ) if next_tool: next_result = await next_tool(**self._safe_tool_kwargs(next_tool, next_call["arguments"])) chat_context.add_function_output( name=next_call["name"], output=json.dumps(next_result), call_id=next_call_id ) else: chat_context.add_function_output( name=next_call["name"], output=json.dumps({"error": f"Tool '{next_call['name']}' not found"}), call_id=next_call_id, is_error=True ) else: logger.info(f"Executing {len(pending_calls)} tools in parallel") async def _exec_tool(call_info): nc, nc_id = call_info t = next( (t for t in self.agent.tools if is_function_tool(t) and get_tool_info(t).name == nc["name"]), None ) if t: return nc, nc_id, await t(**self._safe_tool_kwargs(t, nc["arguments"])), False return nc, nc_id, {"error": f"Tool '{nc['name']}' not found"}, True results = await asyncio.gather( *[_exec_tool(pc) for pc in pending_calls], return_exceptions=True ) for r in results: if isinstance(r, Exception): logger.error(f"Parallel tool error: {r}") continue nc, nc_id, output, is_err = r chat_context.add_function_output( name=nc["name"], output=json.dumps(output), call_id=nc_id, is_error=is_err ) last_item = chat_context.items[-1] if chat_context.items else None if not self._is_interrupted and not _tool_loop_text_yielded and isinstance(last_item, FunctionCallOutput): logger.info("Tool loop exhausted, forcing final text response") _final_text_yielded = False async for final_resp in self.llm.chat( chat_context, tools=None, conversational_graph=self.graph_adapter if self.graph_adapter else None ): if self._is_interrupted or not final_resp: break if final_resp.content: _final_text_yielded = True self.emit("generation_chunk", { "content": final_resp.content, "metadata": final_resp.metadata }) yield ResponseChunk(final_resp.content, final_resp.metadata, final_resp.role) if not self._is_interrupted and not _final_text_yielded: fallback_text = self._tool_output_fallback_text(last_item) logger.warning( "LLM produced no response after tool '%s'; " "speaking fallback acknowledgment", last_item.name, ) self.emit("generation_chunk", { "content": fallback_text, "metadata": None }) yield ResponseChunk(fallback_text, None, ChatRole.ASSISTANT) if self._is_interrupted: last_item = chat_context.items[-1] if chat_context.items else None if isinstance(last_item, FunctionCallOutput): logger.info(f"Tool execution interrupted after '{last_item.name}', adding closure") msg = chat_context.add_message( role=ChatRole.ASSISTANT, content=f"[Used {last_item.name} tool — response interrupted]" ) msg.interrupted = True except Exception as e: logger.error(f"Error executing function {func_call['name']}: {e}") continue finally: if agent_session: agent_session._is_executing_tool = False if _tool_loop_text_yielded: break else: has_content = llm_chunk_resp and llm_chunk_resp.content has_graph_meta = ( llm_chunk_resp and isinstance(llm_chunk_resp.metadata, dict) and "graph_response" in llm_chunk_resp.metadata ) if has_content or has_graph_meta: self.emit("generation_chunk", { "content": llm_chunk_resp.content, "metadata": llm_chunk_resp.metadata }) yield ResponseChunk(llm_chunk_resp.content, llm_chunk_resp.metadata, llm_chunk_resp.role) if _prefix_target_msg and _prefix_original_content is not None: _prefix_target_msg.content = _prefix_original_content if not self._is_interrupted: metrics_collector.on_llm_complete() self.emit("generation_complete", {}) def interrupt(self) -> None: """Interrupt the current generation""" self._is_interrupted = True def reset_interrupt(self) -> None: """Reset interrupt flag""" self._is_interrupted = False async def cancel(self) -> None: """Cancel LLM generation""" if self.llm: try: await self.llm.cancel_current_generation() except Exception as e: logger.error(f"LLM cancellation failed: {e}") async def cleanup(self) -> None: """Cleanup content generation resources""" logger.info("Cleaning up content generation") self.llm = None self.agent = None self.graph_adapter = None logger.info("Content generation cleaned up")Handles LLM processing and content generation.
Events: - generation_started: LLM generation begins - generation_chunk: Streaming chunk received - generation_complete: Generation finished - tool_called: Function tool invoked - agent_switched: Agent switching occurred
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop max_tool_calls_per_turn : int-
Expand source code
@property def max_tool_calls_per_turn(self) -> int: """Get max_tool_calls_per_turn from context_window, or default to 10.""" if self.context_window and hasattr(self.context_window, 'max_tool_calls_per_turn'): return self.context_window.max_tool_calls_per_turn return 10Get max_tool_calls_per_turn from context_window, or default to 10.
Methods
async def cancel(self) ‑> None-
Expand source code
async def cancel(self) -> None: """Cancel LLM generation""" if self.llm: try: await self.llm.cancel_current_generation() except Exception as e: logger.error(f"LLM cancellation failed: {e}")Cancel LLM generation
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None: """Cleanup content generation resources""" logger.info("Cleaning up content generation") self.llm = None self.agent = None self.graph_adapter = None logger.info("Content generation cleaned up")Cleanup content generation resources
async def generate(self, user_text: str, context_prefix: str | None = None) ‑> AsyncIterator[ResponseChunk]-
Expand source code
async def generate(self, user_text: str, context_prefix: str | None = None) -> AsyncIterator[ResponseChunk]: """ Process user text with LLM and yield response chunks. Args: user_text: User input text context_prefix: Optional temporary context (e.g. KB results) injected for this LLM call only, not persisted in chat history. Yields: ResponseChunk objects with generated content """ async with self.llm_lock: if not self.llm: logger.warning("No LLM available for content generation") return if not self.agent or not getattr(self.agent, "chat_context", None): logger.warning("Agent not available for LLM processing") return # Context window handles compression + truncation in one call if self.context_window and self.llm: await self.context_window.manage(self.agent.chat_context, self.llm) self.emit("generation_started", { "user_text": user_text, "context_size": len(self.agent.chat_context.items) }) metrics_collector.on_llm_start() metrics_collector.set_llm_input(user_text) first_chunk_received = False _total_tool_calls = 0 _max_total_tool_calls = self.max_tool_calls_per_turn _call_id_counter = 0 _prefix_original_content = None _prefix_target_msg = None if context_prefix: msgs = self.agent.chat_context.messages() _prefix_target_msg = next( (m for m in reversed(msgs) if m.role == ChatRole.USER), None ) if _prefix_target_msg: _prefix_original_content = _prefix_target_msg.content clean_text = ( _prefix_original_content[0] if isinstance(_prefix_original_content, list) else _prefix_original_content ) _prefix_target_msg.content = [f"{context_prefix}\n\nUser: {clean_text}"] agent_session = getattr(self.agent, "session", None) if agent_session: agent_session._emit_user_state(UserState.IDLE) agent_session._emit_agent_state(AgentState.THINKING) async for llm_chunk_resp in self.llm.chat( self.agent.chat_context, tools=self.agent._tools, conversational_graph=self.graph_adapter if self.graph_adapter else None ): if llm_chunk_resp.metadata and "usage" in llm_chunk_resp.metadata: metrics_collector.set_llm_usage(llm_chunk_resp.metadata["usage"]) self.emit("usage_tracked", llm_chunk_resp.metadata["usage"]) if self._is_interrupted: logger.info("LLM processing interrupted") break if not self.agent or not getattr(self.agent, "chat_context", None): logger.warning("Agent context unavailable, stopping LLM processing") break if not first_chunk_received: first_chunk_received = True metrics_collector.on_llm_first_token() self.emit("first_chunk", {}) if llm_chunk_resp.metadata and "function_call" in llm_chunk_resp.metadata: func_call = llm_chunk_resp.metadata["function_call"] _total_tool_calls += 1 if _total_tool_calls > _max_total_tool_calls: logger.warning(f"Tool call limit reached ({_max_total_tool_calls}), skipping") continue logger.info(f"Tool call: {func_call['name']} ({_total_tool_calls}/{_max_total_tool_calls})") metrics_collector.add_function_tool_call( tool_name=func_call["name"], tool_params=func_call["arguments"], ) self.emit("tool_called", { "name": func_call["name"], "arguments": func_call["arguments"] }) chat_context = getattr(self.agent, "chat_context", None) if not chat_context: logger.warning("Chat context missing while handling function call") return _call_id_counter += 1 func_call_id = func_call.get("call_id") or f"call_{int(time.time())}_{_call_id_counter}" chat_context.add_function_call( name=func_call["name"], arguments=json.dumps(func_call["arguments"]), call_id=func_call_id, metadata=self._function_call_metadata(func_call) ) try: if not self.agent: logger.warning("Agent cleaned up before selecting tool") return tool = next( (t for t in self.agent.tools if is_function_tool(t) and get_tool_info(t).name == func_call["name"]), None ) except Exception as e: logger.error(f"Error while selecting tool: {e}") continue if tool: agent_session = getattr(self.agent, "session", None) if agent_session: agent_session._is_executing_tool = True try: result = await tool(**self._safe_tool_kwargs(tool, func_call["arguments"])) if isinstance(result, Agent): new_agent = result current_session = self.agent.session logger.info(f"Agent handoff: {type(self.agent).__name__} \u2192 {type(new_agent).__name__}") chat_context.add_function_output( name=func_call["name"], output=f"Transferred to {type(new_agent).__name__}", call_id=func_call_id ) if getattr(new_agent, 'inherit_context', True): new_agent.chat_context = self.agent.chat_context new_agent.chat_context.add_message( role=ChatRole.SYSTEM, content=new_agent.instructions, replace=True ) new_agent.session = current_session self.agent = new_agent current_session.agent = new_agent if hasattr(current_session.pipeline, 'set_agent'): current_session.pipeline.set_agent(new_agent) if hasattr(new_agent, 'on_enter') and asyncio.iscoroutinefunction(new_agent.on_enter): await new_agent.on_enter() self.emit("agent_switched", { "old_agent": type(result).__name__, "new_agent": type(new_agent).__name__ }) return chat_context = getattr(self.agent, "chat_context", None) if not chat_context: logger.warning("Chat context missing after tool execution") return chat_context.add_function_output( name=func_call["name"], output=json.dumps(result), call_id=func_call_id ) max_tool_rounds = _max_total_tool_calls _tool_loop_text_yielded = False for _round in range(max_tool_rounds): logger.debug(f"Post-tool LLM round {_round + 1}/{max_tool_rounds}") pending_calls = [] buffered_text = [] async for new_resp in self.llm.chat( chat_context, tools=self.agent.tools, conversational_graph=self.graph_adapter if self.graph_adapter else None ): if self._is_interrupted: break if not new_resp: continue if new_resp.metadata and "function_call" in new_resp.metadata: _total_tool_calls += 1 if _total_tool_calls > _max_total_tool_calls: logger.warning(f"Tool call limit reached ({_max_total_tool_calls}), stopping") break next_call = new_resp.metadata["function_call"] _call_id_counter += 1 next_call_id = next_call.get("call_id") or f"call_{int(time.time())}_{_call_id_counter}" logger.info(f"Chained tool call: {next_call['name']} (round {_round + 1}, total {_total_tool_calls}/{_max_total_tool_calls})") pending_calls.append((next_call, next_call_id)) elif new_resp.content: buffered_text.append((new_resp.content, new_resp.metadata, new_resp.role)) if not pending_calls and buffered_text: _tool_loop_text_yielded = True for content, metadata, role in buffered_text: self.emit("generation_chunk", {"content": content, "metadata": metadata}) yield ResponseChunk(content, metadata, role) elif pending_calls and buffered_text: logger.debug("Discarding intermediate text (tool calls pending)") if self._is_interrupted or not pending_calls: break for next_call, next_call_id in pending_calls: chat_context.add_function_call( name=next_call["name"], arguments=json.dumps(next_call["arguments"]), call_id=next_call_id, metadata=self._function_call_metadata(next_call) ) if len(pending_calls) == 1: next_call, next_call_id = pending_calls[0] next_tool = next( (t for t in self.agent.tools if is_function_tool(t) and get_tool_info(t).name == next_call["name"]), None ) if next_tool: next_result = await next_tool(**self._safe_tool_kwargs(next_tool, next_call["arguments"])) chat_context.add_function_output( name=next_call["name"], output=json.dumps(next_result), call_id=next_call_id ) else: chat_context.add_function_output( name=next_call["name"], output=json.dumps({"error": f"Tool '{next_call['name']}' not found"}), call_id=next_call_id, is_error=True ) else: logger.info(f"Executing {len(pending_calls)} tools in parallel") async def _exec_tool(call_info): nc, nc_id = call_info t = next( (t for t in self.agent.tools if is_function_tool(t) and get_tool_info(t).name == nc["name"]), None ) if t: return nc, nc_id, await t(**self._safe_tool_kwargs(t, nc["arguments"])), False return nc, nc_id, {"error": f"Tool '{nc['name']}' not found"}, True results = await asyncio.gather( *[_exec_tool(pc) for pc in pending_calls], return_exceptions=True ) for r in results: if isinstance(r, Exception): logger.error(f"Parallel tool error: {r}") continue nc, nc_id, output, is_err = r chat_context.add_function_output( name=nc["name"], output=json.dumps(output), call_id=nc_id, is_error=is_err ) last_item = chat_context.items[-1] if chat_context.items else None if not self._is_interrupted and not _tool_loop_text_yielded and isinstance(last_item, FunctionCallOutput): logger.info("Tool loop exhausted, forcing final text response") _final_text_yielded = False async for final_resp in self.llm.chat( chat_context, tools=None, conversational_graph=self.graph_adapter if self.graph_adapter else None ): if self._is_interrupted or not final_resp: break if final_resp.content: _final_text_yielded = True self.emit("generation_chunk", { "content": final_resp.content, "metadata": final_resp.metadata }) yield ResponseChunk(final_resp.content, final_resp.metadata, final_resp.role) if not self._is_interrupted and not _final_text_yielded: fallback_text = self._tool_output_fallback_text(last_item) logger.warning( "LLM produced no response after tool '%s'; " "speaking fallback acknowledgment", last_item.name, ) self.emit("generation_chunk", { "content": fallback_text, "metadata": None }) yield ResponseChunk(fallback_text, None, ChatRole.ASSISTANT) if self._is_interrupted: last_item = chat_context.items[-1] if chat_context.items else None if isinstance(last_item, FunctionCallOutput): logger.info(f"Tool execution interrupted after '{last_item.name}', adding closure") msg = chat_context.add_message( role=ChatRole.ASSISTANT, content=f"[Used {last_item.name} tool — response interrupted]" ) msg.interrupted = True except Exception as e: logger.error(f"Error executing function {func_call['name']}: {e}") continue finally: if agent_session: agent_session._is_executing_tool = False if _tool_loop_text_yielded: break else: has_content = llm_chunk_resp and llm_chunk_resp.content has_graph_meta = ( llm_chunk_resp and isinstance(llm_chunk_resp.metadata, dict) and "graph_response" in llm_chunk_resp.metadata ) if has_content or has_graph_meta: self.emit("generation_chunk", { "content": llm_chunk_resp.content, "metadata": llm_chunk_resp.metadata }) yield ResponseChunk(llm_chunk_resp.content, llm_chunk_resp.metadata, llm_chunk_resp.role) if _prefix_target_msg and _prefix_original_content is not None: _prefix_target_msg.content = _prefix_original_content if not self._is_interrupted: metrics_collector.on_llm_complete() self.emit("generation_complete", {})Process user text with LLM and yield response chunks.
Args
user_text- User input text
context_prefix- Optional temporary context (e.g. KB results) injected for this LLM call only, not persisted in chat history.
Yields
ResponseChunk objects with generated content
def interrupt(self) ‑> None-
Expand source code
def interrupt(self) -> None: """Interrupt the current generation""" self._is_interrupted = TrueInterrupt the current generation
def reset_interrupt(self) ‑> None-
Expand source code
def reset_interrupt(self) -> None: """Reset interrupt flag""" self._is_interrupted = FalseReset interrupt flag
async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Start the content generation component""" logger.info("ContentGeneration started")Start the content generation component
Inherited members